# watcher.py (Python)
"""
File system watcher using watchdog.
"""
import asyncio
from pathlib import Path
from typing import Callable, List, Set, Optional
import fnmatch
import logging
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import (
FileSystemEventHandler,
FileCreatedEvent,
FileModifiedEvent,
FileDeletedEvent,
FileMovedEvent,
)
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class EventType(Enum):
    """Kinds of change that can be reported for a path.

    Each member's value is the lowercase name of the change.
    """

    # A new entry appeared.
    CREATED = "created"
    # An existing entry changed.
    MODIFIED = "modified"
    # An entry was removed.
    DELETED = "deleted"
    # An entry was moved or renamed.
    MOVED = "moved"
@dataclass
class FileEvent:
    """A single change observed on the file system.

    Hashing considers only the path (as a string) and the event type, so
    events that differ in timestamp, directory flag, or destination hash
    the same.
    """

    # Path the event refers to (the source path for moves).
    path: Path
    # What kind of change happened.
    event_type: EventType
    # When the event object was created.
    timestamp: datetime
    # True when the event concerns a directory.
    is_directory: bool = False
    # Destination path; only populated for move events.
    dest_path: Optional[Path] = None

    def __hash__(self):
        identity = (str(self.path), self.event_type)
        return hash(identity)
class EventHandler(FileSystemEventHandler):
    """Forward watchdog events to an asyncio callback.

    The ``on_*`` hooks build a ``FileEvent`` for every event that passes the
    ignore filter and schedule ``callback(file_event)`` as a task on the
    loop registered with ``set_loop``. Scheduling goes through
    ``call_soon_threadsafe``, so the hooks may be invoked from a thread
    other than the one running the loop.

    Args:
        callback: Called as ``callback(file_event)``; its result is wrapped
            in an asyncio task, so it must return a coroutine.
        ignore_patterns: ``fnmatch`` patterns. A path is ignored when any
            of its components matches any pattern.
    """

    def __init__(self, callback: Callable, ignore_patterns: List[str]):
        super().__init__()
        self.callback = callback
        self.ignore_patterns = ignore_patterns
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        # Strong references to in-flight callback tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._tasks: Set["asyncio.Task"] = set()

    def set_loop(self, loop: asyncio.AbstractEventLoop):
        """Register the event loop on which callbacks are scheduled."""
        self._loop = loop

    def should_ignore(self, path: str) -> bool:
        """Return True when any component of ``path`` matches an ignore pattern.

        Every entry of ``Path(path).parts`` is tested, so for an absolute
        path the directories above the watched tree are tested as well.
        """
        path_obj = Path(path)
        # The file name is normally the last entry of ``parts``; it is listed
        # explicitly so degenerate paths such as "." (empty name, no parts)
        # are still tested against the patterns.
        candidates = (path_obj.name, *path_obj.parts)
        return any(
            fnmatch.fnmatch(candidate, pattern)
            for pattern in self.ignore_patterns
            for candidate in candidates
        )

    def _handle_event(self, event, event_type: EventType, dest_path: Optional[Path] = None):
        """Wrap ``event`` in a ``FileEvent`` and schedule the callback.

        Events whose source path is ignored are dropped. Without a
        registered loop the event is only logged at debug level.
        """
        # NOTE(review): only the source path is checked; a move from an
        # ignored path to a non-ignored one is dropped entirely.
        if self.should_ignore(event.src_path):
            return
        file_event = FileEvent(
            path=Path(event.src_path),
            event_type=event_type,
            timestamp=datetime.now(),
            is_directory=event.is_directory,
            dest_path=dest_path,
        )
        loop = self._loop
        if not loop:
            # Fallback for synchronous handling
            logger.debug(f"Event: {event_type.value} - {event.src_path}")
            return

        def _schedule():
            # Runs on the loop thread: create the task and keep it alive
            # until it completes.
            task = asyncio.create_task(self.callback(file_event))
            self._tasks.add(task)
            task.add_done_callback(self._on_task_done)

        try:
            loop.call_soon_threadsafe(_schedule)
        except RuntimeError:
            # call_soon_threadsafe raises RuntimeError on a closed loop,
            # which can happen while the application is shutting down.
            # Drop the event instead of letting the error escape into the
            # caller's thread.
            logger.warning(
                f"Dropping {event_type.value} event for {event.src_path}: "
                "event loop is closed"
            )

    def _on_task_done(self, task):
        """Release a finished callback task and log its failure, if any."""
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error(f"Event callback failed: {exc}", exc_info=exc)

    def on_created(self, event):
        """Handle creation events; directory events are skipped."""
        if not event.is_directory:
            self._handle_event(event, EventType.CREATED)

    def on_modified(self, event):
        """Handle modification events; directory events are skipped."""
        if not event.is_directory:
            self._handle_event(event, EventType.MODIFIED)

    def on_deleted(self, event):
        """Handle deletion events for both files and directories."""
        self._handle_event(event, EventType.DELETED)

    def on_moved(self, event):
        """Handle move/rename events; directory events are skipped."""
        if not event.is_directory:
            self._handle_event(event, EventType.MOVED, Path(event.dest_path))
class FileWatcher:
"""Watches a directory for file changes."""
def __init__(
self,
watch_dir: str,
callback: Callable,
ignore_patterns: Optional[List[str]] = None,
debounce_seconds: float = 2.0
):
self.watch_dir = Path(watch_dir)
self.callback = callback
self.ignore_patterns = ignore_patterns or []
self.debounce_seconds = debounce_seconds
self._observer = None
self._handler = None
self._pending_events: dict[str, FileEvent] = {}
self._debounce_task = None
self._running = False
async def start(self):
"""Start watching for file changes."""
if self._running:
return
logger.info(f"Starting file watcher on: {self.watch_dir}")
self._handler = EventHandler(
self._on_event,
self.ignore_patterns
)
self._handler.set_loop(asyncio.get_event_loop())
self._observer = Observer()
self._observer.schedule(
self._handler,
str(self.watch_dir),
recursive=True
)
self._observer.start()
self._running = True
# Start debounce processor
self._debounce_task = asyncio.create_task(self._process_debounced())
async def stop(self):
"""Stop watching for file changes."""
if not self._running:
return
logger.info("Stopping file watcher")
self._running = False
if self._debounce_task:
self._debounce_task.cancel()
try:
await self._debounce_task
except asyncio.CancelledError:
pass
if self._observer:
self._observer.stop()
self._observer.join()
async def _on_event(self, event: FileEvent):
"""Handle incoming file event with debouncing."""
# Use path as key, update event (newer replaces older)
key = str(event.path)
self._pending_events[key] = event
async def _process_debounced(self):
"""Process debounced events periodically."""
while self._running:
await asyncio.sleep(self.debounce_seconds)
if not self._pending_events:
continue
# Get and clear pending events
events = list(self._pending_events.values())
self._pending_events.clear()
# Process each event
for event in events:
try:
await self.callback(event)
except Exception as e:
logger.error(f"Error processing event {event}: {e}")
def get_all_files(self) -> List[Path]:
"""Get all files in watch directory (excluding ignored)."""
files = []
for path in self.watch_dir.rglob("*"):
if path.is_file():
# Check ignore patterns
rel_path = str(path.relative_to(self.watch_dir))
should_ignore = False
for pattern in self.ignore_patterns:
if fnmatch.fnmatch(path.name, pattern):
should_ignore = True
break
for part in Path(rel_path).parts:
if fnmatch.fnmatch(part, pattern):
should_ignore = True
break
if not should_ignore:
files.append(path)
return files