288 lines
10 KiB
Python
288 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Simple file-watcher that restarts a command when Python source files change.
|
|
|
|
Usage:
|
|
python scripts/reload_runner.py --watch voicebot -- python voicebot/main.py
|
|
|
|
This is intentionally dependency-free so it works in minimal dev environments
|
|
and inside containers without installing extra packages.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from typing import Dict, List, Optional
|
|
from types import FrameType
|
|
|
|
|
|
# Directory names pruned from every walk; changes inside them should never
# trigger a reload. Shared by both scanners so the two cannot drift apart.
_SKIP_DIRS = frozenset(
    {
        ".venv",
        "__pycache__",
        ".git",
        "node_modules",
        ".mypy_cache",
        ".pytest_cache",
        "build",
        "dist",
    }
)


def _find_py_files(paths: List[str]) -> List[str]:
    """Return the ``.py`` files named by, or found beneath, each entry of *paths*.

    An entry that is an existing file ending in ``.py`` is returned as given.
    Every other entry is walked with ``os.walk`` (which yields nothing for a
    missing path or a non-directory), skipping directories in ``_SKIP_DIRS``.
    Results keep discovery order and may contain duplicates if *paths* overlap.
    """
    found: List[str] = []
    for p in paths:
        if os.path.isfile(p) and p.endswith(".py"):
            found.append(p)
            continue

        for root, dirs, files in os.walk(p):
            # Slice-assign in place: os.walk only honours pruning done on the
            # very list object it handed us.
            dirs[:] = [d for d in dirs if d not in _SKIP_DIRS]
            found.extend(os.path.join(root, f) for f in files if f.endswith(".py"))
    return found


def scan_py_mtimes(paths: List[str]) -> Dict[str, float]:
    """Map each watched ``.py`` file to its latest change timestamp.

    Args:
        paths: Files and/or directories to scan.

    Returns:
        ``{file_path: max(st_mtime, st_ctime)}`` for every file that could be
        stat'ed. Files that raise ``OSError`` are left out.
    """
    mtimes: Dict[str, float] = {}
    for fp in _find_py_files(paths):
        try:
            stat = os.stat(fp)
        except OSError:
            # The file might disappear between discovery and stat.
            continue
        # Use both mtime and ctime to catch more changes in Docker environments.
        mtimes[fp] = max(stat.st_mtime, stat.st_ctime)
    return mtimes


def scan_py_hashes(paths: List[str]) -> Dict[str, str]:
    """Fallback method: scan file content hashes for change detection.

    Args:
        paths: Files and/or directories to scan.

    Returns:
        ``{file_path: md5_hexdigest_of_contents}`` for every file that could
        be read. Files that raise ``OSError`` are left out. MD5 is used purely
        as a change fingerprint here, not for anything security-related.
    """
    hashes: Dict[str, str] = {}
    for fp in _find_py_files(paths):
        try:
            with open(fp, "rb") as f:
                content = f.read()
        except OSError:
            # The file might disappear between discovery and read.
            continue
        hashes[fp] = hashlib.md5(content).hexdigest()
    return hashes
|
|
|
|
|
|
def start_process(cmd: List[str]) -> subprocess.Popen[bytes]:
    """Announce *cmd* on stdout, then launch it as a child process.

    Args:
        cmd: Program and arguments, passed to ``subprocess.Popen`` as a list.

    Returns:
        The ``Popen`` handle of the newly started child.
    """
    command_line = " ".join(cmd)
    print("Starting:", command_line)
    child = subprocess.Popen(cmd)
    return child
|
|
|
|
|
|
def terminate_process(p: subprocess.Popen[bytes], timeout: float = 5.0) -> None:
    """Stop *p*, escalating from ``terminate()`` to ``kill()`` if needed.

    Does nothing if the process has already exited. Otherwise it asks the
    process to terminate and waits up to *timeout* seconds. If the process is
    still running after that, it is killed and then waited on, so the child is
    always reaped and ``p.returncode`` is set on return.

    Args:
        p: The child process to stop.
        timeout: Seconds to wait after ``terminate()`` before killing.

    Errors from the OS while signalling or waiting are reported on stdout
    rather than raised, so shutdown stays best-effort.
    """
    if p.poll() is not None:
        return
    try:
        p.terminate()
        try:
            p.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            p.kill()
            # Reap the killed child; without this it lingers as a zombie and
            # returncode is never populated.
            p.wait()
    except OSError as e:
        print("Error terminating process:", e)
|
|
|
|
|
|
def main() -> int:
    """Parse arguments, start the command, and restart it when sources change.

    The watcher polls the watched paths every ``--interval`` seconds. It
    restarts the child when a ``.py`` file is added, modified (by timestamp,
    or by content hash with ``--use-hash-fallback``) or deleted. If the child
    exits by itself it is restarted, unless ``--no-restart-on-exit`` is given.

    Returns:
        The child's exit code when it exits by itself under
        ``--no-restart-on-exit``, or 0 after a ``KeyboardInterrupt``.
        On SIGINT/SIGTERM the installed handler stops the child and exits the
        process with status 0 via ``sys.exit``.
    """
    parser = argparse.ArgumentParser(description="Restart a command when .py files change")
    parser.add_argument("--watch", "-w", nargs="+", default=["."], help="Directories or files to watch")
    parser.add_argument(
        "--interval", "-i", type=float, default=0.5, help="Polling interval in seconds"
    )
    parser.add_argument("--delay-restart", type=float, default=0.1, help="Delay after change before restarting")
    parser.add_argument("--no-restart-on-exit", action="store_true", help="Don't restart if the process exits on its own")
    parser.add_argument("--pass-sigterm", action="store_true", help="Forward SIGTERM to child and exit when received")
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Enable verbose logging"
    )
    parser.add_argument(
        "--use-hash-fallback",
        action="store_true",
        help="Use content hashing as fallback for Docker environments",
    )
    # Accept the command to run as a positional "remainder" so callers can
    # separate options with `--` and have everything after it treated as the
    # command. Defining an option named "--" doesn't work reliably with
    # argparse; use a positional argument instead.
    parser.add_argument("cmd", nargs=argparse.REMAINDER, help="Command to run (required)")
    args = parser.parse_args()

    # args.cmd is the remainder of the command-line. Users typically call this
    # script like: `reload_runner.py --watch . -- mycmd arg1 arg2`.
    # The remainder list may start with a literal '--'; strip it if present.
    raw_cmd = args.cmd
    if raw_cmd and raw_cmd[0] == "--":
        cmd = raw_cmd[1:]
    else:
        cmd = raw_cmd

    if not cmd:
        parser.error("Missing command to run. Put `--` before the command. See help.")

    # time.sleep() rejects negative durations; fail with a usage error up
    # front instead of a traceback from inside the polling loop.
    if args.interval < 0 or args.delay_restart < 0:
        parser.error("--interval and --delay-restart must be non-negative.")

    watch_paths = args.watch

    last_mtimes = scan_py_mtimes(watch_paths)
    last_hashes = scan_py_hashes(watch_paths) if args.use_hash_fallback else {}

    if args.verbose:
        print(f"Watching {len(last_mtimes)} Python files in paths: {watch_paths}")
        print(f"Working directory: {os.getcwd()}")
        print(f"Resolved watch paths: {[os.path.abspath(p) for p in watch_paths]}")
        print(f"Polling interval: {args.interval}s")
        if args.use_hash_fallback:
            print("Using content hash fallback for change detection")
        print("Sample files being watched:")
        for fp in sorted(last_mtimes.keys())[:5]:
            print(f" {fp}")
        if len(last_mtimes) > 5:
            print(f" ... and {len(last_mtimes) - 5} more")

    child = start_process(cmd)

    def handle_sigterm(signum: int, frame: Optional[FrameType]) -> None:
        # `child` is read from the enclosing scope at call time, so this
        # always acts on the most recently started process.
        if args.pass_sigterm:
            try:
                child.send_signal(signum)
            except Exception:
                # Best-effort forward; terminate_process below still runs.
                pass
        print("Received signal, stopping watcher.")
        try:
            terminate_process(child)
        finally:
            sys.exit(0)

    signal.signal(signal.SIGINT, handle_sigterm)
    signal.signal(signal.SIGTERM, handle_sigterm)

    try:
        while True:
            # One sleep per polling cycle. SIGINT/SIGTERM are handled by
            # handle_sigterm, which exits the process.
            time.sleep(args.interval)

            # If the child exited on its own
            if child.poll() is not None:
                rc = child.returncode
                print(f"Process exited with code {rc}.")
                if args.no_restart_on_exit:
                    return rc
                # else restart immediately and re-baseline the snapshots
                child = start_process(cmd)
                last_mtimes = scan_py_mtimes(watch_paths)
                last_hashes = (
                    scan_py_hashes(watch_paths) if args.use_hash_fallback else {}
                )
                continue

            # Check for source changes
            current = scan_py_mtimes(watch_paths)
            changed = False
            change_reason = ""

            # Check for new or changed files
            for fp, m in current.items():
                if fp not in last_mtimes or last_mtimes.get(fp) != m:
                    print("Detected change in:", fp)
                    if args.verbose:
                        old_mtime = last_mtimes.get(fp, 0)
                        print(f" Old mtime: {old_mtime}, New mtime: {m}")
                    changed = True
                    change_reason = f"mtime change in {fp}"
                    break

            # Hash-based fallback check if mtime didn't detect changes
            if not changed and args.use_hash_fallback:
                current_hashes = scan_py_hashes(watch_paths)
                for fp, h in current_hashes.items():
                    if fp not in last_hashes or last_hashes.get(fp) != h:
                        print("Detected content change in:", fp)
                        if args.verbose:
                            print(
                                f" Hash changed: {last_hashes.get(fp, 'None')} -> {h}"
                            )
                        changed = True
                        change_reason = f"content change in {fp}"
                        break
                # Update hash cache
                last_hashes = current_hashes

            # Check for deleted files. Together with the new/changed scan
            # above this covers every difference in the set of watched files,
            # so no separate file-count comparison is needed.
            if not changed:
                for fp in last_mtimes:
                    if fp not in current:
                        print("Detected deleted file:", fp)
                        changed = True
                        change_reason = f"deleted file {fp}"
                        break

            if changed:
                if args.verbose:
                    print(f"Restarting due to: {change_reason}")
                # Small debounce
                time.sleep(args.delay_restart)
                terminate_process(child)
                child = start_process(cmd)
                # Re-baseline after the restart so edits made during the
                # debounce window do not trigger a second restart.
                last_mtimes = scan_py_mtimes(watch_paths)
                if args.use_hash_fallback:
                    last_hashes = scan_py_hashes(watch_paths)

    except KeyboardInterrupt:
        print("Interrupted, shutting down.")
        terminate_process(child)
        return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|