PYTHONPython

manager

real world projects / password manager / pwm

PYTHON
manager.py🐍
"""
Main password manager class.
"""

import time
from pathlib import Path
from typing import Optional, List
import logging
import threading

from .models import Vault, PasswordEntry
from .storage import VaultStorage, DEFAULT_VAULT_PATH
from .generator import generate_password, generate_passphrase, calculate_password_strength
from .crypto import SecureString

# Module-level logger; records are emitted under this module's dotted name.
logger = logging.getLogger(__name__)


class PasswordManager:
    """Main password manager interface.

    Ties together a ``VaultStorage`` backend, the in-memory ``Vault`` and the
    password generator helpers. While unlocked, the loaded vault and the
    master password (wrapped in a ``SecureString``) are held on the instance;
    ``lock()`` drops both.

    Auto-lock is evaluated lazily: every access that goes through
    ``is_unlocked`` compares the time since the last recorded activity with
    ``auto_lock_timeout`` and locks the vault once it has been exceeded.

    The instance is a context manager; leaving the ``with`` block locks the
    vault.
    """

    def __init__(
        self,
        vault_path: Optional[Path] = None,
        auto_lock_timeout: int = 300  # 5 minutes
    ):
        """
        Create a manager bound to a vault file.

        Args:
            vault_path: Location of the vault file; falls back to
                DEFAULT_VAULT_PATH when omitted.
            auto_lock_timeout: Seconds of inactivity after which the vault
                is considered locked again.
        """
        self.storage = VaultStorage(vault_path or DEFAULT_VAULT_PATH)
        self.auto_lock_timeout = auto_lock_timeout

        self._vault: Optional[Vault] = None
        self._master_password: Optional[SecureString] = None
        self._unlocked = False
        self._last_activity = 0.0
        # Nothing in this class starts this timer; lock() cancels it if one
        # has been attached.
        self._lock_timer: Optional[threading.Timer] = None

    @property
    def is_initialized(self) -> bool:
        """Check if vault exists."""
        return self.storage.exists()

    @property
    def is_unlocked(self) -> bool:
        """
        Check if vault is unlocked.

        Side effect: when the inactivity timeout has elapsed, the vault is
        locked before False is returned.
        """
        if not self._unlocked:
            return False

        # Check auto-lock timeout
        if time.time() - self._last_activity > self.auto_lock_timeout:
            self.lock()
            return False

        return True

    def _update_activity(self) -> None:
        """Update last activity timestamp."""
        self._last_activity = time.time()

    def _require_unlocked(self) -> None:
        """
        Ensure vault is unlocked and record the access as activity.

        Raises:
            RuntimeError: If the vault is locked (or has just auto-locked)
        """
        if not self.is_unlocked:
            raise RuntimeError("Vault is locked. Call unlock() first.")
        self._update_activity()

    def initialize(self, master_password: str) -> None:
        """
        Initialize a new vault.

        A weak master password only produces a warning in the log; it is
        not rejected.

        Args:
            master_password: Master password for the vault

        Raises:
            ValueError: If vault already exists
        """
        if self.is_initialized:
            raise ValueError("Vault already exists. Delete it first or use a different path.")

        # Validate password strength
        strength = calculate_password_strength(master_password)
        if strength["score"] < 40:
            logger.warning("Master password is weak: %s", strength["level"])

        # Persist the empty vault first: if the storage layer raises, no
        # vault and no master password are left behind on the instance.
        vault = Vault()
        self.storage.create(vault, master_password)

        self._vault = vault
        self._master_password = SecureString(master_password)
        self._unlocked = True
        self._update_activity()

        logger.info("Vault initialized")

    def unlock(self, master_password: str) -> None:
        """
        Unlock the vault.

        Args:
            master_password: Master password

        Raises:
            FileNotFoundError: If vault doesn't exist
            ValueError: If password is incorrect
        """
        if not self.is_initialized:
            raise FileNotFoundError("Vault not found. Call initialize() first.")

        # Load vault; on failure the current state is left untouched.
        vault = self.storage.load(master_password)

        # Unlocking while already unlocked must not orphan the previous
        # secret without wiping it.
        if self._master_password is not None:
            self._master_password.clear()

        self._vault = vault
        self._master_password = SecureString(master_password)

        self._unlocked = True
        self._update_activity()

        logger.info("Vault unlocked")

    def lock(self) -> None:
        """Lock the vault and clear sensitive data."""
        # Compare against None rather than relying on truthiness: an empty
        # vault or secret object may evaluate as falsy and would otherwise
        # be skipped.
        if self._vault is not None:
            # Clear passwords from memory
            for entry in self._vault.entries:
                entry.password = ""
            self._vault = None

        if self._master_password is not None:
            self._master_password.clear()
            self._master_password = None

        self._unlocked = False

        if self._lock_timer is not None:
            self._lock_timer.cancel()
            self._lock_timer = None

        logger.info("Vault locked")

    def save(self) -> None:
        """
        Save vault to file.

        Raises:
            RuntimeError: If the vault is locked
        """
        self._require_unlocked()

        self.storage.save(self._vault, self._master_password.get())
        logger.debug("Vault saved")

    def add(
        self,
        name: str,
        username: str = "",
        password: str = "",
        url: str = "",
        notes: str = "",
        category: str = "General",
        tags: Optional[List[str]] = None
    ) -> PasswordEntry:
        """
        Add a new password entry and save the vault.

        Args:
            name: Entry name (unique identifier)
            username: Username/email
            password: Password (can be auto-generated)
            url: Website URL
            notes: Additional notes
            category: Category name
            tags: List of tags

        Returns:
            Created PasswordEntry

        Raises:
            RuntimeError: If the vault is locked
        """
        self._require_unlocked()

        entry = PasswordEntry(
            name=name,
            username=username,
            password=password,
            url=url,
            notes=notes,
            category=category,
            tags=tags or []
        )

        self._vault.add(entry)
        self.save()

        logger.info("Added entry: %s", name)
        return entry

    def get(self, name: str) -> Optional[PasswordEntry]:
        """
        Get a password entry by name.

        Args:
            name: Entry name

        Returns:
            PasswordEntry or None if not found

        Raises:
            RuntimeError: If the vault is locked
        """
        self._require_unlocked()
        return self._vault.get(name)

    def update(self, name: str, **kwargs) -> bool:
        """
        Update a password entry; the vault is saved only when the update
        succeeded.

        Args:
            name: Entry name
            **kwargs: Fields to update

        Returns:
            True if successful

        Raises:
            RuntimeError: If the vault is locked
        """
        self._require_unlocked()

        result = self._vault.update(name, **kwargs)
        if result:
            self.save()
            logger.info("Updated entry: %s", name)

        return result

    def delete(self, name: str) -> bool:
        """
        Delete a password entry; the vault is saved only when the delete
        succeeded.

        Args:
            name: Entry name

        Returns:
            True if successful

        Raises:
            RuntimeError: If the vault is locked
        """
        self._require_unlocked()

        result = self._vault.delete(name)
        if result:
            self.save()
            logger.info("Deleted entry: %s", name)

        return result

    def list_all(self) -> List[PasswordEntry]:
        """Get all entries (a shallow copy of the vault's entry list)."""
        self._require_unlocked()
        return self._vault.entries.copy()

    def search(
        self,
        query: str = "",
        category: str = "",
        tags: Optional[List[str]] = None
    ) -> List[PasswordEntry]:
        """
        Search entries.

        Args:
            query: Search text
            category: Filter by category
            tags: Filter by tags

        Returns:
            Matching entries

        Raises:
            RuntimeError: If the vault is locked
        """
        self._require_unlocked()
        return self._vault.search(query, category, tags)

    def generate_password(
        self,
        length: int = 16,
        lowercase: bool = True,
        uppercase: bool = True,
        digits: bool = True,
        symbols: bool = True
    ) -> str:
        """Generate a random password (works while the vault is locked)."""
        # Inside a method body the bare name resolves to the module-level
        # generator function, not to this method.
        return generate_password(
            length=length,
            lowercase=lowercase,
            uppercase=uppercase,
            digits=digits,
            symbols=symbols
        )

    def generate_passphrase(
        self,
        words: int = 4,
        separator: str = "-",
        capitalize: bool = False
    ) -> str:
        """Generate a random passphrase (works while the vault is locked)."""
        return generate_passphrase(
            words=words,
            separator=separator,
            capitalize=capitalize
        )

    def check_strength(self, password: str) -> dict:
        """Check password strength."""
        return calculate_password_strength(password)

    def change_master_password(self, new_password: str) -> None:
        """
        Change the master password.

        A weak new password only produces a warning in the log; it is not
        rejected.

        Args:
            new_password: New master password

        Raises:
            RuntimeError: If the vault is locked
        """
        self._require_unlocked()

        # Check strength
        strength = calculate_password_strength(new_password)
        if strength["score"] < 40:
            logger.warning("New master password is weak: %s", strength["level"])

        # Re-encrypt with new password
        self.storage.change_password(self._vault, new_password)

        # Wipe the old secret only after the storage call succeeded.
        self._master_password.clear()
        self._master_password = SecureString(new_password)

        logger.info("Master password changed")

    def audit(self) -> dict:
        """
        Audit all passwords for security issues.

        Returns:
            Audit report with issues found. Keys:
                "missing_passwords": names of entries without a password
                "weak_passwords": dicts with "name", "score" and "level"
                    for entries scoring below 60
                "duplicate_passwords": one ``{"entries": [names...]}`` dict
                    per password shared by two or more entries, listing
                    every entry that uses it
                "old_passwords": always empty at present

        Raises:
            RuntimeError: If the vault is locked
        """
        self._require_unlocked()

        issues = {
            "weak_passwords": [],
            "duplicate_passwords": [],
            # NOTE(review): never populated here; filling it needs an age or
            # timestamp field on the entry - confirm against the models.
            "old_passwords": [],
            "missing_passwords": []
        }

        # password -> names of the entries using it, in vault order
        names_by_password = {}

        for entry in self._vault.entries:
            if not entry.password:
                issues["missing_passwords"].append(entry.name)
                continue

            # Check strength
            strength = calculate_password_strength(entry.password)
            if strength["score"] < 60:
                issues["weak_passwords"].append({
                    "name": entry.name,
                    "score": strength["score"],
                    "level": strength["level"]
                })

            names_by_password.setdefault(entry.password, []).append(entry.name)

        # Report each shared password once, with every entry that uses it,
        # so a three-way reuse is one group instead of several pairs.
        issues["duplicate_passwords"] = [
            {"entries": names}
            for names in names_by_password.values()
            if len(names) > 1
        ]

        return issues

    def export_vault(self, path: Path, encrypted: bool = True,
                     password: Optional[str] = None) -> None:
        """
        Export vault to file.

        Args:
            path: Export file path
            encrypted: Passed through to the storage layer's export
            password: Password for the export; the current master password
                is used when omitted or empty

        Raises:
            RuntimeError: If the vault is locked
        """
        self._require_unlocked()
        self.storage.export_json(
            self._vault,
            path,
            encrypted,
            password or self._master_password.get()
        )

    def import_vault(self, path: Path, password: Optional[str] = None,
                     merge: bool = False) -> None:
        """
        Import vault from file and save the result.

        Args:
            path: Import file path
            password: Password for encrypted imports
            merge: If True, merge with existing entries (imported entries
                whose name already exists are skipped); otherwise the
                current entries are replaced

        Raises:
            RuntimeError: If the vault is locked
        """
        self._require_unlocked()

        imported = self.storage.import_json(path, password)

        if merge:
            # Merge entries
            for entry in imported.entries:
                if not self._vault.get(entry.name):
                    self._vault.entries.append(entry)
        else:
            # Replace all entries
            self._vault.entries = imported.entries

        self.save()

    def __enter__(self):
        """Return the manager itself; no unlocking happens here."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Lock the vault on exit; exceptions are not suppressed."""
        self.lock()
PreviousNext