# __main__.py (Python)
"""
Entry point for the file sync tool.
"""
import asyncio
import click
from rich.console import Console
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, TextColumn
from .config import settings
from .sync_manager import SyncManager
from .backends import LocalBackend, S3Backend
# Shared Rich console; the commands defined below print all of their output through it.
console = Console()
def get_backend():
    """Build the storage backend selected by ``settings.BACKEND_TYPE``.

    Returns:
        An ``S3Backend`` configured from the S3/AWS settings when the backend
        type is exactly ``"s3"``; for every other value, a ``LocalBackend``
        rooted at ``settings.BACKUP_DIR``.
    """
    if settings.BACKEND_TYPE != "s3":
        # Any value other than "s3" falls back to local storage.
        return LocalBackend(backup_dir=settings.BACKUP_DIR)

    s3_options = {
        "bucket": settings.S3_BUCKET,
        "region": settings.AWS_REGION,
        "prefix": settings.S3_PREFIX,
        "access_key": settings.AWS_ACCESS_KEY_ID,
        "secret_key": settings.AWS_SECRET_ACCESS_KEY,
    }
    return S3Backend(**s3_options)
# Root command group; the subcommands below attach themselves via @cli.command().
# The docstring is what click shows as the --help text, so it is kept verbatim.
@click.group()
@click.version_option(version="1.0.0")
def cli():
    """File Sync Tool - Synchronize files to cloud storage."""
@cli.command()
@click.option("--dry-run", is_flag=True, help="Show what would be synced without syncing")
@click.option("--force", is_flag=True, help="Force re-sync all files")
@click.option("--path", help="Sync specific directory or file")
def sync(dry_run: bool, force: bool, path: str):
    """Perform a sync operation."""
    # The docstring above is the click help text, so developer notes live here.
    #
    # dry_run: only list the pending changes in a table; nothing is synced.
    # force:   forwarded unchanged to SyncManager.sync_all(force=...).
    # path:    source to sync; settings.WATCH_DIR is used when it is empty/None.

    # rich is already a dependency of this module; imported locally so the
    # command is self-contained.
    from rich.markup import escape

    def cell(value, *, literal=False):
        """Turn an arbitrary value into something Table.add_row accepts.

        Rich only accepts strings/renderables (or None for a blank cell) and
        raises NotRenderableError for ints, Paths, etc.  With ``literal`` the
        text is also escaped so that ``[...]`` in file names is not parsed as
        console markup.
        """
        if value is None:
            return None
        text = str(value)
        return escape(text) if literal else text

    async def run_sync():
        backend = get_backend()
        manager = SyncManager(
            source_dir=path or settings.WATCH_DIR,
            backend=backend,
            compression=settings.COMPRESSION_ENABLED,
            encryption=settings.ENCRYPTION_ENABLED,
            encryption_key=settings.ENCRYPTION_KEY,
        )

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            # total=None gives an indeterminate spinner while the manager works.
            task = progress.add_task("Syncing files...", total=None)

            if dry_run:
                changes = await manager.get_pending_changes()
                progress.remove_task(task)

                if not changes:
                    console.print("[green]No changes to sync[/green]")
                    return

                table = Table(title="Pending Changes (Dry Run)")
                table.add_column("Action", style="cyan")
                table.add_column("File", style="white")
                table.add_column("Size", style="dim")
                for change in changes:
                    table.add_row(
                        cell(change["action"]),
                        cell(change["path"], literal=True),
                        cell(change.get("size", "-")),
                    )
                console.print(table)
                return

            result = await manager.sync_all(force=force)
            progress.remove_task(task)

            console.print("\n[green]✓ Sync complete![/green]")
            console.print(f" Uploaded: {result['uploaded']}")
            console.print(f" Skipped: {result['skipped']}")
            console.print(f" Errors: {result['errors']}")

    asyncio.run(run_sync())
@cli.command()
def watch():
    """Start watching for file changes and sync automatically."""
    # The docstring above is the click help text, so developer notes live here.
    #
    # Prints a short banner, then runs SyncManager.start_watching() on
    # settings.WATCH_DIR until the user presses Ctrl+C.
    console.print("[cyan]Starting file watcher...[/cyan]")
    console.print(f"Watching: {settings.WATCH_DIR}")
    console.print(f"Backend: {settings.BACKEND_TYPE}")
    console.print("Press Ctrl+C to stop\n")

    async def run_watch():
        backend = get_backend()
        manager = SyncManager(
            source_dir=settings.WATCH_DIR,
            backend=backend,
            compression=settings.COMPRESSION_ENABLED,
            encryption=settings.ENCRYPTION_ENABLED,
            encryption_key=settings.ENCRYPTION_KEY,
        )
        try:
            await manager.start_watching()
        except (KeyboardInterrupt, asyncio.CancelledError):
            # Under asyncio.run, Ctrl+C normally reaches the running task as a
            # cancellation rather than as KeyboardInterrupt, so both are
            # handled.  Stop the watcher, then re-raise so asyncio.run can
            # finish its own shutdown and surface the interrupt to the caller.
            await manager.stop_watching()
            raise

    try:
        asyncio.run(run_watch())
    except KeyboardInterrupt:
        # asyncio.run raises KeyboardInterrupt once the task has been torn
        # down; treat it as the normal way to end the watch session.
        console.print("\n[yellow]Stopped watching[/yellow]")
@cli.command()
def status():
    """Show sync status and statistics."""
    # The docstring above is the click help text, so developer notes live here.
    #
    # Queries SyncManager.get_status() for settings.WATCH_DIR and prints a
    # summary, followed by at most the first five conflicts if any are reported.

    async def show_status():
        storage = get_backend()
        manager = SyncManager(source_dir=settings.WATCH_DIR, backend=storage)
        report = await manager.get_status()

        summary_lines = (
            "\n[bold cyan]Sync Status[/bold cyan]",
            f"Watch Directory: {settings.WATCH_DIR}",
            f"Backend: {settings.BACKEND_TYPE}",
            f"Last Sync: {report.get('last_sync', 'Never')}",
            f"Files Tracked: {report.get('files_tracked', 0)}",
            f"Pending Changes: {report.get('pending_changes', 0)}",
        )
        for line in summary_lines:
            console.print(line)

        conflicts = report.get("conflicts")
        if not conflicts:
            return

        console.print(f"\n[yellow]Conflicts: {len(conflicts)}[/yellow]")
        for conflict in conflicts[:5]:
            console.print(f" - {conflict}")

    asyncio.run(show_status())
@cli.command("list")
def list_files():
    """List all tracked files."""
    # The docstring above is the click help text, so developer notes live here.
    #
    # Prints a table with the first 50 entries returned by
    # SyncManager.list_tracked_files() and a trailing count of the remainder.

    # rich is already a dependency of this module; imported locally so the
    # command is self-contained.
    from rich.markup import escape

    max_rows = 50

    def cell(value, *, literal=False):
        """Turn an arbitrary value into something Table.add_row accepts.

        Rich only accepts strings/renderables (or None for a blank cell) and
        raises NotRenderableError for ints, datetimes, Paths, etc.  With
        ``literal`` the text is also escaped so that ``[...]`` in file names
        is not parsed as console markup.
        """
        if value is None:
            return None
        text = str(value)
        return escape(text) if literal else text

    async def show_files():
        backend = get_backend()
        manager = SyncManager(
            source_dir=settings.WATCH_DIR,
            backend=backend,
        )
        files = await manager.list_tracked_files()

        table = Table(title="Tracked Files")
        table.add_column("File", style="white")
        table.add_column("Size", style="cyan")
        table.add_column("Last Modified", style="dim")
        table.add_column("Status", style="green")

        for entry in files[:max_rows]:
            table.add_row(
                cell(entry["path"], literal=True),
                cell(entry["size"]),
                cell(entry["modified"]),
                cell(entry["status"]),
            )
        console.print(table)

        hidden = len(files) - max_rows
        if hidden > 0:
            console.print(f"\n... and {hidden} more files")

    asyncio.run(show_files())
@cli.command()
@click.option("--date", help="Restore from specific date (YYYY-MM-DD)")
@click.option("--file", "file_path", help="Restore specific file")
def restore(date: str, file_path: str):
    """Restore files from backup."""
    # The docstring above is the click help text, so developer notes live here.
    #
    # date:      forwarded unchanged to SyncManager.restore_file(date=...);
    #            None when --date is not given.
    # file_path: file to restore; the command only prints a hint when missing.

    # rich is already a dependency of this module; imported locally so the
    # command is self-contained.
    from rich.markup import escape

    if not file_path:
        # Nothing can be restored without a target, so bail out before a
        # backend, a manager or an event loop is created.
        console.print("[yellow]Please specify a file to restore with --file[/yellow]")
        return

    # The path comes straight from the command line; escape it so brackets in
    # the name are printed literally instead of being parsed as rich markup.
    shown_path = escape(file_path)

    async def run_restore():
        backend = get_backend()
        manager = SyncManager(
            source_dir=settings.WATCH_DIR,
            backend=backend,
        )

        console.print(f"Restoring: {shown_path}")
        result = await manager.restore_file(file_path, date=date)

        if result["success"]:
            console.print(f"[green]✓ Restored {shown_path}[/green]")
        else:
            # Error text may contain arbitrary characters (paths, bucket
            # names), so it is escaped as well.
            reason = escape(str(result["error"]))
            console.print(f"[red]✗ Failed: {reason}[/red]")

    asyncio.run(run_restore())
def main() -> None:
    """Run the click command group; this is the program's entry point."""
    cli()


# Start the CLI only when the module is executed directly, not on import.
if __name__ == "__main__":
    main()