crawler.py
"""
Async web crawler implementation.
"""

import asyncio
import hashlib
import time
from pathlib import Path
from typing import Optional

import aiohttp
from aiohttp import ClientTimeout

from .config import Settings, Target
from .parser import parse_html


class Crawler:
    """
    Async web crawler with rate limiting and caching.
    
    Usage:
        async with Crawler(settings) as crawler:
            result = await crawler.scrape(target)
    """
    
    def __init__(self, settings: Settings):
        self.settings = settings
        self.session: Optional[aiohttp.ClientSession] = None
        self.last_request_time = 0.0
        
        # Setup cache directory
        if settings.cache_enabled:
            self.cache_dir = Path(settings.cache_dir)
            self.cache_dir.mkdir(parents=True, exist_ok=True)
    
    async def __aenter__(self):
        """Create HTTP session on context enter."""
        timeout = ClientTimeout(total=self.settings.timeout)
        headers = {
            'User-Agent': self.settings.user_agent,
            'Accept': 'text/html,application/xhtml+xml',
        }
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            headers=headers
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close HTTP session on context exit."""
        if self.session:
            await self.session.close()
    
    async def _rate_limit(self):
        """Enforce rate limiting between requests."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.settings.rate_limit:
            await asyncio.sleep(self.settings.rate_limit - elapsed)
        self.last_request_time = time.time()
    
    def _get_cache_path(self, url: str) -> Path:
        """Get cache file path for URL."""
        url_hash = hashlib.md5(url.encode()).hexdigest()
        return self.cache_dir / f"{url_hash}.html"
    
    async def _fetch_with_cache(self, url: str) -> str:
        """Fetch URL content, using cache if available."""
        if self.settings.cache_enabled:
            cache_path = self._get_cache_path(url)
            if cache_path.exists():
                return cache_path.read_text()
        
        # Respect rate limit
        await self._rate_limit()
        
        # Fetch from network
        html = await self._fetch(url)
        
        # Cache response
        if self.settings.cache_enabled and html:
            cache_path = self._get_cache_path(url)
            cache_path.write_text(html)
        
        return html
    
    async def _fetch(self, url: str) -> str:
        """
        Fetch a URL with retries.
        
        Args:
            url: URL to fetch
            
        Returns:
            HTML content as string
            
        Raises:
            Exception if all retries fail
        """
        last_error = None
        
        for attempt in range(self.settings.max_retries):
            try:
                async with self.session.get(url) as response:
                    response.raise_for_status()
                    return await response.text()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                last_error = e
                if attempt < self.settings.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...
        
        raise last_error or RuntimeError(f"Failed to fetch {url}")
    
    async def scrape(self, target: Target) -> dict:
        """
        Scrape a target and extract data.
        
        Args:
            target: Target configuration
            
        Returns:
            Dictionary with extracted data
        """
        html = await self._fetch_with_cache(target.url)
        
        # Parse and extract
        data = parse_html(html, target.selectors)
        
        return {
            'name': target.name,
            'url': target.url,
            'data': data
        }
    
    async def scrape_multiple(self, targets: list[Target]) -> list[dict]:
        """
        Scrape multiple targets.
        
        Note: Rate limiting is still enforced between requests.
        """
        results = []
        for target in targets:
            result = await self.scrape(target)
            results.append(result)
        return results
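
A minimal driver sketch for running the crawler end to end. The Settings and Target constructors below are assumptions inferred from the attributes crawler.py reads (user_agent, timeout, rate_limit, max_retries, cache_enabled, cache_dir on Settings; name, url, selectors on Target); check config.py for the real signatures.

# run_example.py -- hypothetical driver script, not part of crawler.py
import asyncio

from scraper.config import Settings, Target  # package path assumed; adjust to your layout
from scraper.crawler import Crawler


async def main() -> None:
    # Keyword arguments inferred from the attributes Crawler reads; verify against config.py.
    settings = Settings(
        user_agent="example-scraper/0.1",
        timeout=15,
        rate_limit=1.0,       # minimum seconds between requests
        max_retries=3,
        cache_enabled=True,
        cache_dir=".cache",
    )
    target = Target(
        name="example",
        url="https://example.com/",
        selectors={"title": "h1"},  # selector format depends on parse_html
    )

    # The async context manager opens the aiohttp session and closes it on exit.
    async with Crawler(settings) as crawler:
        result = await crawler.scrape(target)
        print(result["name"], result["data"])


if __name__ == "__main__":
    asyncio.run(main())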