# sync_manager.py — real-world projects / file sync
"""
Main sync manager orchestrating all sync operations.
"""

import asyncio
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

from .backends.base import BaseBackend
from .config import settings
from .state import SyncState, FileState
from .utils.compression import compress_file, decompress_file
from .utils.encryption import encrypt_file, decrypt_file
from .watcher import FileWatcher, FileEvent, EventType

logger = logging.getLogger(__name__)


class SyncManager:
    """Manages file synchronization between local and remote.

    Watches ``source_dir`` for filesystem changes, enqueues the resulting
    events, and replays them against ``backend`` (upload / delete / move).
    Files may optionally be compressed and/or encrypted before upload; the
    remote object name gains a ``.gz`` and/or ``.enc`` suffix accordingly
    (see :meth:`_remote_path`).
    """

    def __init__(
        self,
        source_dir: str,
        backend: BaseBackend,
        compression: bool = False,
        encryption: bool = False,
        encryption_key: Optional[str] = None,
        conflict_resolution: str = "keep_both"
    ):
        """Initialize the manager.

        Args:
            source_dir: Local directory to synchronize.
            backend: Storage backend used for upload/download/delete.
            compression: Compress files before upload when True.
            encryption: Encrypt files before upload when True.
            encryption_key: Key passed to the encryption helpers.
            conflict_resolution: Conflict strategy name (stored only;
                resolution itself is not implemented in this class).
        """
        self.source_dir = Path(source_dir)
        self.backend = backend
        self.compression = compression
        self.encryption = encryption
        self.encryption_key = encryption_key
        self.conflict_resolution = conflict_resolution

        self.state = SyncState(source_dir)
        self.watcher = FileWatcher(
            watch_dir=source_dir,
            callback=self._on_file_event,
            ignore_patterns=settings.IGNORE_PATTERNS,
            debounce_seconds=settings.DEBOUNCE_SECONDS
        )

        # Events flow: watcher -> _on_file_event -> queue -> _process_queue.
        self._sync_queue: asyncio.Queue = asyncio.Queue()
        self._sync_task: Optional[asyncio.Task] = None
        self._running = False

    def _remote_path(self, rel_path: str) -> str:
        """Map a local relative path to its remote object name.

        Appends ``.gz`` when compression is enabled and ``.enc`` when
        encryption is enabled (in that order), mirroring the transform
        pipeline applied by :meth:`_handle_upload`.

        NOTE(review): when ``encryption`` is True but ``encryption_key`` is
        unset, uploads skip encryption yet the remote name still gets
        ``.enc`` — confirm callers always supply a key with encryption on.
        """
        remote_path = rel_path
        if self.compression:
            remote_path += ".gz"
        if self.encryption:
            remote_path += ".enc"
        return remote_path

    async def start_watching(self):
        """Start watching for file changes and processing the sync queue.

        Blocks until :meth:`stop_watching` clears the running flag; calling
        it while already running is a no-op.
        """
        if self._running:
            return

        logger.info("Starting sync manager")
        self._running = True

        # Connect the backend before any events can be processed.
        await self.backend.connect()

        # Start the file watcher; events arrive via _on_file_event.
        await self.watcher.start()

        # Start the background queue processor.
        self._sync_task = asyncio.create_task(self._process_queue())

        # Keep running until stopped; the 1s poll bounds shutdown latency.
        while self._running:
            await asyncio.sleep(1)

    async def stop_watching(self):
        """Stop watching, cancel the worker task, and persist state."""
        if not self._running:
            return

        logger.info("Stopping sync manager")
        self._running = False

        await self.watcher.stop()

        if self._sync_task:
            self._sync_task.cancel()
            try:
                await self._sync_task
            except asyncio.CancelledError:
                pass  # expected: we cancelled it ourselves

        await self.backend.disconnect()
        self.state.save()

    async def _on_file_event(self, event: FileEvent):
        """Watcher callback: enqueue the event for the background processor."""
        logger.debug("File event: %s - %s", event.event_type.value, event.path)
        await self._sync_queue.put(event)

    async def _process_queue(self):
        """Consume queued events until the manager is stopped.

        The 1s ``wait_for`` timeout lets the loop re-check ``_running``
        regularly even when the queue is idle.
        """
        while self._running:
            try:
                event = await asyncio.wait_for(
                    self._sync_queue.get(),
                    timeout=1.0
                )
                await self._process_event(event)
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Keep the worker alive; a single bad event must not
                # kill the whole sync loop.
                logger.error("Error processing queue: %s", e)

    async def _process_event(self, event: FileEvent):
        """Dispatch a single file event to the matching handler."""
        try:
            rel_path = str(event.path.relative_to(self.source_dir))

            if event.event_type == EventType.DELETED:
                await self._handle_delete(rel_path)
            elif event.event_type in (EventType.CREATED, EventType.MODIFIED):
                await self._handle_upload(event.path, rel_path)
            elif event.event_type == EventType.MOVED:
                # NOTE(review): assumes dest_path is always set for MOVED
                # events — confirm against the watcher implementation.
                await self._handle_move(
                    rel_path,
                    str(event.dest_path.relative_to(self.source_dir))
                )

            # Persist after every event so a crash loses at most one change.
            self.state.save()
        except Exception as e:
            logger.error("Error processing event %s: %s", event, e)

    async def _handle_upload(self, local_path: Path, rel_path: str):
        """Upload one file, applying the compress-then-encrypt pipeline.

        Args:
            local_path: Absolute path of the local file.
            rel_path: Path relative to ``source_dir``; basis of remote name.
        """
        if not local_path.exists():
            return  # file vanished between event and processing

        logger.info("Uploading: %s", rel_path)

        upload_path = local_path
        temp_files: List[Path] = []

        try:
            # Compress first so encryption operates on the smaller payload.
            if self.compression:
                compressed_path = compress_file(local_path)
                temp_files.append(compressed_path)
                upload_path = compressed_path

            if self.encryption and self.encryption_key:
                encrypted_path = encrypt_file(upload_path, self.encryption_key)
                temp_files.append(encrypted_path)
                upload_path = encrypted_path

            await self.backend.upload(upload_path, self._remote_path(rel_path))

            # Record a timezone-aware sync timestamp
            # (datetime.utcnow() is deprecated since Python 3.12).
            file_state = self.state.create_file_state(local_path)
            file_state.synced_time = datetime.now(timezone.utc).isoformat()
            self.state.update_file_state(file_state)

            logger.info("Uploaded: %s", rel_path)

        finally:
            # Best-effort removal of intermediate compressed/encrypted files.
            for temp_path in temp_files:
                try:
                    temp_path.unlink()
                except OSError:
                    pass

    async def _handle_delete(self, rel_path: str):
        """Delete the remote copy of a file and forget its sync state."""
        logger.info("Deleting from remote: %s", rel_path)

        await self.backend.delete(self._remote_path(rel_path))
        self.state.remove_file_state(rel_path)

        logger.info("Deleted: %s", rel_path)

    async def _handle_move(self, old_path: str, new_path: str):
        """Handle a move/rename as remote delete + re-upload."""
        logger.info("Moving: %s -> %s", old_path, new_path)

        await self._handle_delete(old_path)

        new_local_path = self.source_dir / new_path
        if new_local_path.exists():
            await self._handle_upload(new_local_path, new_path)

    async def sync_all(self, force: bool = False) -> Dict[str, int]:
        """Run a full sync pass over the whole source directory.

        Args:
            force: Upload every file regardless of recorded state.

        Returns:
            Counters: ``uploaded``, ``skipped``, ``deleted``, ``errors``.
        """
        logger.info("Starting full sync")

        result = {
            "uploaded": 0,
            "skipped": 0,
            "deleted": 0,
            "errors": 0
        }

        await self.backend.connect()

        try:
            all_files = self.watcher.get_all_files()

            # Determine which files actually need uploading.
            if force:
                changed_files = all_files
            else:
                changed_files = self.state.get_changed_files(all_files)

            for file_path in changed_files:
                try:
                    rel_path = str(file_path.relative_to(self.source_dir))
                    await self._handle_upload(file_path, rel_path)
                    result["uploaded"] += 1
                except Exception as e:
                    logger.error("Error uploading %s: %s", file_path, e)
                    result["errors"] += 1

            result["skipped"] = len(all_files) - len(changed_files)

            # Propagate local deletions to the remote.
            deleted_files = self.state.get_deleted_files(all_files)
            for rel_path in deleted_files:
                try:
                    await self._handle_delete(rel_path)
                    result["deleted"] += 1
                except Exception as e:
                    logger.error("Error deleting %s: %s", rel_path, e)
                    result["errors"] += 1

            self.state.set_last_sync()
            self.state.save()

        finally:
            await self.backend.disconnect()

        logger.info("Sync complete: %s", result)
        return result

    async def get_pending_changes(self) -> List[Dict[str, Any]]:
        """Return pending uploads/creates/deletes as display-ready dicts."""
        all_files = self.watcher.get_all_files()
        changed_files = self.state.get_changed_files(all_files)
        deleted_files = self.state.get_deleted_files(all_files)

        changes = []

        for file_path in changed_files:
            rel_path = str(file_path.relative_to(self.source_dir))
            state = self.state.get_file_state(rel_path)

            changes.append({
                # A file with prior state is an update; no state means new.
                "action": "upload" if state else "create",
                "path": rel_path,
                "size": self._format_size(file_path.stat().st_size)
            })

        for rel_path in deleted_files:
            changes.append({
                "action": "delete",
                "path": rel_path
            })

        return changes

    async def get_status(self) -> Dict[str, Any]:
        """Return a summary of the current sync status."""
        pending = await self.get_pending_changes()

        return {
            "last_sync": self.state.get_last_sync(),
            "files_tracked": len(self.state.get_all_files()),
            "pending_changes": len(pending),
            "conflicts": self.state.get_conflicts()
        }

    async def list_tracked_files(self) -> List[Dict[str, Any]]:
        """List every tracked file with size, mtime, and sync status."""
        files = []

        for file_state in self.state.get_all_files():
            files.append({
                "path": file_state.path,
                "size": self._format_size(file_state.size),
                "modified": datetime.fromtimestamp(file_state.modified_time).isoformat(),
                "status": "synced" if file_state.synced_time else "pending"
            })

        return files

    async def restore_file(self, rel_path: str, date: Optional[str] = None) -> Dict[str, Any]:
        """Restore a file from the remote backup.

        Downloads to a ``.tmp`` sibling, reverses the encryption/compression
        pipeline, then renames into place.

        Args:
            rel_path: Path relative to ``source_dir`` to restore.
            date: Point-in-time selector — currently ignored (TODO: wire
                up versioned restore in the backend).

        Returns:
            ``{"success": True}`` or ``{"success": False, "error": ...}``.
        """
        try:
            await self.backend.connect()

            local_path = self.source_dir / rel_path
            download_path = local_path.with_suffix(local_path.suffix + ".tmp")

            await self.backend.download(self._remote_path(rel_path), download_path)

            # Reverse the upload pipeline: decrypt first, then decompress.
            if self.encryption and self.encryption_key:
                decrypted = decrypt_file(download_path, self.encryption_key)
                download_path.unlink()
                download_path = decrypted

            if self.compression:
                decompressed = decompress_file(download_path)
                download_path.unlink()
                download_path = decompressed

            # Atomically replace (same directory, so rename is safe).
            download_path.rename(local_path)

            return {"success": True}

        except Exception as e:
            return {"success": False, "error": str(e)}
        finally:
            await self.backend.disconnect()

    @staticmethod
    def _format_size(size: int) -> str:
        """Format a byte count as a human-readable string (e.g. '1.5 MB')."""
        size_float = float(size)
        for unit in ["B", "KB", "MB", "GB"]:
            if size_float < 1024:
                return f"{size_float:.1f} {unit}"
            size_float /= 1024
        return f"{size_float:.1f} TB"