PYTHON
sync_manager.py🐍python
"""
Main sync manager orchestrating all sync operations.
"""
import asyncio
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional, Any
import logging
from .watcher import FileWatcher, FileEvent, EventType
from .state import SyncState, FileState
from .backends.base import BaseBackend
from .utils.compression import compress_file, decompress_file
from .utils.encryption import encrypt_file, decrypt_file
from .config import settings
logger = logging.getLogger(__name__)
class SyncManager:
"""Manages file synchronization between local and remote."""
def __init__(
self,
source_dir: str,
backend: BaseBackend,
compression: bool = False,
encryption: bool = False,
encryption_key: Optional[str] = None,
conflict_resolution: str = "keep_both"
):
self.source_dir = Path(source_dir)
self.backend = backend
self.compression = compression
self.encryption = encryption
self.encryption_key = encryption_key
self.conflict_resolution = conflict_resolution
self.state = SyncState(source_dir)
self.watcher = FileWatcher(
watch_dir=source_dir,
callback=self._on_file_event,
ignore_patterns=settings.IGNORE_PATTERNS,
debounce_seconds=settings.DEBOUNCE_SECONDS
)
self._sync_queue: asyncio.Queue = asyncio.Queue()
self._sync_task: Optional[asyncio.Task] = None
self._running = False
async def start_watching(self):
"""Start watching for file changes and processing sync queue."""
if self._running:
return
logger.info("Starting sync manager")
self._running = True
# Initialize backend
await self.backend.connect()
# Start file watcher
await self.watcher.start()
# Start sync processor
self._sync_task = asyncio.create_task(self._process_queue())
# Keep running until stopped
while self._running:
await asyncio.sleep(1)
async def stop_watching(self):
"""Stop watching for file changes."""
if not self._running:
return
logger.info("Stopping sync manager")
self._running = False
await self.watcher.stop()
if self._sync_task:
self._sync_task.cancel()
try:
await self._sync_task
except asyncio.CancelledError:
pass
await self.backend.disconnect()
self.state.save()
async def _on_file_event(self, event: FileEvent):
"""Handle file system event."""
logger.debug(f"File event: {event.event_type.value} - {event.path}")
await self._sync_queue.put(event)
async def _process_queue(self):
"""Process sync queue."""
while self._running:
try:
event = await asyncio.wait_for(
self._sync_queue.get(),
timeout=1.0
)
await self._process_event(event)
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error processing queue: {e}")
async def _process_event(self, event: FileEvent):
"""Process a single file event."""
try:
rel_path = str(event.path.relative_to(self.source_dir))
if event.event_type == EventType.DELETED:
await self._handle_delete(rel_path)
elif event.event_type in (EventType.CREATED, EventType.MODIFIED):
await self._handle_upload(event.path, rel_path)
elif event.event_type == EventType.MOVED:
await self._handle_move(rel_path, str(event.dest_path.relative_to(self.source_dir)))
self.state.save()
except Exception as e:
logger.error(f"Error processing event {event}: {e}")
async def _handle_upload(self, local_path: Path, rel_path: str):
"""Handle file upload."""
if not local_path.exists():
return
logger.info(f"Uploading: {rel_path}")
# Prepare file for upload
upload_path = local_path
temp_files = []
try:
# Compress if enabled
if self.compression:
compressed_path = compress_file(local_path)
temp_files.append(compressed_path)
upload_path = compressed_path
# Encrypt if enabled
if self.encryption and self.encryption_key:
encrypted_path = encrypt_file(upload_path, self.encryption_key)
temp_files.append(encrypted_path)
upload_path = encrypted_path
# Upload to backend
remote_path = rel_path
if self.compression:
remote_path += ".gz"
if self.encryption:
remote_path += ".enc"
await self.backend.upload(upload_path, remote_path)
# Update state
file_state = self.state.create_file_state(local_path)
file_state.synced_time = datetime.utcnow().isoformat()
self.state.update_file_state(file_state)
logger.info(f"Uploaded: {rel_path}")
finally:
# Cleanup temp files
for temp_path in temp_files:
try:
temp_path.unlink()
except:
pass
async def _handle_delete(self, rel_path: str):
"""Handle file deletion."""
logger.info(f"Deleting from remote: {rel_path}")
remote_path = rel_path
if self.compression:
remote_path += ".gz"
if self.encryption:
remote_path += ".enc"
await self.backend.delete(remote_path)
self.state.remove_file_state(rel_path)
logger.info(f"Deleted: {rel_path}")
async def _handle_move(self, old_path: str, new_path: str):
"""Handle file move/rename."""
logger.info(f"Moving: {old_path} -> {new_path}")
# Delete old and upload new
await self._handle_delete(old_path)
new_local_path = self.source_dir / new_path
if new_local_path.exists():
await self._handle_upload(new_local_path, new_path)
async def sync_all(self, force: bool = False) -> Dict[str, int]:
"""Sync all files."""
logger.info("Starting full sync")
result = {
"uploaded": 0,
"skipped": 0,
"deleted": 0,
"errors": 0
}
await self.backend.connect()
try:
# Get all local files
all_files = self.watcher.get_all_files()
# Find changed files
if force:
changed_files = all_files
else:
changed_files = self.state.get_changed_files(all_files)
# Upload changed files
for file_path in changed_files:
try:
rel_path = str(file_path.relative_to(self.source_dir))
await self._handle_upload(file_path, rel_path)
result["uploaded"] += 1
except Exception as e:
logger.error(f"Error uploading {file_path}: {e}")
result["errors"] += 1
result["skipped"] = len(all_files) - len(changed_files)
# Handle deleted files
deleted_files = self.state.get_deleted_files(all_files)
for rel_path in deleted_files:
try:
await self._handle_delete(rel_path)
result["deleted"] += 1
except Exception as e:
logger.error(f"Error deleting {rel_path}: {e}")
result["errors"] += 1
# Update last sync time
self.state.set_last_sync()
self.state.save()
finally:
await self.backend.disconnect()
logger.info(f"Sync complete: {result}")
return result
async def get_pending_changes(self) -> List[Dict[str, Any]]:
"""Get list of pending changes."""
all_files = self.watcher.get_all_files()
changed_files = self.state.get_changed_files(all_files)
deleted_files = self.state.get_deleted_files(all_files)
changes = []
for file_path in changed_files:
rel_path = str(file_path.relative_to(self.source_dir))
state = self.state.get_file_state(rel_path)
changes.append({
"action": "upload" if state else "create",
"path": rel_path,
"size": self._format_size(file_path.stat().st_size)
})
for rel_path in deleted_files:
changes.append({
"action": "delete",
"path": rel_path
})
return changes
async def get_status(self) -> Dict[str, Any]:
"""Get sync status."""
all_files = self.watcher.get_all_files()
pending = await self.get_pending_changes()
return {
"last_sync": self.state.get_last_sync(),
"files_tracked": len(self.state.get_all_files()),
"pending_changes": len(pending),
"conflicts": self.state.get_conflicts()
}
async def list_tracked_files(self) -> List[Dict[str, Any]]:
"""List all tracked files."""
files = []
for file_state in self.state.get_all_files():
local_path = self.source_dir / file_state.path
files.append({
"path": file_state.path,
"size": self._format_size(file_state.size),
"modified": datetime.fromtimestamp(file_state.modified_time).isoformat(),
"status": "synced" if file_state.synced_time else "pending"
})
return files
async def restore_file(self, rel_path: str, date: Optional[str] = None) -> Dict[str, Any]:
"""Restore a file from backup."""
try:
await self.backend.connect()
remote_path = rel_path
if self.compression:
remote_path += ".gz"
if self.encryption:
remote_path += ".enc"
local_path = self.source_dir / rel_path
download_path = local_path.with_suffix(local_path.suffix + ".tmp")
# Download from backend
await self.backend.download(remote_path, download_path)
# Decrypt if needed
if self.encryption and self.encryption_key:
decrypted = decrypt_file(download_path, self.encryption_key)
download_path.unlink()
download_path = decrypted
# Decompress if needed
if self.compression:
decompressed = decompress_file(download_path)
download_path.unlink()
download_path = decompressed
# Move to final location
download_path.rename(local_path)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
finally:
await self.backend.disconnect()
@staticmethod
def _format_size(size: int) -> str:
"""Format file size for display."""
size_float = float(size)
for unit in ["B", "KB", "MB", "GB"]:
if size_float < 1024:
return f"{size_float:.1f} {unit}"
size_float /= 1024
return f"{size_float:.1f} TB"