PYTHONPython

test sync

real world projects / file sync / tests

PYTHON
test_sync.py🐍
"""
Tests for file sync components.
"""

import pytest
import asyncio
from pathlib import Path
from datetime import datetime, timedelta
import sys
import os
import tempfile

# Make the directory one level above this file's directory importable, so the
# ``sync`` package imported inside the tests below can be resolved.
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
_PARENT_DIR = os.path.dirname(_THIS_DIR)
sys.path.insert(0, _PARENT_DIR)


class TestHashing:
    """Tests for ``calculate_hash`` / ``verify_hash`` in ``sync.utils.hashing``.

    ``sample_file`` is a fixture defined outside this file (presumably in a
    ``conftest.py``); ``tmp_path`` is pytest's built-in temporary directory.
    """

    # Characters allowed in a hexadecimal digest (either letter case).
    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")

    def test_calculate_md5_hash(self, sample_file):
        """An MD5 digest is 32 characters long and purely hexadecimal."""
        from sync.utils.hashing import calculate_hash

        hash_value = calculate_hash(sample_file, "md5")
        assert len(hash_value) == 32  # MD5 produces 32 hex chars
        # isalnum() would also accept letters like "z"; require real hex.
        assert set(hash_value) <= self._HEX_DIGITS

    def test_calculate_sha256_hash(self, sample_file):
        """A SHA256 digest is 64 characters long and purely hexadecimal."""
        from sync.utils.hashing import calculate_hash

        hash_value = calculate_hash(sample_file, "sha256")
        assert len(hash_value) == 64  # SHA256 produces 64 hex chars
        assert set(hash_value) <= self._HEX_DIGITS

    def test_verify_hash(self, sample_file):
        """verify_hash accepts the file's own digest and rejects a bogus one."""
        from sync.utils.hashing import calculate_hash, verify_hash

        hash_value = calculate_hash(sample_file, "md5")
        assert verify_hash(sample_file, hash_value, "md5")
        assert not verify_hash(sample_file, "invalid_hash", "md5")

    def test_same_content_same_hash(self, tmp_path):
        """Two files with identical content hash to the same value."""
        from sync.utils.hashing import calculate_hash

        file1 = tmp_path / "file1.txt"
        file2 = tmp_path / "file2.txt"

        content = "Identical content"
        file1.write_text(content)
        file2.write_text(content)

        # No algorithm is passed, so calculate_hash's default is exercised.
        assert calculate_hash(file1) == calculate_hash(file2)


class TestCompression:
    """Tests for ``compress_file`` / ``decompress_file`` in ``sync.utils.compression``.

    ``sample_file`` is a fixture defined outside this file; ``tmp_path`` is
    pytest's built-in temporary directory.
    """

    def test_compress_decompress(self, sample_file, tmp_path):
        """Compressing then decompressing yields the original text."""
        from sync.utils.compression import compress_file, decompress_file

        # Snapshot the content first. If decompress_file writes its output
        # back onto sample_file's path (not visible from here), comparing the
        # two files afterwards would compare a file with itself and could
        # never fail.
        original_text = sample_file.read_text()

        compressed = compress_file(sample_file)
        assert compressed.exists()
        assert compressed.suffix == ".gz"

        decompressed = decompress_file(compressed)
        assert decompressed.exists()

        assert decompressed.read_text() == original_text

    def test_compression_reduces_size(self, tmp_path):
        """Highly repetitive content becomes smaller when compressed."""
        from sync.utils.compression import compress_file

        # Create a file with repetitive content (highly compressible)
        large_file = tmp_path / "large.txt"
        large_file.write_text("A" * 10000)

        compressed = compress_file(large_file)

        assert compressed.stat().st_size < large_file.stat().st_size

    def test_custom_output_path(self, sample_file, tmp_path):
        """An explicit output path is honoured and returned by compress_file."""
        from sync.utils.compression import compress_file

        output_path = tmp_path / "custom_name.gz"
        result = compress_file(sample_file, output_path)

        assert result == output_path
        assert output_path.exists()


class TestEncryption:
    """Tests for the helpers in ``sync.utils.encryption``.

    ``sample_file`` is a fixture defined outside this file.

    NOTE(review): the key literals below mention "32 bytes" but are 29-31
    characters long. Whether ``encrypt_file`` needs a particular key length is
    not visible from this file, so the literals are left unchanged.
    """

    def test_encrypt_decrypt(self, sample_file, tmp_path):
        """Encrypting then decrypting with the same key yields the original text."""
        from sync.utils.encryption import encrypt_file, decrypt_file

        key = "test_encryption_key_32_bytes!!"

        # Snapshot the content first. If decrypt_file writes its output back
        # onto sample_file's path (not visible from here), comparing the two
        # files afterwards would compare a file with itself.
        original_text = sample_file.read_text()

        encrypted = encrypt_file(sample_file, key)
        assert encrypted.exists()
        assert encrypted.suffix == ".enc"

        decrypted = decrypt_file(encrypted, key)
        assert decrypted.exists()

        assert decrypted.read_text() == original_text

    def test_encrypted_differs_from_original(self, sample_file):
        """The encrypted file's bytes differ from the plaintext bytes."""
        from sync.utils.encryption import encrypt_file

        key = "test_key_32_bytes_long_padding!"

        encrypted = encrypt_file(sample_file, key)

        assert encrypted.read_bytes() != sample_file.read_bytes()

    def test_generate_key(self):
        """generate_key returns a non-empty value that differs between calls."""
        from sync.utils.encryption import generate_key

        key1 = generate_key()
        key2 = generate_key()

        assert len(key1) > 0
        assert key1 != key2  # Keys should be unique

    def test_wrong_key_fails(self, sample_file):
        """Decrypting with a different key raises an exception."""
        from sync.utils.encryption import encrypt_file, decrypt_file

        key1 = "correct_key_32_bytes_padding!!"
        key2 = "wrong_key_32_bytes_padding_!!"

        encrypted = encrypt_file(sample_file, key1)

        # The concrete exception type raised by decrypt_file is not visible
        # from this file, so the broad base class is kept.
        with pytest.raises(Exception):
            decrypt_file(encrypted, key2)


class TestLocalBackend:
    """Tests for ``LocalBackend`` from ``sync.backends.local``.

    ``sample_file``, ``temp_backup_dir`` and ``temp_source_dir`` are fixtures
    defined outside this file; ``tmp_path`` is pytest's built-in temporary
    directory. Every test disconnects the backend in a ``finally`` block so a
    failing assertion or backend call does not leave it connected.
    """

    @pytest.mark.asyncio
    async def test_connect_creates_directory(self, tmp_path):
        """connect() creates the backup directory when it does not exist yet."""
        from sync.backends.local import LocalBackend

        backup_dir = tmp_path / "new_backup"
        backend = LocalBackend(str(backup_dir))

        await backend.connect()
        try:
            assert backup_dir.exists()
        finally:
            await backend.disconnect()

    @pytest.mark.asyncio
    async def test_upload_download(self, sample_file, temp_backup_dir, tmp_path):
        """An uploaded file lands in the backup dir and downloads unchanged."""
        from sync.backends.local import LocalBackend

        backend = LocalBackend(str(temp_backup_dir))
        await backend.connect()
        try:
            # Upload
            await backend.upload(sample_file, "test/file.txt")

            # Verify file exists in backup
            assert (temp_backup_dir / "test" / "file.txt").exists()

            # Download
            download_path = tmp_path / "downloaded.txt"
            await backend.download("test/file.txt", download_path)

            assert download_path.read_text() == sample_file.read_text()
        finally:
            await backend.disconnect()

    @pytest.mark.asyncio
    async def test_delete(self, sample_file, temp_backup_dir):
        """delete() removes a previously uploaded file from the backend."""
        from sync.backends.local import LocalBackend

        backend = LocalBackend(str(temp_backup_dir))
        await backend.connect()
        try:
            await backend.upload(sample_file, "to_delete.txt")
            assert await backend.exists("to_delete.txt")

            await backend.delete("to_delete.txt")
            assert not await backend.exists("to_delete.txt")
        finally:
            await backend.disconnect()

    @pytest.mark.asyncio
    async def test_list_files(self, temp_source_dir, temp_backup_dir):
        """list_files() reports every uploaded file, including nested ones."""
        from sync.backends.local import LocalBackend

        backend = LocalBackend(str(temp_backup_dir))
        await backend.connect()
        try:
            # Upload some files
            await backend.upload(temp_source_dir / "file1.txt", "file1.txt")
            await backend.upload(temp_source_dir / "file2.txt", "subdir/file2.txt")

            files = await backend.list_files()

            assert len(files) == 2
            paths = [f.path for f in files]
            assert "file1.txt" in paths
            assert "subdir/file2.txt" in paths
        finally:
            await backend.disconnect()


class TestSyncState:
    """Tests for ``SyncState`` and ``FileState`` from ``sync.state``.

    ``temp_source_dir`` and ``sample_file`` are fixtures defined outside this
    file.
    """

    # Characters allowed in a hexadecimal digest (either letter case).
    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")

    def test_save_load_state(self, temp_source_dir):
        """State saved by one SyncState is visible to a fresh instance."""
        from sync.state import SyncState, FileState

        state = SyncState(str(temp_source_dir))

        file_state = FileState(
            path="test.txt",
            checksum="abc123",
            size=100,
            modified_time=1234567890.0
        )

        state.update_file_state(file_state)
        state.save()

        # Load in new instance
        state2 = SyncState(str(temp_source_dir))

        loaded = state2.get_file_state("test.txt")
        assert loaded is not None
        assert loaded.checksum == "abc123"

    def test_calculate_checksum(self, sample_file, temp_source_dir):
        """calculate_checksum returns a 32-character hexadecimal string."""
        from sync.state import SyncState

        # temp_source_dir is requested but not used in the body; it is kept
        # so the test's fixture set is unchanged.
        checksum = SyncState.calculate_checksum(sample_file)

        assert len(checksum) == 32  # MD5
        # isalnum() would also accept letters like "z"; require real hex.
        assert set(checksum) <= self._HEX_DIGITS

    def test_has_file_changed(self, temp_source_dir):
        """A file reads as changed until recorded, and again after an edit."""
        from sync.state import SyncState

        state = SyncState(str(temp_source_dir))

        file_path = temp_source_dir / "file1.txt"

        # Initially should detect as changed (not in state)
        assert state.has_file_changed(file_path)

        # Add to state
        file_state = state.create_file_state(file_path)
        state.update_file_state(file_state)

        # Should not detect as changed
        assert not state.has_file_changed(file_path)

        # Modify file
        file_path.write_text("Modified content!")

        # Should detect change
        assert state.has_file_changed(file_path)

    def test_conflicts(self, temp_source_dir):
        """Conflicts can be added, listed and removed."""
        from sync.state import SyncState

        state = SyncState(str(temp_source_dir))

        state.add_conflict("file1.txt")
        state.add_conflict("file2.txt")

        conflicts = state.get_conflicts()
        assert len(conflicts) == 2

        state.remove_conflict("file1.txt")
        conflicts = state.get_conflicts()
        assert len(conflicts) == 1


class TestFileWatcher:
    """Tests for ``EventHandler`` and ``FileWatcher`` from ``sync.watcher``.

    ``temp_source_dir`` is a fixture defined outside this file; the assertions
    below expect it to hold ``file1.txt``, ``file2.txt`` and ``nested.txt``.
    """

    def test_ignore_patterns(self, temp_source_dir):
        """should_ignore matches the configured patterns and nothing else."""
        from sync.watcher import EventHandler

        patterns = [".git", "*.pyc", "__pycache__"]
        event_handler = EventHandler(callback=lambda x: None, ignore_patterns=patterns)

        # Each of these paths must be ignored, checked in the listed order.
        for ignored_path in (".git/config", "module.pyc", "__pycache__/module.pyc"):
            assert event_handler.should_ignore(ignored_path)

        assert not event_handler.should_ignore("normal_file.txt")

    def test_get_all_files(self, temp_source_dir):
        """With no ignore patterns, every file in the source dir is returned."""
        from sync.watcher import FileWatcher

        file_watcher = FileWatcher(
            watch_dir=str(temp_source_dir),
            callback=lambda x: None,
            ignore_patterns=[],
        )

        found = file_watcher.get_all_files()
        assert len(found) == 3

        found_names = [entry.name for entry in found]
        for expected_name in ("file1.txt", "file2.txt", "nested.txt"):
            assert expected_name in found_names

    def test_ignore_files(self, temp_source_dir):
        """Files matching an ignore pattern are left out of get_all_files()."""
        from sync.watcher import FileWatcher

        # Drop a file into the source dir that the "*.pyc" pattern should hide.
        ignored_file = temp_source_dir / "ignored.pyc"
        ignored_file.write_text("bytecode")

        file_watcher = FileWatcher(
            watch_dir=str(temp_source_dir),
            callback=lambda x: None,
            ignore_patterns=["*.pyc"],
        )

        found_names = [entry.name for entry in file_watcher.get_all_files()]

        assert "ignored.pyc" not in found_names


class TestIntegration:
    """End-to-end tests driving ``SyncManager`` against a ``LocalBackend``.

    ``temp_source_dir`` and ``temp_backup_dir`` are fixtures defined outside
    this file; the assertions below expect the source dir to hold three files.
    """

    @staticmethod
    def _build_manager(source_dir, backup_dir, **options):
        """Create a SyncManager for ``source_dir`` backed by a LocalBackend.

        Extra keyword arguments are forwarded to ``SyncManager`` unchanged.
        The leading underscore keeps pytest from collecting this as a test.
        """
        from sync.sync_manager import SyncManager
        from sync.backends.local import LocalBackend

        local_backend = LocalBackend(str(backup_dir))
        return SyncManager(
            source_dir=str(source_dir),
            backend=local_backend,
            **options,
        )

    @pytest.mark.asyncio
    async def test_full_sync_workflow(self, temp_source_dir, temp_backup_dir):
        """A first sync uploads all three files without errors."""
        sync_manager = self._build_manager(temp_source_dir, temp_backup_dir)

        # Initial sync
        summary = await sync_manager.sync_all()

        assert summary["uploaded"] == 3
        assert summary["errors"] == 0

        # The top-level file and the nested file must both be in the backup.
        top_level_copy = temp_backup_dir / "file1.txt"
        nested_copy = temp_backup_dir / "subdir" / "nested.txt"
        assert top_level_copy.exists()
        assert nested_copy.exists()

    @pytest.mark.asyncio
    async def test_incremental_sync(self, temp_source_dir, temp_backup_dir):
        """A second sync with no changes uploads nothing and skips all files."""
        sync_manager = self._build_manager(temp_source_dir, temp_backup_dir)

        # First pass populates the backup; its result is not inspected.
        await sync_manager.sync_all()

        # Second pass runs against an unchanged source directory.
        summary = await sync_manager.sync_all()

        assert summary["uploaded"] == 0
        assert summary["skipped"] == 3

    @pytest.mark.asyncio
    async def test_sync_with_compression(self, temp_source_dir, temp_backup_dir):
        """With compression enabled, files are stored with a ``.gz`` suffix."""
        sync_manager = self._build_manager(
            temp_source_dir, temp_backup_dir, compression=True
        )

        await sync_manager.sync_all()

        # Check compressed files exist
        compressed_copy = temp_backup_dir / "file1.txt.gz"
        assert compressed_copy.exists()


if __name__ == "__main__":
    # Run this file's tests verbosely when executed directly, and hand
    # pytest's exit code to the shell so a failing run exits non-zero.
    raise SystemExit(pytest.main([__file__, "-v"]))
PreviousNext