API Scrapers¶
Scrap-E provides specialized scrapers for consuming APIs and web services. These scrapers handle authentication, rate limiting, and data transformation for various API types.
Development Status
API scrapers are currently in development. This documentation describes the planned interface and features.
Overview¶
API scrapers in Scrap-E support:
- REST APIs - HTTP-based RESTful services
- GraphQL APIs - Query-based GraphQL endpoints
- WebSocket APIs - Real-time WebSocket connections
- Authentication - OAuth 1.0/2.0, API keys, JWT tokens
- Rate Limiting - Built-in request throttling
- Pagination - Automatic page traversal
- Response Validation - Schema validation for API responses
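Taken together, these capabilities are meant to be combined in a single configuration. A rough sketch of what that composition could look like, using only field names that appear in the examples later on this page (the interface is still in development, so names may change):

```python
from scrap_e.core.config import APIScraperConfig
from scrap_e.core.models import AuthType, PaginationConfig
from scrap_e.scrapers.api.rest_scraper import RestScraper

# Authentication, throttling, and pagination declared in one place
config = APIScraperConfig(
    api_base_url="https://api.example.com",
    api_auth_type=AuthType.BEARER,
    api_key="your-api-token",
    rate_limit_calls=100,
    rate_limit_period=60,
    pagination=PaginationConfig(enabled=True, pagination_type="offset", page_size=50),
)

scraper = RestScraper(config)
result = await scraper.scrape("/users")
```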
Planned Architecture¶
REST API Scraper¶
```python
from scrap_e.scrapers.api.rest_scraper import RestScraper
from scrap_e.core.config import APIScraperConfig
from scrap_e.core.models import AuthType

# Configuration for REST API
config = APIScraperConfig(
    api_base_url="https://api.example.com",
    api_auth_type=AuthType.BEARER,
    api_key="your-api-token",
    default_timeout=30.0,
    rate_limit_calls=100,
    rate_limit_period=60,
)

# Create REST scraper
scraper = RestScraper(config)

# Simple GET request
result = await scraper.scrape("/users", method="GET")

# POST request with data
result = await scraper.scrape(
    "/users",
    method="POST",
    json_data={"name": "John Doe", "email": "john@example.com"},
)

# Query parameters
result = await scraper.scrape(
    "/users",
    params={"page": 1, "limit": 50},
)
```
GraphQL API Scraper¶
```python
from scrap_e.scrapers.api.graphql_scraper import GraphQLScraper

# GraphQL configuration
config = APIScraperConfig(
    graphql_endpoint="https://api.example.com/graphql",
    api_auth_type=AuthType.BEARER,
    api_key="your-token",
)

scraper = GraphQLScraper(config)

# GraphQL query
query = """
query GetUsers($first: Int!) {
  users(first: $first) {
    edges {
      node {
        id
        name
        email
      }
    }
  }
}
"""

result = await scraper.scrape(
    query=query,
    variables={"first": 10},
)

# GraphQL mutation
mutation = """
mutation CreateUser($input: UserInput!) {
  createUser(input: $input) {
    id
    name
    email
  }
}
"""

result = await scraper.scrape(
    query=mutation,
    variables={"input": {"name": "Jane Doe", "email": "jane@example.com"}},
)
```
WebSocket API Scraper¶
```python
from scrap_e.scrapers.api.websocket_scraper import WebSocketScraper

# WebSocket configuration
config = APIScraperConfig(
    ws_endpoint="wss://api.example.com/ws",
    ws_ping_interval=30.0,
    ws_pong_timeout=10.0,
)

scraper = WebSocketScraper(config)

# Connect and listen for messages
async with scraper.session() as ws:
    # Send subscription message
    await ws.send_json({
        "type": "subscribe",
        "channel": "updates",
    })

    # Listen for messages
    async for message in ws.listen():
        print(f"Received: {message}")

        # Process message
        if message.get("type") == "update":
            await process_update(message["data"])
```
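For comparison, the same subscribe-and-listen pattern written directly against the standalone `websockets` library (one client a WebSocket scraper could wrap) looks roughly like this:

```python
import json

import websockets  # third-party "websockets" package

async def listen_for_updates(uri: str = "wss://api.example.com/ws"):
    """Subscribe to a channel and print incoming JSON messages."""
    async with websockets.connect(uri, ping_interval=30.0) as ws:
        # Send the subscription message as a JSON string
        await ws.send(json.dumps({"type": "subscribe", "channel": "updates"}))
        # Iterate over incoming frames until the connection closes
        async for raw in ws:
            message = json.loads(raw)
            print(f"Received: {message}")
```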
Authentication Methods¶
API Key Authentication¶
```python
# Header-based API key
config = APIScraperConfig(
    api_auth_type=AuthType.API_KEY,
    api_key="your-api-key",
    auth_header="X-API-Key",  # Custom header name
)

# Query parameter API key
config = APIScraperConfig(
    api_auth_type=AuthType.API_KEY,
    api_key="your-api-key",
    auth_query_param="api_key",
)
```
Bearer Token Authentication¶
```python
config = APIScraperConfig(
    api_auth_type=AuthType.BEARER,
    api_key="your-bearer-token",
)

# Results in: Authorization: Bearer your-bearer-token
```
OAuth 2.0 Authentication¶
```python
# OAuth 2.0 client credentials flow
config = APIScraperConfig(
    api_auth_type=AuthType.OAUTH2,
    oauth_client_id="your-client-id",
    oauth_client_secret="your-client-secret",
    oauth_token_url="https://api.example.com/oauth/token",
    oauth_scope=["read", "write"],
)

scraper = RestScraper(config)

# Token is automatically obtained and refreshed
result = await scraper.scrape("/protected-resource")
```
JWT Token Authentication¶
```python
config = APIScraperConfig(
    api_auth_type=AuthType.JWT,
    jwt_secret="your-jwt-secret",
    jwt_algorithm="HS256",
    jwt_payload={"user_id": 123, "role": "admin"},
)
```
Rate Limiting¶
Built-in Rate Limiting¶
```python
config = APIScraperConfig(
    rate_limit_calls=100,  # 100 requests
    rate_limit_period=60,  # Per 60 seconds
    rate_limit_burst=10,   # Allow bursts of 10
)

scraper = RestScraper(config)

# Rate limiting is automatically applied
urls = [f"/users/{i}" for i in range(200)]
results = await scraper.scrape_multiple(urls)  # Respects rate limits
```
Adaptive Rate Limiting¶
```python
# Responds to HTTP 429 (Too Many Requests)
config = APIScraperConfig(
    adaptive_rate_limit=True,
    rate_limit_backoff_factor=2.0,  # Exponential backoff
    rate_limit_max_delay=300,       # Max 5-minute delay
)

scraper = RestScraper(config)

# Automatically slows down when rate limited
result = await scraper.scrape("/rate-limited-endpoint")
```
Pagination Support¶
Automatic Pagination¶
```python
from scrap_e.core.models import PaginationConfig

# Offset-based pagination
pagination_config = PaginationConfig(
    enabled=True,
    pagination_type="offset",
    page_param="offset",
    page_size_param="limit",
    page_size=50,
    max_pages=100,
)

config = APIScraperConfig(pagination=pagination_config)
scraper = RestScraper(config)

# Scrape all pages automatically
all_data = await scraper.scrape_all_pages("/users")
```
Cursor-based Pagination¶
```python
pagination_config = PaginationConfig(
    enabled=True,
    pagination_type="cursor",
    cursor_param="cursor",
    page_size_param="limit",
    page_size=25,
)

# Automatically follows cursor pagination
all_data = await scraper.scrape_all_pages("/timeline")
```
Link Header Pagination¶
```python
pagination_config = PaginationConfig(
    enabled=True,
    pagination_type="link_header",
    max_pages=50,
)

# Follows RFC 5988 Link headers (GitHub style)
all_data = await scraper.scrape_all_pages("/repositories")
```
Response Processing¶
Automatic JSON Parsing¶
```python
config = APIScraperConfig(
    parse_json_automatically=True,
    validate_response_schema=True,
)

scraper = RestScraper(config)

# Automatic JSON parsing and validation
result = await scraper.scrape("/api/data")
data = result.data.json_data  # Parsed JSON object
```
Schema Validation¶
```python
from pydantic import BaseModel
from typing import List

class UserModel(BaseModel):
    id: int
    name: str
    email: str

class UsersResponse(BaseModel):
    users: List[UserModel]
    total: int

# Configure response validation
config = APIScraperConfig(
    response_model=UsersResponse,
    validate_response_schema=True,
)

scraper = RestScraper(config)
result = await scraper.scrape("/users")
validated_data = result.data.validated_response  # UsersResponse instance
```
Custom Response Processing¶
```python
def process_api_response(response_data: dict) -> dict:
    """Custom response processor."""
    # Extract nested data
    if "data" in response_data:
        response_data = response_data["data"]

    # Normalize field names
    for item in response_data.get("items", []):
        if "created_at" in item:
            item["created_date"] = item.pop("created_at")

    return response_data

config = APIScraperConfig(
    response_processor=process_api_response,
)
```
Error Handling¶
API-Specific Errors¶
```python
from scrap_e.core.exceptions import (
    APIError,
    AuthenticationError,
    RateLimitError,
    ValidationError,
)

try:
    result = await scraper.scrape("/protected-endpoint")
except AuthenticationError as e:
    print(f"Auth failed: {e}")
    # Refresh token or re-authenticate
except RateLimitError as e:
    print(f"Rate limited: {e}")
    print(f"Retry after: {e.retry_after} seconds")
except APIError as e:
    print(f"API error: {e}")
    print(f"Status code: {e.status_code}")
    print(f"Error response: {e.response_data}")
```
Retry Logic for APIs¶
```python
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type((RateLimitError, APIError)),
)
async def resilient_api_call(endpoint: str):
    return await scraper.scrape(endpoint)

# Usage
result = await resilient_api_call("/flaky-endpoint")
```
Streaming and Real-time Data¶
Server-Sent Events (SSE)¶
```python
from scrap_e.scrapers.api.sse_scraper import SSEScraper

config = APIScraperConfig(
    sse_endpoint="https://api.example.com/events",
    api_auth_type=AuthType.BEARER,
    api_key="your-token",
)

scraper = SSEScraper(config)

# Stream events
async for event in scraper.stream("/live-updates"):
    print(f"Event: {event.event}")
    print(f"Data: {event.data}")

    if event.event == "user_update":
        await process_user_update(event.data)
```
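For context, Server-Sent Events are just a long-lived HTTP response whose body is a stream of `event:`/`data:` lines. A minimal, framework-free sketch of consuming such a stream with `httpx` (the endpoint and token are placeholders):

```python
import httpx

async def stream_sse(url: str, token: str):
    """Yield the payload of each `data:` line from a text/event-stream endpoint."""
    headers = {"Authorization": f"Bearer {token}", "Accept": "text/event-stream"}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url, headers=headers) as response:
            async for line in response.aiter_lines():
                if line.startswith("data:"):
                    yield line.removeprefix("data:").strip()
```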
WebSocket Streaming¶
```python
async def stream_websocket_data():
    scraper = WebSocketScraper(config)

    async with scraper.session() as ws:
        # Send subscription
        await ws.send_json({
            "action": "subscribe",
            "channels": ["trades", "orders"],
        })

        # Process streaming data
        async for message in ws.listen():
            if message["channel"] == "trades":
                await process_trade(message["data"])
            elif message["channel"] == "orders":
                await process_order(message["data"])
```
Configuration Examples¶
Production API Configuration¶
```yaml
# api_config.yaml
api:
  base_url: "https://api.production.com/v1"
  auth_type: oauth2
  client_id: "${OAUTH_CLIENT_ID}"
  client_secret: "${OAUTH_CLIENT_SECRET}"
  token_url: "https://auth.production.com/oauth/token"

rate_limit:
  calls: 1000
  period: 3600  # 1 hour
  burst: 50
  adaptive: true

retry:
  max_attempts: 5
  initial_delay: 1.0
  max_delay: 60.0

pagination:
  enabled: true
  type: "offset"
  page_size: 100
  max_pages: 500

cache:
  enabled: true
  ttl: 300  # 5 minutes
```
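The `${...}` placeholders above are assumed to be resolved from environment variables at load time. One simple way to do that with PyYAML, shown only as a sketch (Scrap-E's actual config loader may differ):

```python
import os

import yaml  # PyYAML

def load_api_config(path: str) -> dict:
    """Load a YAML config file, expanding ${VAR} references from the environment."""
    with open(path) as f:
        raw = f.read()
    # os.path.expandvars substitutes $VAR and ${VAR} occurrences
    return yaml.safe_load(os.path.expandvars(raw))

settings = load_api_config("api_config.yaml")
print(settings["api"]["base_url"])
```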
Development API Configuration¶
```yaml
# dev_config.yaml
api:
  base_url: "https://api.dev.com/v1"
  auth_type: api_key
  api_key: "${DEV_API_KEY}"

rate_limit:
  calls: 100
  period: 60

debug: true
log_requests: true
log_responses: true
```
Testing API Scrapers¶
Unit Tests¶
```python
import pytest
from unittest.mock import AsyncMock, patch

from scrap_e.core.config import APIScraperConfig
from scrap_e.core.models import AuthType
from scrap_e.scrapers.api.rest_scraper import RestScraper

@pytest.mark.asyncio
async def test_rest_scraper_get():
    config = APIScraperConfig(api_base_url="https://api.example.com")
    scraper = RestScraper(config)

    with patch("httpx.AsyncClient.get") as mock_get:
        mock_response = AsyncMock()
        mock_response.json.return_value = {"users": [{"id": 1, "name": "John"}]}
        mock_response.status_code = 200
        mock_get.return_value = mock_response

        result = await scraper.scrape("/users")

        assert result.success
        assert result.data.json_data["users"][0]["name"] == "John"

@pytest.mark.asyncio
async def test_rest_scraper_authentication():
    config = APIScraperConfig(
        api_base_url="https://api.example.com",
        api_auth_type=AuthType.BEARER,
        api_key="test-token",
    )
    scraper = RestScraper(config)

    with patch("httpx.AsyncClient.get") as mock_get:
        await scraper.scrape("/protected")

        # Verify the Authorization header was set
        call_kwargs = mock_get.call_args[1]
        headers = call_kwargs.get("headers", {})
        assert headers.get("Authorization") == "Bearer test-token"
```
Integration Tests¶
```python
@pytest.mark.integration
@pytest.mark.asyncio
async def test_real_api_integration():
    """Test against a real API (with test credentials)."""
    config = APIScraperConfig(
        api_base_url="https://jsonplaceholder.typicode.com",
        default_timeout=10.0,
    )
    scraper = RestScraper(config)

    # Test GET
    result = await scraper.scrape("/users")
    assert result.success
    assert len(result.data.json_data) > 0

    # Test pagination
    users_page_1 = await scraper.scrape("/users?_page=1&_limit=5")
    users_page_2 = await scraper.scrape("/users?_page=2&_limit=5")
    assert len(users_page_1.data.json_data) == 5
    assert len(users_page_2.data.json_data) == 5
    assert users_page_1.data.json_data[0]["id"] != users_page_2.data.json_data[0]["id"]
```
Performance Optimization¶
Connection Pooling¶
```python
config = APIScraperConfig(
    connection_pool_size=20,
    connection_pool_max_size=100,
    connection_keep_alive=30.0,
)

# Reuse connections across multiple requests
scraper = RestScraper(config)
```
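These options map onto the connection limits of the underlying HTTP client. For reference, roughly equivalent settings expressed directly with `httpx` (the client patched in the unit tests above) look like this:

```python
import httpx

# Roughly equivalent pool settings expressed directly in httpx
limits = httpx.Limits(
    max_keepalive_connections=20,  # idle connections kept warm for reuse
    max_connections=100,           # hard cap on concurrent connections
    keepalive_expiry=30.0,         # seconds an idle connection stays open
)

client = httpx.AsyncClient(limits=limits)
```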
Caching¶
```python
from scrap_e.core.models import CacheConfig

cache_config = CacheConfig(
    enabled=True,
    backend="redis",  # or "memory", "disk"
    ttl_seconds=300,
    cache_key_prefix="api_cache",
)

config = APIScraperConfig(cache=cache_config)
scraper = RestScraper(config)

# Responses are automatically cached
result1 = await scraper.scrape("/slow-endpoint")  # Cache miss
result2 = await scraper.scrape("/slow-endpoint")  # Cache hit
```
Batch Operations¶
```python
import asyncio

# Batch multiple API calls
endpoints = [f"/users/{i}" for i in range(1, 101)]

# Process in batches to respect rate limits
batch_size = 10
results = []

for i in range(0, len(endpoints), batch_size):
    batch = endpoints[i:i + batch_size]
    batch_results = await scraper.scrape_multiple(batch)
    results.extend(batch_results)

    # Optional delay between batches
    if i + batch_size < len(endpoints):
        await asyncio.sleep(1.0)
```
This page describes the planned interface and architecture of Scrap-E's API scrapers for REST, GraphQL, WebSocket, and streaming services; expect details to change as development progresses.