# manager.py
"""
Main password manager class.
"""
import time
from pathlib import Path
from typing import Optional, List
import logging
import threading
from .models import Vault, PasswordEntry
from .storage import VaultStorage, DEFAULT_VAULT_PATH
from .generator import generate_password, generate_passphrase, calculate_password_strength
from .crypto import SecureString
logger = logging.getLogger(__name__)


class PasswordManager:
    """Main password manager interface.

    Wraps a ``VaultStorage`` backend and, while unlocked, keeps the decrypted
    ``Vault`` and the master password (as a ``SecureString``) in memory.
    Every operation that touches the vault first checks that the manager is
    unlocked and refreshes the inactivity timestamp; once more than
    ``auto_lock_timeout`` seconds pass without such an operation, the next
    ``is_unlocked`` check locks the vault.

    The manager can be used as a context manager; leaving the ``with`` block
    calls :meth:`lock`.
    """

    def __init__(
        self,
        vault_path: Optional[Path] = None,
        auto_lock_timeout: int = 300,  # 5 minutes
    ):
        """
        Create a manager bound to a vault file.

        Args:
            vault_path: Location of the vault; ``DEFAULT_VAULT_PATH`` is used
                when omitted.
            auto_lock_timeout: Seconds of inactivity after which the vault is
                considered locked.
        """
        self.storage = VaultStorage(vault_path or DEFAULT_VAULT_PATH)
        self.auto_lock_timeout = auto_lock_timeout
        self._vault: Optional[Vault] = None
        self._master_password: Optional[SecureString] = None
        self._unlocked = False
        # Monotonic-clock reading of the last vault operation.
        self._last_activity = 0.0
        # Nothing in this class starts a timer; lock() cancels one if present.
        self._lock_timer: Optional[threading.Timer] = None

    @property
    def is_initialized(self) -> bool:
        """Check if vault exists."""
        return self.storage.exists()

    @property
    def is_unlocked(self) -> bool:
        """Check if vault is unlocked.

        Note that this property has a side effect: when the inactivity
        timeout has elapsed it locks the vault before returning ``False``.
        """
        if not self._unlocked:
            return False
        # The monotonic clock is immune to wall-clock adjustments, which
        # could otherwise lock the vault early or keep it open indefinitely.
        if time.monotonic() - self._last_activity > self.auto_lock_timeout:
            self.lock()
            return False
        return True

    def _update_activity(self):
        """Update last activity timestamp."""
        self._last_activity = time.monotonic()

    def _require_unlocked(self):
        """Ensure vault is unlocked and record the access as activity.

        Raises:
            RuntimeError: If the vault is locked (or has just auto-locked).
        """
        if not self.is_unlocked:
            raise RuntimeError("Vault is locked. Call unlock() first.")
        self._update_activity()

    def _replace_master_password(self, password: str):
        """Store *password* in memory, clearing any previously held secret."""
        replacement = SecureString(password)
        if self._master_password is not None:
            self._master_password.clear()
        self._master_password = replacement

    def initialize(self, master_password: str):
        """
        Initialize a new vault.

        A weak master password (strength score below 40) only produces a
        warning in the log; it is still accepted.

        Args:
            master_password: Master password for the vault

        Raises:
            ValueError: If vault already exists
        """
        if self.is_initialized:
            raise ValueError("Vault already exists. Delete it first or use a different path.")

        # Validate password strength
        strength = calculate_password_strength(master_password)
        if strength["score"] < 40:
            logger.warning("Master password is weak: %s", strength["level"])

        # Write the empty vault first; in-memory state is only touched once
        # the file was created, so a failed create leaves no secrets behind.
        vault = Vault()
        self.storage.create(vault, master_password)

        self._vault = vault
        self._replace_master_password(master_password)
        self._unlocked = True
        self._update_activity()
        logger.info("Vault initialized")

    def unlock(self, master_password: str):
        """
        Unlock the vault.

        Args:
            master_password: Master password

        Raises:
            FileNotFoundError: If vault doesn't exist
            ValueError: If password is incorrect
        """
        if not self.is_initialized:
            raise FileNotFoundError("Vault not found. Call initialize() first.")

        # Load first: if this raises, the current state stays untouched.
        self._vault = self.storage.load(master_password)
        self._replace_master_password(master_password)
        self._unlocked = True
        self._update_activity()
        logger.info("Vault unlocked")

    def lock(self):
        """Lock the vault and clear sensitive data.

        Entry passwords are overwritten with an empty string on the entry
        objects themselves, so entries previously handed out by this manager
        lose their password as well.
        """
        # Compare against None rather than relying on truthiness: an empty
        # vault or secret may evaluate as false and would then be skipped.
        if self._vault is not None:
            for entry in self._vault.entries:
                entry.password = ""
            self._vault = None

        if self._master_password is not None:
            self._master_password.clear()
            self._master_password = None

        self._unlocked = False

        if self._lock_timer is not None:
            self._lock_timer.cancel()
            self._lock_timer = None

        logger.info("Vault locked")

    def save(self):
        """Save vault to file.

        Raises:
            RuntimeError: If the vault is locked.
        """
        self._require_unlocked()
        self.storage.save(self._vault, self._master_password.get())
        logger.debug("Vault saved")

    def add(
        self,
        name: str,
        username: str = "",
        password: str = "",
        url: str = "",
        notes: str = "",
        category: str = "General",
        tags: Optional[List[str]] = None
    ) -> PasswordEntry:
        """
        Add a new password entry and persist the vault.

        Args:
            name: Entry name (unique identifier)
            username: Username/email
            password: Password (can be auto-generated)
            url: Website URL
            notes: Additional notes
            category: Category name
            tags: List of tags

        Returns:
            Created PasswordEntry

        Raises:
            RuntimeError: If the vault is locked.
        """
        self._require_unlocked()
        entry = PasswordEntry(
            name=name,
            username=username,
            password=password,
            url=url,
            notes=notes,
            category=category,
            tags=tags or [],
        )
        self._vault.add(entry)
        self.save()
        logger.info("Added entry: %s", name)
        return entry

    def get(self, name: str) -> Optional[PasswordEntry]:
        """
        Get a password entry by name.

        Args:
            name: Entry name

        Returns:
            PasswordEntry or None if not found

        Raises:
            RuntimeError: If the vault is locked.
        """
        self._require_unlocked()
        return self._vault.get(name)

    def update(self, name: str, **kwargs) -> bool:
        """
        Update a password entry; the vault is saved only on success.

        Args:
            name: Entry name
            **kwargs: Fields to update

        Returns:
            True if successful

        Raises:
            RuntimeError: If the vault is locked.
        """
        self._require_unlocked()
        result = self._vault.update(name, **kwargs)
        if result:
            self.save()
            logger.info("Updated entry: %s", name)
        return result

    def delete(self, name: str) -> bool:
        """
        Delete a password entry; the vault is saved only on success.

        Args:
            name: Entry name

        Returns:
            True if successful

        Raises:
            RuntimeError: If the vault is locked.
        """
        self._require_unlocked()
        result = self._vault.delete(name)
        if result:
            self.save()
            logger.info("Deleted entry: %s", name)
        return result

    def list_all(self) -> List[PasswordEntry]:
        """Get all entries.

        The returned list is a shallow copy: the list is new, the entry
        objects are the ones held by the vault.
        """
        self._require_unlocked()
        return self._vault.entries.copy()

    def search(
        self,
        query: str = "",
        category: str = "",
        tags: Optional[List[str]] = None
    ) -> List[PasswordEntry]:
        """
        Search entries.

        Args:
            query: Search text
            category: Filter by category
            tags: Filter by tags

        Returns:
            Matching entries

        Raises:
            RuntimeError: If the vault is locked.
        """
        self._require_unlocked()
        return self._vault.search(query, category, tags)

    def generate_password(
        self,
        length: int = 16,
        lowercase: bool = True,
        uppercase: bool = True,
        digits: bool = True,
        symbols: bool = True
    ) -> str:
        """Generate a random password. Works while the vault is locked."""
        return generate_password(
            length=length,
            lowercase=lowercase,
            uppercase=uppercase,
            digits=digits,
            symbols=symbols,
        )

    def generate_passphrase(
        self,
        words: int = 4,
        separator: str = "-",
        capitalize: bool = False
    ) -> str:
        """Generate a random passphrase. Works while the vault is locked."""
        return generate_passphrase(
            words=words,
            separator=separator,
            capitalize=capitalize,
        )

    def check_strength(self, password: str) -> dict:
        """Check password strength. Works while the vault is locked."""
        return calculate_password_strength(password)

    def change_master_password(self, new_password: str):
        """
        Change the master password.

        A weak new password (strength score below 40) only produces a
        warning in the log; it is still accepted.

        Args:
            new_password: New master password

        Raises:
            RuntimeError: If the vault is locked.
        """
        self._require_unlocked()

        # Check strength
        strength = calculate_password_strength(new_password)
        if strength["score"] < 40:
            logger.warning("New master password is weak: %s", strength["level"])

        # Re-encrypt with new password
        self.storage.change_password(self._vault, new_password)

        # Swap the in-memory secret only after storage accepted the change.
        self._replace_master_password(new_password)
        logger.info("Master password changed")

    def audit(self) -> dict:
        """
        Audit all passwords for security issues.

        Returns:
            Audit report with these keys:

            * ``missing_passwords``: names of entries with an empty password.
            * ``weak_passwords``: ``{"name", "score", "level"}`` for every
              entry scoring below 60.
            * ``duplicate_passwords``: one ``{"entries": [...]}`` record per
              password shared by two or more entries, listing every entry
              name in vault order.
            * ``old_passwords``: always empty; this implementation does not
              check password age.

        Raises:
            RuntimeError: If the vault is locked.
        """
        self._require_unlocked()
        issues = {
            "weak_passwords": [],
            "duplicate_passwords": [],
            "old_passwords": [],
            "missing_passwords": [],
        }

        names_by_password = {}
        for entry in self._vault.entries:
            if not entry.password:
                issues["missing_passwords"].append(entry.name)
                continue

            # Check strength
            strength = calculate_password_strength(entry.password)
            if strength["score"] < 60:
                issues["weak_passwords"].append({
                    "name": entry.name,
                    "score": strength["score"],
                    "level": strength["level"],
                })

            # Check duplicates. The record is created at the second
            # occurrence and shares its list with the map, so any further
            # entry using the same password joins the existing record
            # instead of producing another overlapping pair.
            sharing = names_by_password.setdefault(entry.password, [])
            sharing.append(entry.name)
            if len(sharing) == 2:
                issues["duplicate_passwords"].append({"entries": sharing})

        return issues

    def export_vault(self, path: Path, encrypted: bool = True,
                     password: Optional[str] = None):
        """
        Export vault to file.

        Args:
            path: Destination file path
            encrypted: Passed through to the storage backend's JSON export
            password: Export password; the current master password is used
                when this is omitted or empty

        Raises:
            RuntimeError: If the vault is locked.
        """
        self._require_unlocked()
        self.storage.export_json(
            self._vault,
            path,
            encrypted,
            password or self._master_password.get(),
        )

    def import_vault(self, path: Path, password: Optional[str] = None,
                     merge: bool = False):
        """
        Import vault from file and persist the result.

        Args:
            path: Import file path
            password: Password for encrypted imports
            merge: If True, merge with existing entries (an imported entry
                is skipped when an entry of the same name already exists);
                if False, the imported entries replace all current ones

        Raises:
            RuntimeError: If the vault is locked.
        """
        self._require_unlocked()
        imported = self.storage.import_json(path, password)

        if merge:
            # Merge entries
            for entry in imported.entries:
                if not self._vault.get(entry.name):
                    self._vault.entries.append(entry)
        else:
            # Replace all entries
            self._vault.entries = imported.entries

        self.save()

    def __enter__(self):
        """Return the manager itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Lock the vault on leaving the ``with`` block; exceptions propagate."""
        self.lock()