ai-voicebot/voicebot/scripts/reload_runner.py
2025-09-01 15:59:16 -07:00

288 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Simple file-watcher that restarts a command when Python source files change.
Usage:
python scripts/reload_runner.py --watch voicebot -- python voicebot/main.py
This is intentionally dependency-free so it works in minimal dev environments
and inside containers without installing extra packages.
"""
from __future__ import annotations
import argparse
import hashlib
import os
import signal
import subprocess
import sys
import time
from typing import Dict, List, Optional
from types import FrameType
def scan_py_mtimes(paths: List[str]) -> Dict[str, float]:
    """Collect a change timestamp for every ``.py`` file under *paths*.

    Each entry of *paths* is either a single ``.py`` file, which is recorded
    directly, or something handed to ``os.walk`` and searched recursively.
    Directories named in the ignore set below are pruned from the walk.

    Returns:
        A mapping of file path (as given, or as joined during the walk) to
        ``max(st_mtime, st_ctime)`` for that file. Files that cannot be
        stat'ed are left out.
    """
    # Directory names that never contain sources worth reloading for.
    ignored = frozenset(
        (
            ".venv",
            "__pycache__",
            ".git",
            "node_modules",
            ".mypy_cache",
            ".pytest_cache",
            "build",
            "dist",
        )
    )
    stamps: Dict[str, float] = {}

    def record(path: str) -> None:
        # A file may vanish between discovery and stat; just skip it.
        try:
            info = os.stat(path)
        except OSError:
            return
        # Take the later of mtime and ctime to catch more changes in
        # Docker environments.
        stamps[path] = max(info.st_mtime, info.st_ctime)

    for target in paths:
        if target.endswith('.py') and os.path.isfile(target):
            record(target)
            continue
        for dirpath, subdirs, filenames in os.walk(target):
            # In-place assignment so os.walk does not descend into ignored dirs.
            subdirs[:] = [name for name in subdirs if name not in ignored]
            for filename in filenames:
                if filename.endswith('.py'):
                    record(os.path.join(dirpath, filename))
    return stamps
def scan_py_hashes(paths: List[str]) -> Dict[str, str]:
    """Fallback change detection: map every ``.py`` file to a content hash.

    Each entry of *paths* is either a single ``.py`` file, which is hashed
    directly, or something handed to ``os.walk`` and searched recursively.
    Directories named in the ignore set below are pruned from the walk.

    Returns:
        A mapping of file path to the hexadecimal MD5 digest of the file's
        bytes. Files that cannot be opened or read are left out.
    """
    # Directory names that never contain sources worth reloading for.
    ignored = frozenset(
        (
            ".venv",
            "__pycache__",
            ".git",
            "node_modules",
            ".mypy_cache",
            ".pytest_cache",
            "build",
            "dist",
        )
    )
    digests: Dict[str, str] = {}

    def record(path: str) -> None:
        # A file may vanish between discovery and read; just skip it.
        try:
            with open(path, "rb") as handle:
                payload = handle.read()
        except OSError:
            return
        digests[path] = hashlib.md5(payload).hexdigest()

    for target in paths:
        if target.endswith(".py") and os.path.isfile(target):
            record(target)
            continue
        for dirpath, subdirs, filenames in os.walk(target):
            # In-place assignment so os.walk does not descend into ignored dirs.
            subdirs[:] = [name for name in subdirs if name not in ignored]
            for filename in filenames:
                if filename.endswith(".py"):
                    record(os.path.join(dirpath, filename))
    return digests
def start_process(cmd: List[str]) -> subprocess.Popen[bytes]:
    """Announce and launch *cmd* as a child process.

    Args:
        cmd: Program and arguments, passed to ``subprocess.Popen`` as a list
            (no shell involved).

    Returns:
        The ``Popen`` handle of the newly started child.
    """
    command_line = " ".join(cmd)
    print("Starting:", command_line)
    child = subprocess.Popen(cmd)
    return child
def terminate_process(p: subprocess.Popen[bytes], timeout: float = 5.0) -> None:
    """Stop child process *p*, escalating from terminate to kill.

    The child is first asked to stop with ``p.terminate()``. If it has not
    exited within *timeout* seconds it is killed, and the kill is then waited
    on so the child is reaped and ``p.returncode`` is set before this
    function returns. That keeps a replacement process from being started
    while the old one is still alive.

    Args:
        p: Handle of the child to stop. Nothing happens if it already exited.
        timeout: Seconds to wait after ``terminate()`` before killing.

    Errors raised while signalling are reported on stdout rather than
    propagated, so shutdown stays best-effort.
    """
    if p.poll() is not None:
        return
    try:
        p.terminate()
        try:
            p.wait(timeout=timeout)
            return
        except subprocess.TimeoutExpired:
            # Child ignored or is slow to honour the terminate request.
            p.kill()
        try:
            # Bounded wait so an unkillable child cannot hang the watcher;
            # at least one second even when the caller passed a tiny timeout.
            p.wait(timeout=max(timeout, 1.0))
        except subprocess.TimeoutExpired:
            print("Error terminating process: still running after kill")
    except Exception as e:
        print("Error terminating process:", e)
def main() -> int:
    """Run a command and restart it whenever watched ``.py`` files change.

    Command-line arguments are read from ``sys.argv``. The command to run is
    everything after the options (conventionally separated by ``--``). The
    watched paths are polled every ``--interval`` seconds; a new, modified or
    deleted ``.py`` file terminates the child and starts it again. With
    ``--use-hash-fallback`` file contents are hashed as well, for cases where
    timestamps show no change.

    SIGINT and SIGTERM are handled by stopping the child and exiting the
    watcher with status 0 via ``sys.exit``.

    Returns:
        The child's return code when it exits on its own and
        ``--no-restart-on-exit`` is set, or 0 after a ``KeyboardInterrupt``.

    Raises:
        SystemExit: With status 2 (via ``parser.error``) when no command is
            given or when ``--interval`` / ``--delay-restart`` is negative.
    """
    parser = argparse.ArgumentParser(description="Restart a command when .py files change")
    parser.add_argument("--watch", "-w", nargs="+", default=["."], help="Directories or files to watch")
    parser.add_argument(
        "--interval", "-i", type=float, default=0.5, help="Polling interval in seconds"
    )
    parser.add_argument("--delay-restart", type=float, default=0.1, help="Delay after change before restarting")
    parser.add_argument("--no-restart-on-exit", action="store_true", help="Don't restart if the process exits on its own")
    parser.add_argument("--pass-sigterm", action="store_true", help="Forward SIGTERM to child and exit when received")
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Enable verbose logging"
    )
    parser.add_argument(
        "--use-hash-fallback",
        action="store_true",
        help="Use content hashing as fallback for Docker environments",
    )

    # Accept the command to run as a positional "remainder" so callers can
    # separate options with `--` and have everything after it treated as the
    # command. Defining an option named "--" doesn't work reliably with
    # argparse; use a positional argument instead.
    parser.add_argument("cmd", nargs=argparse.REMAINDER, help="Command to run (required)")

    args = parser.parse_args()

    # args.cmd is the remainder of the command-line. Users typically call this
    # script like: `reload_runner.py --watch . -- mycmd arg1 arg2`.
    # argparse may include a literal leading '--' in the remainder list, so
    # strip it if present.
    raw_cmd = args.cmd
    if raw_cmd and raw_cmd[0] == "--":
        cmd = raw_cmd[1:]
    else:
        cmd = raw_cmd

    if not cmd:
        parser.error("Missing command to run. Put `--` before the command. See help.")

    # Reject durations time.sleep() would refuse *before* the child exists;
    # otherwise the ValueError would escape the loop and orphan the child.
    # `not x >= 0` (rather than `x < 0`) also rejects NaN.
    if not args.interval >= 0:
        parser.error("--interval must not be negative")
    if not args.delay_restart >= 0:
        parser.error("--delay-restart must not be negative")

    watch_paths = args.watch
    last_mtimes = scan_py_mtimes(watch_paths)
    last_hashes = scan_py_hashes(watch_paths) if args.use_hash_fallback else {}

    if args.verbose:
        print(f"Watching {len(last_mtimes)} Python files in paths: {watch_paths}")
        print(f"Working directory: {os.getcwd()}")
        print(f"Resolved watch paths: {[os.path.abspath(p) for p in watch_paths]}")
        print(f"Polling interval: {args.interval}s")
        if args.use_hash_fallback:
            print("Using content hash fallback for change detection")
        print("Sample files being watched:")
        for fp in sorted(last_mtimes.keys())[:5]:
            print(f"  {fp}")
        if len(last_mtimes) > 5:
            print(f"  ... and {len(last_mtimes) - 5} more")

    child = start_process(cmd)

    def handle_sigterm(signum: int, frame: Optional[FrameType]) -> None:
        # `child` is read from the enclosing scope at call time, so this
        # always acts on the most recently started process.
        if args.pass_sigterm:
            try:
                child.send_signal(signum)
            except Exception:
                # Best-effort forward; the child is terminated below anyway.
                pass

        print("Received signal, stopping watcher.")
        try:
            terminate_process(child)
        finally:
            sys.exit(0)

    signal.signal(signal.SIGINT, handle_sigterm)
    signal.signal(signal.SIGTERM, handle_sigterm)

    try:
        while True:
            # Sleep in small increments so Ctrl-C is responsive
            time.sleep(args.interval)

            # If the child exited on its own
            if child.poll() is not None:
                rc = child.returncode
                print(f"Process exited with code {rc}.")
                if args.no_restart_on_exit:
                    return rc
                # else restart immediately and take a fresh baseline
                child = start_process(cmd)
                last_mtimes = scan_py_mtimes(watch_paths)
                last_hashes = (
                    scan_py_hashes(watch_paths) if args.use_hash_fallback else {}
                )
                continue

            # Check for source changes
            current = scan_py_mtimes(watch_paths)
            changed = False
            change_reason = ""

            # New or changed files. A missing key yields None from .get(),
            # which never equals a real timestamp, so one comparison covers
            # both "new" and "modified".
            for fp, m in current.items():
                if last_mtimes.get(fp) != m:
                    print("Detected change in:", fp)
                    if args.verbose:
                        old_mtime = last_mtimes.get(fp, 0)
                        print(f"  Old mtime: {old_mtime}, New mtime: {m}")
                    changed = True
                    change_reason = f"mtime change in {fp}"
                    break

            # Hash-based fallback check if mtime didn't detect changes
            if not changed and args.use_hash_fallback:
                current_hashes = scan_py_hashes(watch_paths)
                for fp, h in current_hashes.items():
                    if last_hashes.get(fp) != h:
                        print("Detected content change in:", fp)
                        if args.verbose:
                            print(
                                f"  Hash changed: {last_hashes.get(fp, 'None')} -> {h}"
                            )
                        changed = True
                        change_reason = f"content change in {fp}"
                        break
                # Update hash cache
                last_hashes = current_hashes

            # Deleted files. Together with the new-file check above this
            # covers every difference in the set of watched files, so no
            # separate file-count comparison is needed.
            if not changed:
                for fp in last_mtimes:
                    if fp not in current:
                        print("Detected deleted file:", fp)
                        changed = True
                        change_reason = f"deleted file {fp}"
                        break

            if changed:
                if args.verbose:
                    print(f"Restarting due to: {change_reason}")
                # Small debounce
                time.sleep(args.delay_restart)
                terminate_process(child)
                child = start_process(cmd)
                # Re-baseline after the restart so edits made during the
                # debounce do not trigger a second restart.
                last_mtimes = scan_py_mtimes(watch_paths)
                if args.use_hash_fallback:
                    last_hashes = scan_py_hashes(watch_paths)

    except KeyboardInterrupt:
        print("Interrupted, shutting down.")
        terminate_process(child)
        return 0
# Script entry point: run the watcher and exit with the status main() returns.
if __name__ == "__main__":
    sys.exit(main())