PYTHON
crawler.py🐍python
"""
Async web crawler implementation.
"""
import asyncio
import hashlib
from pathlib import Path
from typing import Optional
import aiohttp
from aiohttp import ClientTimeout
from .config import Settings, Target
from .parser import parse_html
class Crawler:
"""
Async web crawler with rate limiting and caching.
Usage:
async with Crawler(settings) as crawler:
result = await crawler.scrape(target)
"""
def __init__(self, settings: Settings):
self.settings = settings
self.session: Optional[aiohttp.ClientSession] = None
self.last_request_time = 0.0
# Setup cache directory
if settings.cache_enabled:
self.cache_dir = Path(settings.cache_dir)
self.cache_dir.mkdir(exist_ok=True)
async def __aenter__(self):
"""Create HTTP session on context enter."""
timeout = ClientTimeout(total=self.settings.timeout)
headers = {
'User-Agent': self.settings.user_agent,
'Accept': 'text/html,application/xhtml+xml',
}
self.session = aiohttp.ClientSession(
timeout=timeout,
headers=headers
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Close HTTP session on context exit."""
if self.session:
await self.session.close()
async def _rate_limit(self):
"""Enforce rate limiting between requests."""
import time
elapsed = time.time() - self.last_request_time
if elapsed < self.settings.rate_limit:
await asyncio.sleep(self.settings.rate_limit - elapsed)
self.last_request_time = time.time()
def _get_cache_path(self, url: str) -> Path:
"""Get cache file path for URL."""
url_hash = hashlib.md5(url.encode()).hexdigest()
return self.cache_dir / f"{url_hash}.html"
async def _fetch_with_cache(self, url: str) -> str:
"""Fetch URL content, using cache if available."""
if self.settings.cache_enabled:
cache_path = self._get_cache_path(url)
if cache_path.exists():
return cache_path.read_text()
# Respect rate limit
await self._rate_limit()
# Fetch from network
html = await self._fetch(url)
# Cache response
if self.settings.cache_enabled and html:
cache_path = self._get_cache_path(url)
cache_path.write_text(html)
return html
async def _fetch(self, url: str) -> str:
"""
Fetch a URL with retries.
Args:
url: URL to fetch
Returns:
HTML content as string
Raises:
Exception if all retries fail
"""
last_error = None
for attempt in range(self.settings.max_retries):
try:
async with self.session.get(url) as response:
response.raise_for_status()
return await response.text()
except aiohttp.ClientError as e:
last_error = e
if attempt < self.settings.max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
raise last_error
async def scrape(self, target: Target) -> dict:
"""
Scrape a target and extract data.
Args:
target: Target configuration
Returns:
Dictionary with extracted data
"""
html = await self._fetch_with_cache(target.url)
# Parse and extract
data = parse_html(html, target.selectors)
return {
'name': target.name,
'url': target.url,
'data': data
}
async def scrape_multiple(self, targets: list[Target]) -> list[dict]:
"""
Scrape multiple targets.
Note: Rate limiting is still enforced between requests.
"""
results = []
for target in targets:
result = await self.scrape(target)
results.append(result)
return results