enrich data

This commit is contained in:
Tour
2025-12-09 01:39:36 +01:00
parent 83d0fc1329
commit 06f63732b1
2 changed files with 181 additions and 22 deletions

@@ -6,6 +6,7 @@ Cache Manager module for SQLite-based caching and data storage
import sqlite3
import time
import zlib
import json
from datetime import datetime
from typing import Dict, List, Optional
@@ -21,7 +22,7 @@ class CacheManager:
    def _init_db(self):
        """Initialize cache and data storage database with consolidated schema"""
        with sqlite3.connect(self.db_path) as conn:
            # HTML page cache table (existing)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS cache (
                    url TEXT PRIMARY KEY,
@@ -34,6 +35,26 @@ class CacheManager:
                CREATE INDEX IF NOT EXISTS idx_timestamp ON cache(timestamp)
            """)

            # Resource cache table (NEW: for ALL web resources - JS, CSS, images, fonts, etc.)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS resource_cache (
                    url TEXT PRIMARY KEY,
                    content BLOB,
                    content_type TEXT,
                    status_code INTEGER,
                    headers TEXT,
                    timestamp REAL,
                    size_bytes INTEGER,
                    local_path TEXT
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_resource_timestamp ON resource_cache(timestamp)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_resource_content_type ON resource_cache(content_type)
            """)
            # Auctions table - consolidated schema
            conn.execute("""
                CREATE TABLE IF NOT EXISTS auctions (
@@ -170,8 +191,26 @@ class CacheManager:
                    ON bid_history(bidder_id)
            """)

            # MIGRATIONS: Add new columns to existing tables
            self._run_migrations(conn)

            conn.commit()

    def _run_migrations(self, conn):
        """Run database migrations to add new columns to existing tables"""
        print("Checking for database migrations...")

        # Check and add api_data_json column to lots table
        cursor = conn.execute("PRAGMA table_info(lots)")
        lots_columns = {row[1] for row in cursor.fetchall()}

        if 'api_data_json' not in lots_columns:
            print(" > Adding api_data_json column to lots table...")
            conn.execute("ALTER TABLE lots ADD COLUMN api_data_json TEXT")
            print(" * Migration complete")
        else:
            print(" * Database schema is up to date")
    def get(self, url: str, max_age_hours: int = 24) -> Optional[Dict]:
        """Get cached page if it exists and is not too old"""
        with sqlite3.connect(self.db_path) as conn:
@@ -346,3 +385,75 @@ class CacheManager:
                VALUES (?, ?, 0)
            """, (lot_id, url))
            conn.commit()

    def save_resource(self, url: str, content: bytes, content_type: str, status_code: int = 200,
                      headers: Optional[Dict] = None, local_path: Optional[str] = None):
        """Save a web resource (JS, CSS, image, font, etc.) to cache"""
        with sqlite3.connect(self.db_path) as conn:
            headers_json = json.dumps(headers) if headers else None
            size_bytes = len(content) if content else 0
            conn.execute("""
                INSERT OR REPLACE INTO resource_cache
                (url, content, content_type, status_code, headers, timestamp, size_bytes, local_path)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (url, content, content_type, status_code, headers_json, time.time(), size_bytes, local_path))
            conn.commit()

    def get_resource(self, url: str) -> Optional[Dict]:
        """Get a cached resource"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT content, content_type, status_code, headers, timestamp, size_bytes, local_path
                FROM resource_cache WHERE url = ?
            """, (url,))
            row = cursor.fetchone()
            if row:
                return {
                    'content': row[0],
                    'content_type': row[1],
                    'status_code': row[2],
                    'headers': json.loads(row[3]) if row[3] else {},
                    'timestamp': row[4],
                    'size_bytes': row[5],
                    'local_path': row[6],
                    'cached': True
                }
            return None
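A round trip through the two new methods, with an assumed constructor call (the CacheManager signature is not part of this diff):

cache = CacheManager("cache.db")  # constructor arguments assumed
cache.save_resource(
    url="https://example.com/static/app.js",
    content=b"console.log('hi');",
    content_type="application/javascript",
    headers={"cache-control": "max-age=3600"},
)
hit = cache.get_resource("https://example.com/static/app.js")
if hit:
    print(hit['content_type'], hit['size_bytes'], hit['cached'])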

@@ -631,29 +631,67 @@ class TroostwijkScraper:
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
        })
        # Set up COMPREHENSIVE resource interception (cache EVERYTHING)
        resource_stats = {'cached': 0, 'fetched': 0, 'failed': 0}

        async def handle_response(response):
            """Intercept ALL resources and cache them"""
            try:
                url = response.url
                status = response.status

                # Get content type
                headers = await response.all_headers()
                content_type = headers.get('content-type', '').split(';')[0].strip()

                # Determine if we should cache this resource
                cacheable_types = [
                    'text/html', 'text/css', 'text/javascript', 'application/javascript',
                    'application/json', 'application/x-javascript', 'image/', 'font/',
                    'application/font', 'video/', 'audio/', 'application/xml', 'text/xml',
                    'image/svg+xml'
                ]

                should_cache = any(content_type.startswith(ct) for ct in cacheable_types)

                if should_cache and status == 200:
                    try:
                        body = await response.body()

                        # Save to resource cache
                        self.cache.save_resource(
                            url=url,
                            content=body,
                            content_type=content_type,
                            status_code=status,
                            headers=headers
                        )
                        resource_stats['cached'] += 1

                        # Special handling for GraphQL responses
                        if 'graphql' in url and 'application/json' in content_type:
                            try:
                                body_text = body.decode('utf-8')
                                data = json.loads(body_text)
                                # Check if this is a lot details query
                                if 'data' in data and 'lot' in data.get('data', {}):
                                    lot_data = data['data']['lot']
                                    lot_slug = lot_data.get('urlSlug', '')
                                    if lot_slug:
                                        self.intercepted_api_data[lot_slug] = body_text
                                        print(f" >> Intercepted GraphQL for: {lot_slug}")
                            except:
                                pass
                    except Exception as e:
                        resource_stats['failed'] += 1
                else:
                    resource_stats['fetched'] += 1
            except Exception as e:
                # Silent fail - interception is opportunistic
                pass
        page.on('response', handle_response)
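This pass only fills the cache; replaying it is the natural follow-up. A minimal sketch of serving cached resources back through Playwright's request routing, assuming it sits in the same method as the handler above (the replay step is an assumption, not part of this commit):

async def serve_from_cache(route, request):
    # Assumed replay handler (not in this commit): answer from resource_cache when possible.
    cached = self.cache.get_resource(request.url)
    if cached and cached['content']:
        await route.fulfill(
            status=cached['status_code'],
            content_type=cached['content_type'],
            body=cached['content'],
        )
    else:
        await route.continue_()  # fall back to the network

await page.route('**/*', serve_from_cache)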
@@ -721,6 +759,16 @@ class TroostwijkScraper:
            results.append(page_data)

        await browser.close()

        # Print resource caching statistics
        print(f"\n{'='*60}")
        print(f"RESOURCE CACHE STATISTICS")
        print(f"{'='*60}")
        print(f" Cached: {resource_stats['cached']} resources")
        print(f" Fetched (not cached): {resource_stats['fetched']}")
        print(f" Failed: {resource_stats['failed']}")
        print(f"{'='*60}")
        return results

    def export_to_files(self) -> Dict[str, str]: