# watcher.py
# Python example: real world projects / file sync / sync
"""
File system watcher using watchdog.
"""

import asyncio
from pathlib import Path
from typing import Callable, List, Set, Optional
import fnmatch
import logging
from dataclasses import dataclass
from enum import Enum
from datetime import datetime

from watchdog.observers import Observer
from watchdog.events import (
    FileSystemEventHandler,
    FileCreatedEvent,
    FileModifiedEvent,
    FileDeletedEvent,
    FileMovedEvent,
)

logger = logging.getLogger(__name__)


class EventType(Enum):
    """Kinds of change that can be reported for a path.

    Each member's value is the lowercase name of the change.
    """

    # A path came into existence.
    CREATED = "created"
    # The contents of an existing path changed.
    MODIFIED = "modified"
    # A path was removed.
    DELETED = "deleted"
    # A path was renamed or relocated.
    MOVED = "moved"


@dataclass
class FileEvent:
    """One observed change to a path on the file system.

    Hashing looks only at the path (as a string) and the event type; the
    dataclass-generated equality still compares every field.
    """
    # Path the event refers to (the source path for moves).
    path: Path
    # What happened to the path.
    event_type: EventType
    # When the event object was created.
    timestamp: datetime
    # True when the path is a directory rather than a file.
    is_directory: bool = False
    # Destination of a move; None for every other event type.
    dest_path: Optional[Path] = None

    def __hash__(self):
        identity = (str(self.path), self.event_type)
        return hash(identity)


class EventHandler(FileSystemEventHandler):
    """Turn watchdog notifications into ``FileEvent`` objects for an async callback.

    The ``on_*`` hooks drop ignored paths, build a ``FileEvent`` and hand it
    to ``callback`` on the asyncio loop registered with ``set_loop``.  The
    hand-off uses asyncio's thread-safe scheduling API, so the hooks may be
    invoked from a thread other than the one running the loop.

    Directory events are reported for deletions only; directory creations,
    modifications and moves are skipped.
    """

    def __init__(self, callback: Callable, ignore_patterns: List[str]):
        """Create a handler.

        Args:
            callback: Called with one ``FileEvent``; its result is awaited
                on the registered loop.
            ignore_patterns: ``fnmatch``-style patterns; a path is ignored
                when any of its components matches any pattern.
        """
        super().__init__()
        self.callback = callback
        self.ignore_patterns = ignore_patterns
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def set_loop(self, loop: asyncio.AbstractEventLoop):
        """Set the asyncio event loop that will run the callback."""
        self._loop = loop

    def should_ignore(self, path: str) -> bool:
        """Return True when *path* matches one of the ignore patterns.

        The file name and every component of *path* are tested against each
        pattern.  NOTE(review): the path is matched exactly as given, so for
        an absolute path a pattern can also match a directory above the
        watched root.
        """
        path_obj = Path(path)
        # The name is normally also the last part; both are kept so the
        # result is identical for degenerate paths such as "" or "/".
        candidates = {path_obj.name, *path_obj.parts}
        return any(
            fnmatch.fnmatch(candidate, pattern)
            for pattern in self.ignore_patterns
            for candidate in candidates
        )

    def _handle_event(self, event, event_type: EventType, dest_path: Optional[Path] = None):
        """Report *event* as *event_type* unless its source path is ignored.

        Args:
            event: Object exposing ``src_path`` and ``is_directory``.
            event_type: Kind of change to report.
            dest_path: Destination path, for move events.
        """
        if self.should_ignore(event.src_path):
            return

        self._emit(
            Path(event.src_path),
            event_type,
            event.is_directory,
            dest_path,
        )

    def _emit(
        self,
        path: Path,
        event_type: EventType,
        is_directory: bool,
        dest_path: Optional[Path] = None,
    ) -> None:
        """Build a ``FileEvent`` and schedule the callback for it."""
        file_event = FileEvent(
            path=path,
            event_type=event_type,
            timestamp=datetime.now(),
            is_directory=is_directory,
            dest_path=dest_path,
        )

        if self._loop is None:
            # No loop registered: nothing can run the callback, only log.
            logger.debug("Event: %s - %s", event_type.value, path)
            return

        coro = self._invoke(file_event)
        try:
            # run_coroutine_threadsafe is the supported way to submit a
            # coroutine from another thread; the returned future keeps the
            # work referenced until it finishes.
            future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        except RuntimeError:
            # The loop has been closed; avoid raising in the caller's thread
            # and avoid a "coroutine was never awaited" warning.
            coro.close()
            logger.warning(
                "Dropping %s event for %s: event loop is closed",
                event_type.value,
                path,
            )
            return
        future.add_done_callback(self._log_callback_failure)

    async def _invoke(self, file_event: FileEvent) -> None:
        """Call the callback on the loop thread and await its result."""
        await self.callback(file_event)

    @staticmethod
    def _log_callback_failure(future) -> None:
        """Log an exception raised by the callback instead of losing it."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("File event callback failed: %s", exc, exc_info=exc)

    def on_created(self, event):
        """Handle file creation (directories are skipped)."""
        if not event.is_directory:
            self._handle_event(event, EventType.CREATED)

    def on_modified(self, event):
        """Handle file modification (directories are skipped)."""
        if not event.is_directory:
            self._handle_event(event, EventType.MODIFIED)

    def on_deleted(self, event):
        """Handle deletion of a file or directory."""
        self._handle_event(event, EventType.DELETED)

    def on_moved(self, event):
        """Handle file move/rename (directories are skipped).

        Both ends of the move are checked against the ignore patterns so
        that ignored paths stay invisible:

        * neither end ignored: reported as MOVED with ``dest_path`` set;
        * only the source ignored (e.g. a temp file renamed into place):
          reported as CREATED at the destination;
        * only the destination ignored: reported as DELETED at the source;
        * both ignored: nothing is reported.
        """
        if event.is_directory:
            return

        src_ignored = self.should_ignore(event.src_path)
        dest_ignored = self.should_ignore(event.dest_path)

        if src_ignored and dest_ignored:
            return
        if src_ignored:
            self._emit(Path(event.dest_path), EventType.CREATED, False)
        elif dest_ignored:
            self._emit(Path(event.src_path), EventType.DELETED, False)
        else:
            self._emit(
                Path(event.src_path),
                EventType.MOVED,
                False,
                Path(event.dest_path),
            )


class FileWatcher:
    """Watch a directory tree and deliver coalesced change events to a callback.

    Events are collected per path; every ``debounce_seconds`` the latest
    event recorded for each path is passed to ``callback``.
    """

    def __init__(
        self,
        watch_dir: str,
        callback: Callable,
        ignore_patterns: Optional[List[str]] = None,
        debounce_seconds: float = 2.0
    ):
        """Configure the watcher; nothing is started until ``start()``.

        Args:
            watch_dir: Directory to watch recursively.
            callback: Awaited with one ``FileEvent`` per pending path.
            ignore_patterns: ``fnmatch``-style patterns for paths to skip;
                defaults to no patterns.
            debounce_seconds: Interval between flushes of pending events.
        """
        self.watch_dir = Path(watch_dir)
        self.callback = callback
        self.ignore_patterns = ignore_patterns or []
        self.debounce_seconds = debounce_seconds

        self._observer = None
        self._handler = None
        # Latest event per path, keyed by str(path).
        self._pending_events: dict[str, FileEvent] = {}
        self._debounce_task = None
        self._running = False

    async def start(self):
        """Start watching for file changes; a no-op if already running."""
        if self._running:
            return

        logger.info("Starting file watcher on: %s", self.watch_dir)

        self._handler = EventHandler(
            self._on_event,
            self.ignore_patterns
        )
        # start() is a coroutine, so a loop is always running here.
        self._handler.set_loop(asyncio.get_running_loop())

        self._observer = Observer()
        self._observer.schedule(
            self._handler,
            str(self.watch_dir),
            recursive=True
        )

        self._observer.start()
        self._running = True

        # Start debounce processor
        self._debounce_task = asyncio.create_task(self._process_debounced())

    async def stop(self):
        """Stop watching for file changes; a no-op if not running.

        Events still pending when this is called are not delivered.
        """
        if not self._running:
            return

        logger.info("Stopping file watcher")

        self._running = False

        if self._debounce_task:
            self._debounce_task.cancel()
            try:
                await self._debounce_task
            except asyncio.CancelledError:
                pass

        if self._observer:
            self._observer.stop()
            # join() blocks until the observer thread exits; run it in the
            # default executor so the event loop is not stalled meanwhile.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self._observer.join)

    async def _on_event(self, event: "FileEvent"):
        """Record *event* as the pending event for its path.

        A newer event for the same path replaces the older one.
        """
        self._pending_events[str(event.path)] = event

    async def _process_debounced(self):
        """Flush pending events to the callback once per debounce interval."""
        while self._running:
            await asyncio.sleep(self.debounce_seconds)

            if not self._pending_events:
                continue

            # Snapshot and clear first so events arriving while the
            # callback is awaited land in the next batch.
            events = list(self._pending_events.values())
            self._pending_events.clear()

            for event in events:
                try:
                    await self.callback(event)
                except Exception as e:
                    # One failing event must not stop the remaining ones.
                    logger.error(
                        "Error processing event %s: %s", event, e, exc_info=True
                    )

    def _is_ignored(self, relative: Path) -> bool:
        """Return True when any component of *relative* matches a pattern."""
        candidates = {relative.name, *relative.parts}
        return any(
            fnmatch.fnmatch(candidate, pattern)
            for pattern in self.ignore_patterns
            for candidate in candidates
        )

    def get_all_files(self) -> List[Path]:
        """Get all files in watch directory (excluding ignored).

        Ignore patterns are matched against the components of each path
        relative to ``watch_dir``.

        Returns:
            Paths of the regular files found by a recursive scan.
        """
        return [
            path
            for path in self.watch_dir.rglob("*")
            if path.is_file()
            and not self._is_ignored(path.relative_to(self.watch_dir))
        ]
# (page navigation: Previous / Next)