162 lines
5.4 KiB
Python
162 lines
5.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Simple file-watcher that restarts a command when Python source files change.

Usage:
    python scripts/reload_runner.py --watch voicebot -- python voicebot/main.py

This is intentionally dependency-free so it works in minimal dev environments
and inside containers without installing extra packages.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from typing import Dict, List, Optional
|
|
from types import FrameType
|
|
|
|
|
|
def scan_py_mtimes(paths: List[str]) -> Dict[str, float]:
    """Snapshot the modification times of the ``.py`` files under *paths*.

    Each entry of *paths* is handled in one of two ways:

    * an existing regular file whose name ends in ``.py`` is recorded
      under the path exactly as given;
    * anything else is walked with ``os.walk`` and every file whose name
      ends in ``.py`` is recorded under ``os.path.join(root, name)``.

    Files that cannot be stat'ed are silently left out of the result.

    Returns:
        A mapping of file path to the value of ``os.path.getmtime``.
    """
    snapshot: Dict[str, float] = {}
    for entry in paths:
        if entry.endswith('.py') and os.path.isfile(entry):
            candidates = iter([entry])
        else:
            # Lazily yield the Python files found below this entry; a path
            # that does not exist (or is a non-.py file) yields nothing.
            candidates = (
                os.path.join(dirpath, name)
                for dirpath, _subdirs, names in os.walk(entry)
                for name in names
                if name.endswith('.py')
            )
        for candidate in candidates:
            try:
                snapshot[candidate] = os.path.getmtime(candidate)
            except OSError:
                # The file may vanish between being listed and being stat'ed.
                continue
    return snapshot
|
|
|
|
|
|
def start_process(cmd: List[str]) -> subprocess.Popen[bytes]:
    """Announce and launch *cmd* as a child process.

    The argument list is echoed to stdout (joined with single spaces) and
    then handed to ``subprocess.Popen`` unchanged, so no shell is involved.

    Returns:
        The ``Popen`` handle of the newly started child.
    """
    command_line = " ".join(cmd)
    print("Starting:", command_line)
    spawned = subprocess.Popen(cmd)
    return spawned
|
|
|
|
|
|
def terminate_process(p: subprocess.Popen[bytes], timeout: float = 5.0) -> None:
    """Stop a child process, escalating from terminate to kill.

    Does nothing if the process has already exited. Otherwise termination
    is requested with ``p.terminate()`` and the process is given up to
    *timeout* seconds to exit. If it is still running after that, it is
    killed with ``p.kill()`` and then waited on so that it is reaped and
    ``p.returncode`` is populated.

    Args:
        p: The child process to stop.
        timeout: Seconds to wait after ``terminate()`` before killing.

    This is best-effort: any error raised while stopping the process is
    printed to stdout instead of being propagated.
    """
    if p.poll() is not None:
        return
    try:
        p.terminate()
        try:
            # Blocks only until the child exits, so a prompt exit returns
            # immediately instead of on the next polling tick.
            p.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            p.kill()
            # Reap the killed child so it does not linger as a zombie.
            p.wait()
    except Exception as e:
        print("Error terminating process:", e)
|
|
|
|
|
|
def main() -> int:
    """Run a command and restart it whenever a watched ``.py`` file changes.

    Command-line arguments are read from ``sys.argv``. The command to run is
    everything after the options, conventionally separated by ``--``.

    The watcher polls once per ``--interval`` seconds. A restart happens when
    a watched file is added, modified or deleted, or when the child exits by
    itself (unless ``--no-restart-on-exit`` is given).

    SIGINT and SIGTERM are handled by stopping the child and exiting the
    interpreter with status 0.

    Returns:
        The child's exit code when it exits on its own and
        ``--no-restart-on-exit`` is set, or 0 after a ``KeyboardInterrupt``.
    """
    parser = argparse.ArgumentParser(
        description="Restart a command when .py files change",
    )
    parser.add_argument(
        "--watch", "-w",
        nargs="+",
        default=["."],
        help="Directories or files to watch",
    )
    parser.add_argument(
        "--interval", "-i",
        type=float,
        default=1.0,
        help="Polling interval in seconds",
    )
    parser.add_argument(
        "--delay-restart",
        type=float,
        default=0.1,
        help="Delay after change before restarting",
    )
    parser.add_argument(
        "--no-restart-on-exit",
        action="store_true",
        help="Don't restart if the process exits on its own",
    )
    parser.add_argument(
        "--pass-sigterm",
        action="store_true",
        help="Forward SIGTERM to child and exit when received",
    )
    # The command is collected as a positional remainder: an option literally
    # named "--" is not reliable with argparse, whereas a REMAINDER positional
    # captures everything that follows the recognised options.
    parser.add_argument(
        "cmd",
        nargs=argparse.REMAINDER,
        help="Command to run (required)",
    )
    args = parser.parse_args()

    # For `reload_runner.py --watch . -- mycmd arg1`, the remainder may begin
    # with the literal "--" separator; drop it so only the command is left.
    remainder = args.cmd
    cmd = remainder[1:] if remainder and remainder[0] == "--" else remainder
    if not cmd:
        parser.error("Missing command to run. Put `--` before the command. See help.")

    watch_paths = args.watch
    baseline = scan_py_mtimes(watch_paths)
    child = start_process(cmd)

    def stop_on_signal(signum: int, frame: Optional[FrameType]) -> None:
        # `child` is looked up at call time, so this always acts on the most
        # recently started process.
        if args.pass_sigterm:
            try:
                child.send_signal(signum)
            except Exception:
                pass
        print("Received signal, stopping watcher.")
        try:
            terminate_process(child)
        finally:
            sys.exit(0)

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, stop_on_signal)

    try:
        while True:
            # One poll per interval.
            time.sleep(args.interval)

            # Child ended by itself: report it, then either stop or respawn.
            exit_code = child.poll()
            if exit_code is not None:
                print(f"Process exited with code {exit_code}.")
                if args.no_restart_on_exit:
                    return exit_code
                child = start_process(cmd)
                baseline = scan_py_mtimes(watch_paths)
                continue

            snapshot = scan_py_mtimes(watch_paths)

            # First file that is new or whose mtime differs from the baseline.
            modified = next(
                (
                    path
                    for path, stamp in snapshot.items()
                    if baseline.get(path) != stamp
                ),
                None,
            )
            if modified is not None:
                print("Detected change in:", modified)
            else:
                # Nothing new or modified; look for a file that disappeared.
                removed = next(
                    (path for path in baseline if path not in snapshot),
                    None,
                )
                if removed is None:
                    continue
                print("Detected deleted file:", removed)

            # Brief debounce before cycling the child, then re-baseline.
            time.sleep(args.delay_restart)
            terminate_process(child)
            child = start_process(cmd)
            baseline = scan_py_mtimes(watch_paths)

    except KeyboardInterrupt:
        print("Interrupted, shutting down.")
        terminate_process(child)
        return 0
|
|
|
|
|
|
# Script entry point: run the watcher and use its return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
|