#!/usr/bin/env python3
"""
Simple file-watcher that restarts a command when Python source files change.

Usage:
    python scripts/reload_runner.py --watch voicebot -- python voicebot/main.py

This is intentionally dependency-free so it works in minimal dev environments
and inside containers without installing extra packages.
"""

from __future__ import annotations

import argparse
import hashlib
import os
import signal
import subprocess
import sys
import time
from typing import Dict, Iterator, List, Optional, Tuple
from types import FrameType

# Directory names that are pruned from the walk; changes inside them never
# trigger a reload.
SKIP_DIRS = frozenset(
    {
        ".venv",
        "__pycache__",
        ".git",
        "node_modules",
        ".mypy_cache",
        ".pytest_cache",
        "build",
        "dist",
    }
)


def _iter_py_files(paths: List[str]) -> Iterator[str]:
    """Yield the path of every ``.py`` file reachable from *paths*.

    Each entry of *paths* may be a single file or a directory. Directories are
    walked recursively, skipping any directory whose name is in ``SKIP_DIRS``.
    Entries that do not exist, or files that are not ``.py``, yield nothing.
    """
    for p in paths:
        if os.path.isfile(p):
            if p.endswith(".py"):
                yield p
            continue
        for root, dirs, files in os.walk(p):
            # Prune in place so os.walk does not descend into skipped dirs.
            dirs[:] = [d for d in dirs if d not in SKIP_DIRS]
            for name in files:
                if name.endswith(".py"):
                    yield os.path.join(root, name)


def scan_py_mtimes(paths: List[str]) -> Dict[str, float]:
    """Return a mapping of ``.py`` file path -> last-change timestamp.

    The timestamp is ``max(st_mtime, st_ctime)``; using both catches more
    changes in Docker environments. Files that cannot be stat'ed are omitted.
    """
    mtimes: Dict[str, float] = {}
    for fp in _iter_py_files(paths):
        try:
            st = os.stat(fp)
        except OSError:
            # file might disappear between walk and stat
            continue
        mtimes[fp] = max(st.st_mtime, st.st_ctime)
    return mtimes


def scan_py_hashes(paths: List[str]) -> Dict[str, str]:
    """Fallback method: scan file content hashes for change detection.

    Returns a mapping of ``.py`` file path -> hex MD5 digest of the file's
    bytes. The digest is used only to notice content changes. Files that
    cannot be read are omitted.
    """
    hashes: Dict[str, str] = {}
    for fp in _iter_py_files(paths):
        try:
            with open(fp, "rb") as fh:
                content = fh.read()
        except OSError:
            # file might disappear between walk and read
            continue
        hashes[fp] = hashlib.md5(content).hexdigest()
    return hashes


def start_process(cmd: List[str]) -> subprocess.Popen[bytes]:
    """Announce and launch *cmd* as a child process, returning its handle."""
    print("Starting:", " ".join(cmd))
    return subprocess.Popen(cmd)


def terminate_process(p: subprocess.Popen[bytes], timeout: float = 5.0) -> None:
    """Stop child *p*, escalating from terminate() to kill().

    Does nothing if the process has already exited. Otherwise asks it to
    terminate and waits up to *timeout* seconds; if it is still running it is
    killed and then waited on so the child is reaped and ``returncode`` is
    set. OS-level errors are reported on stdout rather than raised.
    """
    if p.poll() is not None:
        return
    try:
        p.terminate()
        try:
            p.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            p.kill()
            # Reap the killed child so it does not linger as a zombie.
            p.wait()
    except OSError as e:
        print("Error terminating process:", e)


def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the watcher."""
    parser = argparse.ArgumentParser(description="Restart a command when .py files change")
    parser.add_argument("--watch", "-w", nargs="+", default=["."], help="Directories or files to watch")
    parser.add_argument(
        "--interval", "-i", type=float, default=0.5, help="Polling interval in seconds"
    )
    parser.add_argument("--delay-restart", type=float, default=0.1, help="Delay after change before restarting")
    parser.add_argument("--no-restart-on-exit", action="store_true", help="Don't restart if the process exits on its own")
    parser.add_argument("--pass-sigterm", action="store_true", help="Forward SIGTERM to child and exit when received")
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Enable verbose logging"
    )
    parser.add_argument(
        "--use-hash-fallback",
        action="store_true",
        help="Use content hashing as fallback for Docker environments",
    )
    # Accept the command to run as a positional "remainder" so callers can
    # separate options with `--` and have everything after it treated as the
    # command. Defining an option named "--" doesn't work reliably with
    # argparse; use a positional argument instead.
    parser.add_argument("cmd", nargs=argparse.REMAINDER, help="Command to run (required)")
    return parser


def _snapshot(paths: List[str], use_hashes: bool) -> Tuple[Dict[str, float], Dict[str, str]]:
    """Return ``(mtimes, hashes)`` for *paths*; hashes is empty unless requested."""
    mtimes = scan_py_mtimes(paths)
    hashes = scan_py_hashes(paths) if use_hashes else {}
    return mtimes, hashes


def _find_mtime_change(last: Dict[str, float], current: Dict[str, float], verbose: bool) -> str:
    """Report the first new or re-stamped file; return the reason or ``""``."""
    for fp, m in current.items():
        # .get() returns None for a new file, which never equals a timestamp.
        if last.get(fp) != m:
            print("Detected change in:", fp)
            if verbose:
                old_mtime = last.get(fp, 0)
                print(f" Old mtime: {old_mtime}, New mtime: {m}")
            return f"mtime change in {fp}"
    return ""


def _find_hash_change(last: Dict[str, str], current: Dict[str, str], verbose: bool) -> str:
    """Report the first file whose content hash differs; return the reason or ``""``."""
    for fp, h in current.items():
        if last.get(fp) != h:
            print("Detected content change in:", fp)
            if verbose:
                print(f" Hash changed: {last.get(fp, 'None')} -> {h}")
            return f"content change in {fp}"
    return ""


def _find_deleted(last: Dict[str, float], current: Dict[str, float]) -> str:
    """Report the first previously-seen file now missing; return the reason or ``""``."""
    for fp in last:
        if fp not in current:
            print("Detected deleted file:", fp)
            return f"deleted file {fp}"
    return ""


def main() -> int:
    """Parse arguments, run the command, and restart it when sources change.

    Returns the process exit status: the child's return code when
    ``--no-restart-on-exit`` is set and the child exits on its own, or 0 when
    interrupted. SIGINT/SIGTERM stop the child and exit with status 0.
    """
    parser = _build_parser()
    args = parser.parse_args()

    # args.cmd is the remainder of the command-line. Users typically call this
    # script like: `reload_runner.py --watch . -- mycmd arg1 arg2`.
    # argparse will include a literal leading '--' in the remainder list, so
    # strip it if present.
    raw_cmd = args.cmd
    cmd = raw_cmd[1:] if raw_cmd and raw_cmd[0] == "--" else raw_cmd
    if not cmd:
        parser.error("Missing command to run. Put `--` before the command. See help.")

    watch_paths = args.watch
    last_mtimes, last_hashes = _snapshot(watch_paths, args.use_hash_fallback)

    if args.verbose:
        print(f"Watching {len(last_mtimes)} Python files in paths: {watch_paths}")
        print(f"Working directory: {os.getcwd()}")
        print(f"Resolved watch paths: {[os.path.abspath(p) for p in watch_paths]}")
        print(f"Polling interval: {args.interval}s")
        if args.use_hash_fallback:
            print("Using content hash fallback for change detection")
        print("Sample files being watched:")
        for fp in sorted(last_mtimes)[:5]:
            print(f" {fp}")
        if len(last_mtimes) > 5:
            print(f" ... and {len(last_mtimes) - 5} more")

    child = start_process(cmd)

    def handle_sigterm(signum: int, frame: Optional[FrameType]) -> None:
        # `child` is read from the enclosing scope, so this always targets
        # the most recently started process.
        if args.pass_sigterm:
            try:
                child.send_signal(signum)
            except Exception:
                # Best effort: the child may already be gone.
                pass
        print("Received signal, stopping watcher.")
        try:
            terminate_process(child)
        finally:
            sys.exit(0)

    signal.signal(signal.SIGINT, handle_sigterm)
    signal.signal(signal.SIGTERM, handle_sigterm)

    try:
        while True:
            # One poll per interval; signals are handled by handle_sigterm.
            time.sleep(args.interval)

            # If the child exited on its own
            if child.poll() is not None:
                rc = child.returncode
                print(f"Process exited with code {rc}.")
                if args.no_restart_on_exit:
                    return rc
                # else restart immediately. Snapshot before starting so an
                # edit made while the child boots is still detected.
                last_mtimes, last_hashes = _snapshot(watch_paths, args.use_hash_fallback)
                child = start_process(cmd)
                continue

            # Check for source changes: new or changed files first.
            current = scan_py_mtimes(watch_paths)
            change_reason = _find_mtime_change(last_mtimes, current, args.verbose)

            # Hash-based fallback check if mtime didn't detect changes
            if not change_reason and args.use_hash_fallback:
                current_hashes = scan_py_hashes(watch_paths)
                change_reason = _find_hash_change(last_hashes, current_hashes, args.verbose)
                # Update hash cache
                last_hashes = current_hashes

            # Check for deleted files
            if not change_reason:
                change_reason = _find_deleted(last_mtimes, current)

            if not change_reason:
                continue

            if args.verbose:
                print(f"Restarting due to: {change_reason}")
            # Small debounce
            time.sleep(args.delay_restart)
            terminate_process(child)
            # Take the new baseline before the child starts (see above).
            last_mtimes, last_hashes = _snapshot(watch_paths, args.use_hash_fallback)
            child = start_process(cmd)
    except KeyboardInterrupt:
        print("Interrupted, shutting down.")
        terminate_process(child)
        return 0


if __name__ == "__main__":
    raise SystemExit(main())