Fix race condition with the same item entering the queue multiple times
commit a0cb23df28
parent d66e1ee1e4
@@ -70,6 +70,7 @@ class ChromaDBFileWatcher(FileSystemEventHandler):
         self._umap_model_2d: Optional[umap.UMAP] = None
         self._umap_model_3d: Optional[umap.UMAP] = None
         self.md = MarkItDown(enable_plugins=False) # Set to True to enable plugins
+        self.processing_lock = asyncio.Lock()

         # self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

@@ -220,29 +221,26 @@ class ChromaDBFileWatcher(FileSystemEventHandler):

     async def process_file_update(self, file_path):
         """Process a file update event."""
-        # Skip if already being processed
-        if file_path in self.processing_files:
-            logging.info(f"{file_path} already in queue. Not adding.")
-            return
-
         # if file_path == defines.resume_doc:
         #     logging.info(f"Not adding {file_path} to RAG -- primary resume")
         #     return
+        # Use a lock to make the check-and-add atomic
+        async with self.processing_lock:
+            if file_path in self.processing_files:
+                logging.info(f"{file_path} already in queue. Not adding.")
+                return

-        try:
             logging.info(f"{file_path} not in queue. Adding.")
             self.processing_files.add(file_path)

+        try:
             # Wait a moment to ensure the file write is complete
             await asyncio.sleep(0.5)

             # Check if content changed via hash
             current_hash = self._get_file_hash(file_path)
-            if not current_hash: # File might have been deleted or is inaccessible
+            if not current_hash:
                 return

             if file_path in self.file_hashes and self.file_hashes[file_path] == current_hash:
                 # File hasn't actually changed in content
                 logging.info(f"Hash has not changed for {file_path}")
                 return
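For context, a minimal standalone sketch of the race this commit closes and of the check-and-add-under-lock pattern it adopts. The FakeWatcher class and its method names below are illustrative only, not code from this repository; the point is that any awaited step between the membership test and the add() lets two events for the same path both pass the test, while holding an asyncio.Lock across that window lets only one of them through.

# Illustrative sketch (hypothetical names): demonstrates the duplicate-enqueue
# race and the lock-based fix.
import asyncio

class FakeWatcher:
    def __init__(self):
        self.processing_files = set()
        self.processing_lock = asyncio.Lock()

    async def enqueue_unsafe(self, path):
        # Racy: both tasks can pass the check before either add() runs.
        if path in self.processing_files:
            return False
        await asyncio.sleep(0)  # yield to the event loop, as the real handler awaits here
        self.processing_files.add(path)
        return True

    async def enqueue_locked(self, path):
        # Atomic check-and-add: only one task wins for a given path.
        async with self.processing_lock:
            if path in self.processing_files:
                return False
            await asyncio.sleep(0)
            self.processing_files.add(path)
            return True

async def main():
    w = FakeWatcher()
    print(await asyncio.gather(w.enqueue_unsafe("a.md"), w.enqueue_unsafe("a.md")))
    # -> [True, True]: both tasks queued the same file
    w.processing_files.clear()
    print(await asyncio.gather(w.enqueue_locked("a.md"), w.enqueue_locked("a.md")))
    # -> [True, False]: the second task sees the entry and backs off

asyncio.run(main())

Note that in the patched method the lock guards only the membership check and the add(); the settle delay and the hash comparison still run outside it, so updates to different files are not serialized against each other.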