# storage.py (Python) -- real world projects / password manager / pwm / storage
"""
Storage operations for the password vault.
"""

import os
import json
from pathlib import Path
from typing import Optional
import logging

from .crypto import (
    generate_salt,
    derive_key,
    encrypt_data,
    decrypt_data,
    encode_base64,
    decode_base64,
    ITERATIONS
)
from .models import Vault

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

# Vault location used when VaultStorage is constructed without an explicit
# path: ".pwm/vault.enc" under the current user's home directory.
DEFAULT_VAULT_PATH = Path.home() / ".pwm" / "vault.enc"


class VaultStorage:
    """Handles vault file operations.

    The vault is stored as a JSON envelope containing the format version,
    the key-derivation iteration count, and the base64-encoded salt, nonce
    and ciphertext.  The instance remembers the salt and iteration count of
    the vault it last created or loaded so that ``save`` can reuse them.
    """

    def __init__(self, path: Optional[Path] = None):
        """
        Args:
            path: Location of the vault file.  Falls back to
                DEFAULT_VAULT_PATH when omitted.  Plain strings are accepted
                and converted to ``Path``.
        """
        # Coerce so that str paths work with the Path methods used below.
        self.path = Path(path) if path else DEFAULT_VAULT_PATH
        self._salt: Optional[bytes] = None
        self._iterations = ITERATIONS

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _write_json_atomic(target: Path, payload: dict) -> None:
        """Write *payload* as JSON to *target* without exposing a partial file.

        The data is written to a temporary file in the same directory,
        flushed to disk, and then moved over *target* with ``os.replace``.
        A crash mid-write therefore leaves the previous file intact.
        ``tempfile.mkstemp`` creates the file readable/writable by the owner
        only, so the result never has wider permissions, not even briefly.
        """
        import tempfile  # local import, like shutil in backup()

        target = Path(target)
        fd, tmp_name = tempfile.mkstemp(
            dir=str(target.parent),
            prefix=f".{target.name}.",
            suffix=".tmp",
        )
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as handle:
                json.dump(payload, handle, indent=2)
                handle.flush()
                os.fsync(handle.fileno())
            os.replace(tmp_name, target)
        except BaseException:
            # Best-effort cleanup of the temp file; the original error is
            # what the caller needs to see, so a failed unlink is ignored.
            try:
                os.unlink(tmp_name)
            except OSError:
                pass
            raise

    def _seal(self, vault: Vault, password: str, salt: bytes) -> dict:
        """Encrypt *vault* and return the envelope fields as a dict.

        The key is derived from *password*, *salt* and the instance's
        current iteration count.
        """
        key = derive_key(password, salt, self._iterations)
        nonce, ciphertext = encrypt_data(vault.to_json(), key)
        return {
            "version": "1.0",
            "iterations": self._iterations,
            "salt": encode_base64(salt),
            "nonce": encode_base64(nonce),
            "data": encode_base64(ciphertext),
        }

    def _persist(self, vault: Vault, master_password: str, salt: bytes) -> None:
        """Encrypt *vault* with *salt* and write it to ``self.path``.

        ``self._salt`` is updated only after the file was written
        successfully.
        """
        self._write_json_atomic(
            self.path, self._seal(vault, master_password, salt)
        )
        self._salt = salt

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def exists(self) -> bool:
        """Check if vault file exists."""
        return self.path.exists()

    def create(self, vault: Vault, master_password: str):
        """
        Create a new encrypted vault file.

        The parent directory is created if necessary, a fresh salt is
        generated, and the file is written atomically with owner-only
        permissions.

        Args:
            vault: Vault object to save
            master_password: Master password for encryption
        """
        # Ensure directory exists
        self.path.parent.mkdir(parents=True, exist_ok=True)

        self._persist(vault, master_password, generate_salt())

        logger.info("Created vault: %s", self.path)

    def load(self, master_password: str) -> Vault:
        """
        Load and decrypt vault from file.

        The salt and iteration count stored in the file are remembered on
        the instance for subsequent ``save`` calls.  If the file has no
        "iterations" field, the module default ITERATIONS is used.

        Args:
            master_password: Master password for decryption

        Returns:
            Decrypted Vault object

        Raises:
            FileNotFoundError: If vault doesn't exist
            ValueError: If decryption fails (wrong password)
            KeyError: If the file lacks the "salt", "nonce" or "data" field
        """
        if not self.exists():
            raise FileNotFoundError(f"Vault not found: {self.path}")

        # Read vault file
        with open(self.path, "r", encoding="utf-8") as f:
            vault_file = json.load(f)

        # Extract components
        self._iterations = vault_file.get("iterations", ITERATIONS)
        self._salt = decode_base64(vault_file["salt"])
        nonce = decode_base64(vault_file["nonce"])
        ciphertext = decode_base64(vault_file["data"])

        key = derive_key(master_password, self._salt, self._iterations)

        try:
            plaintext = decrypt_data(nonce, ciphertext, key)
        except Exception as e:
            raise ValueError("Decryption failed - incorrect password?") from e

        vault = Vault.from_json(plaintext)

        logger.info(
            "Loaded vault: %s (%d entries)", self.path, len(vault.entries)
        )
        return vault

    def save(self, vault: Vault, master_password: str):
        """
        Save vault to file (re-encrypt).

        Reuses the salt remembered from the last ``create``/``load``.  If no
        salt is known yet, this behaves like ``create``.  The file is
        replaced atomically, so a failed save leaves the old vault intact.

        Args:
            vault: Vault object to save
            master_password: Master password for encryption
        """
        if self._salt is None:
            # If no salt, treat as new vault
            self.create(vault, master_password)
            return

        self._persist(vault, master_password, self._salt)

        logger.debug("Saved vault: %s", self.path)

    def change_password(self, vault: Vault, new_password: str):
        """
        Re-encrypt vault with new password.

        A new salt is generated for the new password.  If saving fails, the
        previously remembered salt is restored before the error propagates.

        Args:
            vault: Vault object
            new_password: New master password
        """
        previous_salt = self._salt
        self._salt = generate_salt()
        try:
            self.save(vault, new_password)
        except Exception:
            # Keep in-memory state in step with the untouched file.
            self._salt = previous_salt
            raise

        logger.info("Master password changed")

    def delete(self):
        """Delete the vault file.

        Before unlinking, the file contents are overwritten with random
        bytes as a best-effort scrub.  The file is opened without
        truncation so the write targets the existing contents, and the data
        is fsynced before the unlink.  NOTE: this cannot guarantee the old
        data is unrecoverable on every storage device or filesystem.
        """
        if self.exists():
            # Overwrite with random data before deleting
            size = self.path.stat().st_size
            # "r+b" (not "wb"): do not truncate before overwriting.
            with open(self.path, "r+b") as f:
                f.write(os.urandom(size))
                f.flush()
                os.fsync(f.fileno())

            self.path.unlink()
            logger.info("Deleted vault: %s", self.path)

    def backup(self, backup_path: Path):
        """
        Create a backup of the vault file.

        The (still encrypted) vault file is copied with ``shutil.copy2``.

        Args:
            backup_path: Path for backup file

        Raises:
            FileNotFoundError: If there is no vault file to copy
        """
        if not self.exists():
            raise FileNotFoundError("No vault to backup")

        import shutil
        shutil.copy2(self.path, backup_path)

        logger.info("Created backup: %s", backup_path)

    def export_json(self, vault: Vault, export_path: Path, encrypted: bool = True,
                    password: Optional[str] = None):
        """
        Export vault to JSON file.

        Encrypted exports use a fresh salt and record the iteration count
        used for key derivation so they can be imported regardless of the
        importing instance's own iteration setting.  The export file is
        written atomically with owner-only permissions.

        Args:
            vault: Vault to export
            export_path: Export file path
            encrypted: Whether to encrypt the export
            password: Password for encryption (required if encrypted=True)

        Raises:
            ValueError: If encrypted is true and no password is given
        """
        if encrypted:
            if not password:
                raise ValueError("Password required for encrypted export")

            export_data = {
                "format": "pwm_encrypted",
                **self._seal(vault, password, generate_salt()),
            }
        else:
            logger.warning(
                "Writing UNENCRYPTED vault export to: %s", export_path
            )
            export_data = {
                "format": "pwm_plaintext",
                "version": "1.0",
                "vault": vault.to_dict(),
            }

        self._write_json_atomic(Path(export_path), export_data)

        logger.info("Exported vault to: %s", export_path)

    def import_json(self, import_path: Path, password: Optional[str] = None) -> Vault:
        """
        Import vault from JSON file.

        For encrypted imports the iteration count stored in the file is
        used; files without an "iterations" field fall back to this
        instance's current iteration count.

        Args:
            import_path: Import file path
            password: Password for encrypted imports

        Returns:
            Imported Vault object

        Raises:
            ValueError: If the format is unknown, the password is missing
                for an encrypted import, or decryption fails
        """
        with open(import_path, "r", encoding="utf-8") as f:
            import_data = json.load(f)

        format_type = import_data.get("format", "")

        if format_type == "pwm_encrypted":
            if not password:
                raise ValueError("Password required for encrypted import")

            salt = decode_base64(import_data["salt"])
            nonce = decode_base64(import_data["nonce"])
            ciphertext = decode_base64(import_data["data"])

            # Prefer the count recorded in the export; legacy exports did
            # not store it.
            iterations = import_data.get("iterations", self._iterations)
            key = derive_key(password, salt, iterations)

            try:
                plaintext = decrypt_data(nonce, ciphertext, key)
            except Exception as e:
                raise ValueError("Decryption failed - wrong password?") from e

            vault = Vault.from_json(plaintext)

        elif format_type == "pwm_plaintext":
            vault = Vault.from_dict(import_data["vault"])

        else:
            raise ValueError(f"Unknown import format: {format_type}")

        logger.info("Imported vault from: %s", import_path)
        return vault
# (page navigation: Previous / Next)