# compression.py
"""
File compression utilities using gzip.
"""
import gzip
import shutil
from pathlib import Path
from typing import Optional
import logging
logger = logging.getLogger(__name__)
def compress_file(
    input_path: Path,
    output_path: Optional[Path] = None,
    level: int = 6,
    chunk_size: int = 8192
) -> Path:
    """
    Compress a file using gzip.

    Args:
        input_path: Path to input file (a plain string is also accepted)
        output_path: Path for compressed file (default: input + .gz)
        level: Compression level (1-9)
        chunk_size: Size of chunks to read; 0 falls back to shutil's
            default buffer size instead of copying nothing
    Returns:
        Path to compressed file
    Raises:
        ValueError: If output_path refers to the same file as input_path
    """
    # Normalise up front so string arguments work with the Path API below.
    input_path = Path(input_path)
    if output_path is None:
        output_path = input_path.with_suffix(input_path.suffix + ".gz")
    else:
        output_path = Path(output_path)

    # Opening the output in 'wb' mode truncates it, so compressing a file
    # onto itself would destroy the data before it is ever read.
    if output_path.exists() and output_path.samefile(input_path):
        raise ValueError(
            f"output_path must differ from input_path: {input_path}"
        )

    with open(input_path, 'rb') as f_in:
        f_out = gzip.open(output_path, 'wb', compresslevel=level)
        try:
            with f_out:
                shutil.copyfileobj(f_in, f_out, chunk_size)
        except BaseException:
            # Do not leave a truncated archive behind on failure.
            output_path.unlink(missing_ok=True)
            raise

    logger.debug("Compressed: %s -> %s", input_path, output_path)
    return output_path
def decompress_file(
    input_path: Path,
    output_path: Optional[Path] = None,
    chunk_size: int = 8192
) -> Path:
    """
    Decompress a gzip file.

    Args:
        input_path: Path to compressed file (a plain string is also accepted)
        output_path: Path for decompressed file (default: remove .gz, or
            replace the suffix with .decompressed when there is no .gz)
        chunk_size: Size of chunks to read; 0 falls back to shutil's
            default buffer size instead of copying nothing
    Returns:
        Path to decompressed file
    Raises:
        ValueError: If output_path refers to the same file as input_path
    """
    # Normalise up front so string arguments work with the Path API below.
    input_path = Path(input_path)
    if output_path is None:
        if input_path.suffix == ".gz":
            output_path = input_path.with_suffix("")
        else:
            output_path = input_path.with_suffix(".decompressed")
    else:
        output_path = Path(output_path)

    # Opening the output in 'wb' mode truncates it; if it is the input
    # itself (e.g. an input already named *.decompressed) the compressed
    # data would be wiped before it is read.
    if output_path.exists() and output_path.samefile(input_path):
        raise ValueError(
            f"output_path must differ from input_path: {input_path}"
        )

    with gzip.open(input_path, 'rb') as f_in:
        f_out = open(output_path, 'wb')
        try:
            with f_out:
                shutil.copyfileobj(f_in, f_out, chunk_size)
        except BaseException:
            # The gzip stream is only validated while reading, so a corrupt
            # input fails after the output exists; remove the partial file.
            output_path.unlink(missing_ok=True)
            raise

    logger.debug("Decompressed: %s -> %s", input_path, output_path)
    return output_path
def get_compression_ratio(original_path: Path, compressed_path: Path) -> float:
    """
    Report how large the compressed file is relative to the original.

    Args:
        original_path: Path to the uncompressed file
        compressed_path: Path to the compressed file
    Returns:
        Compressed size divided by original size (smaller is better);
        1.0 when the original file is empty
    """
    uncompressed_bytes = original_path.stat().st_size
    packed_bytes = compressed_path.stat().st_size
    # An empty original would mean dividing by zero; report 1.0 instead.
    return packed_bytes / uncompressed_bytes if uncompressed_bytes else 1.0