PYTHONPython

exporters

real world projects / web scraper / scraper

PYTHON
exporters.py🐍
"""
Data export functionality.
"""

import json
import csv
from pathlib import Path
from datetime import datetime


def export_data(
    data: list[dict],
    format: str = 'json',
    filename: str = 'output'
) -> str:
    """
    Write scraped records to disk in the requested format.

    The extension ``.<format>`` is appended to ``filename`` unless the
    name already ends with it, and the work is then delegated to the
    format-specific exporter.

    Args:
        data: Records to export, one dictionary per scraped item
        format: Output format, either 'json' or 'csv'
        filename: Target file name; the extension is optional

    Returns:
        The output path as a string, as returned by the exporter

    Raises:
        ValueError: If ``format`` is neither 'json' nor 'csv'
    """
    suffix = f'.{format}'
    target = Path(
        filename if filename.endswith(suffix) else f"{filename}{suffix}"
    )

    # Guard-clause dispatch: each supported format returns immediately.
    if format == 'json':
        return export_json(data, target)
    if format == 'csv':
        return export_csv(data, target)

    raise ValueError(f"Unknown format: {format}")


def export_json(data: list[dict], path: Path) -> str:
    """
    Write the records to ``path`` as an indented JSON document.

    The document is an object with three keys, in this order:
    ``scraped_at`` (ISO 8601 timestamp taken at export time), ``count``
    (number of records) and ``results`` (the records themselves).
    Values the JSON encoder cannot handle natively are converted with
    ``str``.

    Returns:
        The output path as a string.
    """
    payload = dict(
        scraped_at=datetime.now().isoformat(),
        count=len(data),
        results=data,
    )

    with open(path, 'w') as handle:
        json.dump(payload, handle, indent=2, default=str)

    return str(path)


def _flatten_value(value):
    """Collapse one scraped value into something that fits a CSV cell."""
    if isinstance(value, dict):
        # Prefer the 'text' entry; fall back to the dict's repr.
        return value.get('text', str(value))
    if isinstance(value, list):
        # Coerce every element to str: a dict's 'text' entry may be a
        # number or None, and str.join rejects non-string items.
        return '; '.join(
            str(v.get('text', str(v))) if isinstance(v, dict) else str(v)
            for v in value
        )
    return value


def export_csv(data: list[dict], path: Path) -> str:
    """
    Export data to CSV file.

    Each item becomes one row holding its ``name`` and ``url`` plus one
    column per key of its ``data`` mapping. Nested values are flattened:
    a dict contributes its ``'text'`` entry (or its string form when
    there is none) and a list is joined with ``'; '``. A key in ``data``
    called ``name`` or ``url`` overrides the top-level value. Columns are
    the sorted union of all row keys; cells missing from a row are left
    empty.

    Args:
        data: Records to export
        path: Destination file

    Returns:
        The output path as a string. When ``data`` is empty nothing is
        written and the path is returned unchanged.
    """
    if not data:
        return str(path)

    flat_rows = []
    for item in data:
        flat_row = {'name': item.get('name'), 'url': item.get('url')}
        # `or {}` also covers items whose 'data' key is present but None.
        for key, value in (item.get('data') or {}).items():
            flat_row[key] = _flatten_value(value)
        flat_rows.append(flat_row)

    # Union of every row's keys, sorted for a deterministic column order.
    fieldnames = sorted(set().union(*flat_rows))

    # Explicit UTF-8 so non-ASCII scraped text does not depend on the
    # platform's default encoding.
    with open(path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(flat_rows)

    return str(path)
PreviousNext