# test_sync.py (Python)
"""
Tests for file sync components.
"""
import pytest
import asyncio
from pathlib import Path
from datetime import datetime, timedelta
import sys
import os
import tempfile
# Put the directory one level above this file's own directory at the front of
# sys.path. Presumably this is what lets the in-test `from sync... import`
# statements resolve without installing the package — confirm against the
# project layout.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class TestHashing:
    """Test hashing utilities."""

    # Characters allowed in a hexadecimal digest (either letter case).
    _HEX_CHARS = frozenset("0123456789abcdefABCDEF")

    @classmethod
    def _assert_hex_digest(cls, value, expected_length):
        """Assert that *value* is exactly *expected_length* hex characters."""
        assert len(value) == expected_length
        # str.isalnum() would also accept letters such as "z", so compare
        # against the hex alphabet explicitly.
        assert set(value) <= cls._HEX_CHARS

    def test_calculate_md5_hash(self, sample_file):
        """Test MD5 hash calculation."""
        from sync.utils.hashing import calculate_hash

        hash_value = calculate_hash(sample_file, "md5")
        self._assert_hex_digest(hash_value, 32)  # MD5 produces 32 hex chars

    def test_calculate_sha256_hash(self, sample_file):
        """Test SHA256 hash calculation."""
        from sync.utils.hashing import calculate_hash

        hash_value = calculate_hash(sample_file, "sha256")
        self._assert_hex_digest(hash_value, 64)  # SHA256 produces 64 hex chars

    def test_verify_hash(self, sample_file):
        """Test hash verification."""
        from sync.utils.hashing import calculate_hash, verify_hash

        hash_value = calculate_hash(sample_file, "md5")
        # The freshly computed digest must verify; an arbitrary string must not.
        assert verify_hash(sample_file, hash_value, "md5")
        assert not verify_hash(sample_file, "invalid_hash", "md5")

    def test_same_content_same_hash(self, tmp_path):
        """Test that same content produces same hash."""
        from sync.utils.hashing import calculate_hash

        file1 = tmp_path / "file1.txt"
        file2 = tmp_path / "file2.txt"
        content = "Identical content"
        file1.write_text(content)
        file2.write_text(content)

        # Called without an algorithm argument, so this exercises whatever
        # default calculate_hash uses.
        assert calculate_hash(file1) == calculate_hash(file2)
class TestCompression:
    """Test compression utilities."""

    def test_compress_decompress(self, sample_file, tmp_path):
        """Test compression and decompression roundtrip."""
        from sync.utils.compression import compress_file, decompress_file

        # Snapshot the expected content first: if decompress_file writes its
        # output back over the source path, reading sample_file afterwards
        # would compare the decompressed file with itself.
        original_text = sample_file.read_text()

        compressed = compress_file(sample_file)
        assert compressed.exists()
        assert compressed.suffix == ".gz"

        decompressed = decompress_file(compressed)
        assert decompressed.exists()
        assert decompressed.read_text() == original_text

    def test_compression_reduces_size(self, tmp_path):
        """Test that compression reduces file size for compressible content."""
        from sync.utils.compression import compress_file

        # Create a file with repetitive content (highly compressible)
        large_file = tmp_path / "large.txt"
        large_file.write_text("A" * 10000)
        # Record the size before compressing so the comparison does not
        # depend on the source file being left untouched by compress_file.
        original_size = large_file.stat().st_size

        compressed = compress_file(large_file)
        assert compressed.stat().st_size < original_size

    def test_custom_output_path(self, sample_file, tmp_path):
        """Test compression with custom output path."""
        from sync.utils.compression import compress_file

        output_path = tmp_path / "custom_name.gz"
        result = compress_file(sample_file, output_path)

        # The function must both report and actually create the given path.
        assert result == output_path
        assert output_path.exists()
class TestEncryption:
    """Test encryption utilities."""

    def test_encrypt_decrypt(self, sample_file, tmp_path):
        """Test encryption and decryption roundtrip."""
        from sync.utils.encryption import encrypt_file, decrypt_file

        key = "test_encryption_key_32_bytes!!"
        # Snapshot the plaintext first: if decrypt_file writes its output back
        # over the source path, reading sample_file afterwards would compare
        # the decrypted file with itself.
        original_text = sample_file.read_text()

        encrypted = encrypt_file(sample_file, key)
        assert encrypted.exists()
        assert encrypted.suffix == ".enc"

        decrypted = decrypt_file(encrypted, key)
        assert decrypted.exists()
        assert decrypted.read_text() == original_text

    def test_encrypted_differs_from_original(self, sample_file):
        """Test that encrypted content is different."""
        from sync.utils.encryption import encrypt_file

        key = "test_key_32_bytes_long_padding!"
        # Capture the plaintext bytes before encrypting so the comparison does
        # not rely on encrypt_file leaving the source file in place.
        original_bytes = sample_file.read_bytes()

        encrypted = encrypt_file(sample_file, key)
        assert encrypted.read_bytes() != original_bytes

    def test_generate_key(self):
        """Test key generation."""
        from sync.utils.encryption import generate_key

        key1 = generate_key()
        key2 = generate_key()

        assert len(key1) > 0
        assert key1 != key2  # Keys should be unique

    def test_wrong_key_fails(self, sample_file):
        """Test that decryption with wrong key fails."""
        from sync.utils.encryption import encrypt_file, decrypt_file

        key1 = "correct_key_32_bytes_padding!!"
        key2 = "wrong_key_32_bytes_padding_!!"
        encrypted = encrypt_file(sample_file, key1)

        # NOTE(review): Exception is very broad and would also hide unrelated
        # errors; narrow it once the concrete exception type raised by
        # decrypt_file is confirmed.
        with pytest.raises(Exception):
            decrypt_file(encrypted, key2)
class TestLocalBackend:
    """Test local storage backend.

    Every test connects first and then runs its body inside ``try/finally``
    so that ``disconnect()`` is still awaited when an assertion or backend
    call fails part-way through.
    """

    @pytest.mark.asyncio
    async def test_connect_creates_directory(self, tmp_path):
        """Test that connect creates backup directory."""
        from sync.backends.local import LocalBackend

        backup_dir = tmp_path / "new_backup"
        backend = LocalBackend(str(backup_dir))
        await backend.connect()
        try:
            assert backup_dir.exists()
        finally:
            await backend.disconnect()

    @pytest.mark.asyncio
    async def test_upload_download(self, sample_file, temp_backup_dir, tmp_path):
        """Test file upload and download."""
        from sync.backends.local import LocalBackend

        backend = LocalBackend(str(temp_backup_dir))
        await backend.connect()
        try:
            # Upload
            await backend.upload(sample_file, "test/file.txt")
            # Verify file exists in backup
            assert (temp_backup_dir / "test" / "file.txt").exists()

            # Download
            download_path = tmp_path / "downloaded.txt"
            await backend.download("test/file.txt", download_path)
            assert download_path.read_text() == sample_file.read_text()
        finally:
            await backend.disconnect()

    @pytest.mark.asyncio
    async def test_delete(self, sample_file, temp_backup_dir):
        """Test file deletion."""
        from sync.backends.local import LocalBackend

        backend = LocalBackend(str(temp_backup_dir))
        await backend.connect()
        try:
            await backend.upload(sample_file, "to_delete.txt")
            assert await backend.exists("to_delete.txt")

            await backend.delete("to_delete.txt")
            assert not await backend.exists("to_delete.txt")
        finally:
            await backend.disconnect()

    @pytest.mark.asyncio
    async def test_list_files(self, temp_source_dir, temp_backup_dir):
        """Test listing files."""
        from sync.backends.local import LocalBackend

        backend = LocalBackend(str(temp_backup_dir))
        await backend.connect()
        try:
            # Upload some files
            await backend.upload(temp_source_dir / "file1.txt", "file1.txt")
            await backend.upload(temp_source_dir / "file2.txt", "subdir/file2.txt")

            files = await backend.list_files()
            assert len(files) == 2

            # Listed paths are expected to be the remote paths used on upload.
            paths = [f.path for f in files]
            assert "file1.txt" in paths
            assert "subdir/file2.txt" in paths
        finally:
            await backend.disconnect()
class TestSyncState:
    """Test sync state management."""

    def test_save_load_state(self, temp_source_dir):
        """Test saving and loading state."""
        from sync.state import SyncState, FileState

        state = SyncState(str(temp_source_dir))
        file_state = FileState(
            path="test.txt",
            checksum="abc123",
            size=100,
            modified_time=1234567890.0,
        )
        state.update_file_state(file_state)
        state.save()

        # Load in new instance
        state2 = SyncState(str(temp_source_dir))
        loaded = state2.get_file_state("test.txt")

        assert loaded is not None
        assert loaded.checksum == "abc123"

    def test_calculate_checksum(self, sample_file, temp_source_dir):
        """Test checksum calculation."""
        from sync.state import SyncState

        checksum = SyncState.calculate_checksum(sample_file)

        assert len(checksum) == 32  # MD5
        # str.isalnum() would also accept non-hex letters such as "z", so
        # compare against the hex alphabet explicitly.
        assert set(checksum) <= set("0123456789abcdefABCDEF")

    def test_has_file_changed(self, temp_source_dir):
        """Test change detection."""
        from sync.state import SyncState

        state = SyncState(str(temp_source_dir))
        file_path = temp_source_dir / "file1.txt"

        # Initially should detect as changed (not in state)
        assert state.has_file_changed(file_path)

        # Add to state
        file_state = state.create_file_state(file_path)
        state.update_file_state(file_state)

        # Should not detect as changed
        assert not state.has_file_changed(file_path)

        # Modify file
        file_path.write_text("Modified content!")

        # Should detect change
        assert state.has_file_changed(file_path)

    def test_conflicts(self, temp_source_dir):
        """Test conflict management."""
        from sync.state import SyncState

        state = SyncState(str(temp_source_dir))
        state.add_conflict("file1.txt")
        state.add_conflict("file2.txt")

        conflicts = state.get_conflicts()
        assert len(conflicts) == 2

        # Removing one conflict must leave exactly the other one recorded.
        state.remove_conflict("file1.txt")
        conflicts = state.get_conflicts()
        assert len(conflicts) == 1
class TestFileWatcher:
    """Test file system watcher."""

    def test_ignore_patterns(self, temp_source_dir):
        """Check that EventHandler.should_ignore honours the ignore patterns."""
        from sync.watcher import EventHandler

        patterns = [".git", "*.pyc", "__pycache__"]
        handler = EventHandler(callback=lambda x: None, ignore_patterns=patterns)

        # Each of these paths matches one of the patterns above.
        for ignored_path in (".git/config", "module.pyc", "__pycache__/module.pyc"):
            assert handler.should_ignore(ignored_path)

        assert not handler.should_ignore("normal_file.txt")

    def test_get_all_files(self, temp_source_dir):
        """Check that get_all_files returns every file in the watched dir."""
        from sync.watcher import FileWatcher

        watcher = FileWatcher(
            watch_dir=str(temp_source_dir),
            callback=lambda x: None,
            ignore_patterns=[],
        )

        found = watcher.get_all_files()
        assert len(found) == 3

        found_names = [entry.name for entry in found]
        for expected_name in ("file1.txt", "file2.txt", "nested.txt"):
            assert expected_name in found_names

    def test_ignore_files(self, temp_source_dir):
        """Check that files matching an ignore pattern are not listed."""
        from sync.watcher import FileWatcher

        # Create a file to ignore
        bytecode_file = temp_source_dir / "ignored.pyc"
        bytecode_file.write_text("bytecode")

        watcher = FileWatcher(
            watch_dir=str(temp_source_dir),
            callback=lambda x: None,
            ignore_patterns=["*.pyc"],
        )

        listed = watcher.get_all_files()
        assert all(entry.name != "ignored.pyc" for entry in listed)
class TestIntegration:
    """Integration tests for full sync workflow."""

    @pytest.mark.asyncio
    async def test_full_sync_workflow(self, temp_source_dir, temp_backup_dir):
        """Test complete sync workflow."""
        from sync.sync_manager import SyncManager
        from sync.backends.local import LocalBackend

        backend = LocalBackend(str(temp_backup_dir))
        manager = SyncManager(
            source_dir=str(temp_source_dir),
            backend=backend,
        )

        # Initial sync
        result = await manager.sync_all()

        assert result["uploaded"] == 3
        assert result["errors"] == 0

        # Verify files in backup: check all three uploaded files, not only
        # a subset, so a missing upload cannot slip past the count check.
        assert (temp_backup_dir / "file1.txt").exists()
        assert (temp_backup_dir / "file2.txt").exists()
        assert (temp_backup_dir / "subdir" / "nested.txt").exists()

    @pytest.mark.asyncio
    async def test_incremental_sync(self, temp_source_dir, temp_backup_dir):
        """Test incremental sync only uploads changed files."""
        from sync.sync_manager import SyncManager
        from sync.backends.local import LocalBackend

        backend = LocalBackend(str(temp_backup_dir))
        manager = SyncManager(
            source_dir=str(temp_source_dir),
            backend=backend,
        )

        # Initial sync
        await manager.sync_all()

        # Second sync without changes
        result = await manager.sync_all()

        assert result["uploaded"] == 0
        assert result["skipped"] == 3

    @pytest.mark.asyncio
    async def test_sync_with_compression(self, temp_source_dir, temp_backup_dir):
        """Test sync with compression enabled."""
        from sync.sync_manager import SyncManager
        from sync.backends.local import LocalBackend

        backend = LocalBackend(str(temp_backup_dir))
        manager = SyncManager(
            source_dir=str(temp_source_dir),
            backend=backend,
            compression=True,
        )

        await manager.sync_all()

        # Check compressed files exist
        assert (temp_backup_dir / "file1.txt.gz").exists()
if __name__ == "__main__":
    # Propagate pytest's exit code so that running this file directly as a
    # script exits non-zero when any test fails.
    raise SystemExit(pytest.main([__file__, "-v"]))