# Python - real world projects / file sync / sync / backends
# File: local.py
"""
Local filesystem backend for testing and local backup.
"""

import shutil
from pathlib import Path
from typing import List, Optional
from datetime import datetime
import logging
import aiofiles
import aiofiles.os

from .base import BaseBackend, RemoteFile

logger = logging.getLogger(__name__)


class LocalBackend(BaseBackend):
    """Local filesystem backend for backups.

    Files are stored beneath ``backup_dir`` using the remote path as a
    relative path. File contents are copied with ``aiofiles``; directory
    creation, metadata lookups and directory listing use synchronous
    ``pathlib`` calls.

    Remote paths are validated lexically: absolute paths and paths whose
    ``..`` components would climb above ``backup_dir`` are rejected.
    Symbolic links inside the backup directory are not resolved by this
    check.
    """

    # Bytes moved per read/write round trip when copying a file.
    _CHUNK_SIZE = 64 * 1024
    # Suffix of the hidden scratch file a copy is written to before it is
    # renamed over the real destination.
    _TEMP_SUFFIX = ".localbackend-tmp"

    def __init__(self, backup_dir: str) -> None:
        """Remember the backup root; nothing is created until ``connect``."""
        self.backup_dir = Path(backup_dir)
        self._connected = False

    async def connect(self) -> None:
        """Ensure backup directory exists."""
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        self._connected = True
        logger.info("LocalBackend connected: %s", self.backup_dir)

    async def disconnect(self) -> None:
        """Cleanup (nothing to do for local)."""
        self._connected = False
        logger.info("LocalBackend disconnected")

    def _get_full_path(self, remote_path: str) -> Path:
        """Map a remote path onto a path inside the backup directory.

        Args:
            remote_path: Path relative to the backup root.

        Returns:
            ``backup_dir`` joined with the normalised remote path. An empty
            remote path maps to ``backup_dir`` itself.

        Raises:
            ValueError: If ``remote_path`` is absolute/anchored or its
                ``..`` components would leave the backup directory.
        """
        relative = Path(remote_path)
        # Joining an anchored path would silently discard ``backup_dir``.
        if relative.anchor:
            raise ValueError(f"Remote path must be relative: {remote_path!r}")

        # Collapse ".." lexically so the result is always a direct
        # descendant of ``backup_dir`` (pathlib already drops "." parts).
        kept: List[str] = []
        for part in relative.parts:
            if part == "..":
                if not kept:
                    raise ValueError(
                        f"Remote path escapes backup directory: {remote_path!r}"
                    )
                kept.pop()
            else:
                kept.append(part)
        return self.backup_dir.joinpath(*kept)

    async def _copy_file(self, source: Path, destination: Path) -> None:
        """Copy ``source`` to ``destination`` without exposing partial data.

        The bytes are written to a hidden scratch file next to the
        destination and renamed into place once the copy has finished, so a
        failed or cancelled copy never truncates an existing destination.

        Raises:
            OSError: If reading, writing or the final rename fails.
        """
        temp_path = destination.with_name(
            f".{destination.name}{self._TEMP_SUFFIX}"
        )
        try:
            async with aiofiles.open(source, 'rb') as src:
                async with aiofiles.open(temp_path, 'wb') as dst:
                    while chunk := await src.read(self._CHUNK_SIZE):
                        await dst.write(chunk)
            temp_path.replace(destination)
        except BaseException:
            # BaseException so task cancellation also removes the scratch
            # file; the original error is always re-raised.
            try:
                temp_path.unlink(missing_ok=True)
            except OSError:
                pass
            raise

    async def upload(self, local_path: Path, remote_path: str) -> bool:
        """Copy file to backup directory.

        Args:
            local_path: File to back up.
            remote_path: Destination path relative to the backup root.

        Returns:
            True on success; False (after logging) on any error, including
            an invalid ``remote_path``.
        """
        try:
            dest_path = self._get_full_path(remote_path)
            if dest_path == self.backup_dir:
                raise ValueError(
                    f"Remote path does not name a file: {remote_path!r}"
                )
            dest_path.parent.mkdir(parents=True, exist_ok=True)

            await self._copy_file(local_path, dest_path)

            logger.debug("Uploaded to local: %s", remote_path)
            return True

        except Exception as e:
            logger.error("Error uploading to local %s: %s", remote_path, e)
            return False

    async def download(self, remote_path: str, local_path: Path) -> bool:
        """Copy file from backup directory.

        Args:
            remote_path: Source path relative to the backup root.
            local_path: Where to write the restored file; missing parent
                directories are created.

        Returns:
            True on success; False (after logging) if the file is missing
            or any error occurs.
        """
        try:
            src_path = self._get_full_path(remote_path)

            if not src_path.is_file():
                logger.error("File not found in backup: %s", remote_path)
                return False

            local_path.parent.mkdir(parents=True, exist_ok=True)

            await self._copy_file(src_path, local_path)

            logger.debug("Downloaded from local: %s", remote_path)
            return True

        except Exception as e:
            logger.error("Error downloading from local %s: %s", remote_path, e)
            return False

    async def delete(self, remote_path: str) -> bool:
        """Delete file from backup directory.

        Directories left empty by the deletion are removed as well, up to
        but never including ``backup_dir``.

        Returns:
            True if the file was removed or did not exist; False (after
            logging) on any error.
        """
        try:
            file_path = self._get_full_path(remote_path)

            try:
                await aiofiles.os.remove(file_path)
            except FileNotFoundError:
                # Already gone: deleting is treated as a no-op success.
                return True

            # Cleanup empty parent directories
            for parent in file_path.parents:
                if parent == self.backup_dir:
                    break
                try:
                    parent.rmdir()  # Only removes if empty
                except OSError:
                    break

            logger.debug("Deleted from local: %s", remote_path)
            return True

        except Exception as e:
            logger.error("Error deleting from local %s: %s", remote_path, e)
            return False

    async def exists(self, remote_path: str) -> bool:
        """Check if file exists in backup.

        Returns:
            True only for a regular file; False for directories, missing
            paths and invalid remote paths.
        """
        try:
            file_path = self._get_full_path(remote_path)
        except ValueError as e:
            logger.warning("Invalid remote path %r: %s", remote_path, e)
            return False
        return file_path.is_file()

    async def list_files(self, prefix: str = "") -> List[RemoteFile]:
        """List files in backup directory.

        Args:
            prefix: Directory (or single file) relative to the backup root
                to list; empty lists the whole backup.

        Returns:
            One ``RemoteFile`` per regular file found, with ``path`` given
            relative to the backup root using ``/`` separators. Leftover
            scratch files from interrupted copies are skipped. An invalid
            or missing prefix yields an empty list.
        """
        files: List[RemoteFile] = []
        try:
            search_path = self._get_full_path(prefix) if prefix else self.backup_dir
        except ValueError as e:
            logger.warning("Invalid prefix %r: %s", prefix, e)
            return files

        if search_path.is_file():
            candidates = [search_path]
        elif search_path.is_dir():
            candidates = search_path.rglob("*")
        else:
            return files

        for path in candidates:
            name = path.name
            if name.startswith(".") and name.endswith(self._TEMP_SUFFIX):
                continue
            try:
                if not path.is_file():
                    continue
                stat = path.stat()
            except FileNotFoundError:
                # Removed while the listing was in progress.
                continue

            files.append(RemoteFile(
                path=path.relative_to(self.backup_dir).as_posix(),
                size=stat.st_size,
                modified_time=datetime.fromtimestamp(stat.st_mtime)
            ))

        return files

    async def get_file_info(self, remote_path: str) -> Optional[RemoteFile]:
        """Get info about a file in backup.

        Returns:
            A ``RemoteFile`` carrying ``remote_path``, the size in bytes and
            the modification time (naive local time), or None if the path
            is invalid or does not name a regular file.
        """
        try:
            file_path = self._get_full_path(remote_path)
        except ValueError as e:
            logger.warning("Invalid remote path %r: %s", remote_path, e)
            return None

        if not file_path.is_file():
            return None

        try:
            stat = file_path.stat()
        except FileNotFoundError:
            # Removed between the check and the stat call.
            return None

        return RemoteFile(
            path=remote_path,
            size=stat.st_size,
            modified_time=datetime.fromtimestamp(stat.st_mtime)
        )
# (page navigation: Previous / Next)