PYTHON
storage.py🐍python
"""
Storage operations for the password vault.
"""
import os
import json
from pathlib import Path
from typing import Optional
import logging
from .crypto import (
generate_salt,
derive_key,
encrypt_data,
decrypt_data,
encode_base64,
decode_base64,
ITERATIONS
)
from .models import Vault
logger = logging.getLogger(__name__)
DEFAULT_VAULT_PATH = Path.home() / ".pwm" / "vault.enc"
class VaultStorage:
"""Handles vault file operations."""
def __init__(self, path: Optional[Path] = None):
self.path = path or DEFAULT_VAULT_PATH
self._salt: Optional[bytes] = None
self._iterations = ITERATIONS
def exists(self) -> bool:
"""Check if vault file exists."""
return self.path.exists()
def create(self, vault: Vault, master_password: str):
"""
Create a new encrypted vault file.
Args:
vault: Vault object to save
master_password: Master password for encryption
"""
# Ensure directory exists
self.path.parent.mkdir(parents=True, exist_ok=True)
# Generate new salt
self._salt = generate_salt()
# Derive key from password
key = derive_key(master_password, self._salt, self._iterations)
# Serialize vault to JSON
plaintext = vault.to_json()
# Encrypt
nonce, ciphertext = encrypt_data(plaintext, key)
# Build vault file structure
vault_file = {
"version": "1.0",
"iterations": self._iterations,
"salt": encode_base64(self._salt),
"nonce": encode_base64(nonce),
"data": encode_base64(ciphertext)
}
# Write to file
with open(self.path, 'w') as f:
json.dump(vault_file, f, indent=2)
# Set restrictive permissions (Unix only)
try:
os.chmod(self.path, 0o600)
except:
pass
logger.info(f"Created vault: {self.path}")
def load(self, master_password: str) -> Vault:
"""
Load and decrypt vault from file.
Args:
master_password: Master password for decryption
Returns:
Decrypted Vault object
Raises:
FileNotFoundError: If vault doesn't exist
ValueError: If decryption fails (wrong password)
"""
if not self.exists():
raise FileNotFoundError(f"Vault not found: {self.path}")
# Read vault file
with open(self.path, 'r') as f:
vault_file = json.load(f)
# Extract components
self._iterations = vault_file.get("iterations", ITERATIONS)
self._salt = decode_base64(vault_file["salt"])
nonce = decode_base64(vault_file["nonce"])
ciphertext = decode_base64(vault_file["data"])
# Derive key
key = derive_key(master_password, self._salt, self._iterations)
# Decrypt
try:
plaintext = decrypt_data(nonce, ciphertext, key)
except Exception as e:
raise ValueError("Decryption failed - incorrect password?") from e
# Parse vault
vault = Vault.from_json(plaintext)
logger.info(f"Loaded vault: {self.path} ({len(vault.entries)} entries)")
return vault
def save(self, vault: Vault, master_password: str):
"""
Save vault to file (re-encrypt).
Args:
vault: Vault object to save
master_password: Master password for encryption
"""
if self._salt is None:
# If no salt, treat as new vault
self.create(vault, master_password)
return
# Derive key
key = derive_key(master_password, self._salt, self._iterations)
# Serialize and encrypt
plaintext = vault.to_json()
nonce, ciphertext = encrypt_data(plaintext, key)
# Build vault file
vault_file = {
"version": "1.0",
"iterations": self._iterations,
"salt": encode_base64(self._salt),
"nonce": encode_base64(nonce),
"data": encode_base64(ciphertext)
}
# Write to file
with open(self.path, 'w') as f:
json.dump(vault_file, f, indent=2)
logger.debug(f"Saved vault: {self.path}")
def change_password(self, vault: Vault, new_password: str):
"""
Re-encrypt vault with new password.
Args:
vault: Vault object
new_password: New master password
"""
# Generate new salt
self._salt = generate_salt()
# Save with new password
self.save(vault, new_password)
logger.info("Master password changed")
def delete(self):
"""Delete the vault file."""
if self.exists():
# Overwrite with random data before deleting
size = self.path.stat().st_size
with open(self.path, 'wb') as f:
f.write(os.urandom(size))
self.path.unlink()
logger.info(f"Deleted vault: {self.path}")
def backup(self, backup_path: Path):
"""
Create a backup of the vault file.
Args:
backup_path: Path for backup file
"""
if not self.exists():
raise FileNotFoundError("No vault to backup")
import shutil
shutil.copy2(self.path, backup_path)
logger.info(f"Created backup: {backup_path}")
def export_json(self, vault: Vault, export_path: Path, encrypted: bool = True,
password: Optional[str] = None):
"""
Export vault to JSON file.
Args:
vault: Vault to export
export_path: Export file path
encrypted: Whether to encrypt the export
password: Password for encryption (required if encrypted=True)
"""
if encrypted:
if not password:
raise ValueError("Password required for encrypted export")
salt = generate_salt()
key = derive_key(password, salt, self._iterations)
plaintext = vault.to_json()
nonce, ciphertext = encrypt_data(plaintext, key)
export_data = {
"format": "pwm_encrypted",
"version": "1.0",
"salt": encode_base64(salt),
"nonce": encode_base64(nonce),
"data": encode_base64(ciphertext)
}
else:
export_data = {
"format": "pwm_plaintext",
"version": "1.0",
"vault": vault.to_dict()
}
with open(export_path, 'w') as f:
json.dump(export_data, f, indent=2)
logger.info(f"Exported vault to: {export_path}")
def import_json(self, import_path: Path, password: Optional[str] = None) -> Vault:
"""
Import vault from JSON file.
Args:
import_path: Import file path
password: Password for encrypted imports
Returns:
Imported Vault object
"""
with open(import_path, 'r') as f:
import_data = json.load(f)
format_type = import_data.get("format", "")
if format_type == "pwm_encrypted":
if not password:
raise ValueError("Password required for encrypted import")
salt = decode_base64(import_data["salt"])
nonce = decode_base64(import_data["nonce"])
ciphertext = decode_base64(import_data["data"])
key = derive_key(password, salt, self._iterations)
try:
plaintext = decrypt_data(nonce, ciphertext, key)
except Exception as e:
raise ValueError("Decryption failed - wrong password?") from e
vault = Vault.from_json(plaintext)
elif format_type == "pwm_plaintext":
vault = Vault.from_dict(import_data["vault"])
else:
raise ValueError(f"Unknown import format: {format_type}")
logger.info(f"Imported vault from: {import_path}")
return vault