From a0cb23df287d319d51f0cafe790b38731aca3006 Mon Sep 17 00:00:00 2001
From: James Ketrenos
Date: Tue, 8 Jul 2025 13:17:36 -0700
Subject: [PATCH] Fix race condition with the same item entering queue
 multiple times

---
 src/backend/rag/rag.py | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/src/backend/rag/rag.py b/src/backend/rag/rag.py
index 59f3319..f48a19e 100644
--- a/src/backend/rag/rag.py
+++ b/src/backend/rag/rag.py
@@ -70,6 +70,7 @@ class ChromaDBFileWatcher(FileSystemEventHandler):
         self._umap_model_2d: Optional[umap.UMAP] = None
         self._umap_model_3d: Optional[umap.UMAP] = None
         self.md = MarkItDown(enable_plugins=False)  # Set to True to enable plugins
+        self.processing_lock = asyncio.Lock()
 
         # self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
 
@@ -220,29 +221,26 @@ class ChromaDBFileWatcher(FileSystemEventHandler):
     async def process_file_update(self, file_path):
         """Process a file update event."""
-        # Skip if already being processed
-        if file_path in self.processing_files:
-            logging.info(f"{file_path} already in queue. Not adding.")
-            return
-        # if file_path == defines.resume_doc:
-        #     logging.info(f"Not adding {file_path} to RAG -- primary resume")
-        #     return
+        # Use a lock to make the check-and-add atomic
+        async with self.processing_lock:
+            if file_path in self.processing_files:
+                logging.info(f"{file_path} already in queue. Not adding.")
+                return
 
-        try:
             logging.info(f"{file_path} not in queue. Adding.")
             self.processing_files.add(file_path)
 
+        try:
             # Wait a moment to ensure the file write is complete
             await asyncio.sleep(0.5)
 
             # Check if content changed via hash
             current_hash = self._get_file_hash(file_path)
-            if not current_hash:  # File might have been deleted or is inaccessible
+            if not current_hash:
                 return
 
             if file_path in self.file_hashes and self.file_hashes[file_path] == current_hash:
-                # File hasn't actually changed in content
                 logging.info(f"Hash has not changed for {file_path}")
                 return
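
Note (not part of the patch): a minimal, self-contained sketch of the lock-guarded check-and-add pattern this change introduces. The FileQueue class, enqueue() method, sample path, and the asyncio.sleep(0) suspension point are hypothetical, for illustration only; they are not taken from rag.py.

    import asyncio
    import logging

    class FileQueue:
        """Hypothetical stand-in for the watcher's processing_files bookkeeping."""

        def __init__(self):
            self.processing_files = set()
            self.processing_lock = asyncio.Lock()

        async def enqueue(self, file_path):
            # Holding the lock makes the membership check and the add a single
            # atomic step, so a concurrent caller suspended at an await cannot
            # slip past the check and enqueue the same path twice.
            async with self.processing_lock:
                if file_path in self.processing_files:
                    logging.info(f"{file_path} already in queue. Not adding.")
                    return False
                # Simulated suspension point between check and add (e.g. an awaited call).
                await asyncio.sleep(0)
                self.processing_files.add(file_path)
                return True

    async def main():
        queue = FileQueue()
        # Two tasks race to enqueue the same path; only one wins.
        results = await asyncio.gather(queue.enqueue("doc.md"), queue.enqueue("doc.md"))
        print(results)  # [True, False]

    asyncio.run(main())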