#!/usr/bin/env python3
"""
Cache Manager module for database-backed caching and data storage.

Backend: PostgreSQL (psycopg)
"""

import psycopg
import time
import threading
from contextlib import contextmanager
from typing import Dict, List, Optional, Tuple
import zlib
import json
from datetime import datetime

import config


def _upsert_sql(table: str, key: str, columns: Tuple[str, ...]) -> str:
    """Build an ``INSERT ... ON CONFLICT (key) DO UPDATE`` statement.

    Every column in ``columns`` gets one ``%s`` placeholder (in order), and
    every column except ``key`` is overwritten from ``EXCLUDED`` on conflict.
    Only call this with identifiers hard-coded in this module; nothing here
    is escaped.
    """
    placeholders = ", ".join(["%s"] * len(columns))
    assignments = ", ".join(
        f"{col} = EXCLUDED.{col}" for col in columns if col != key
    )
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT ({key}) DO UPDATE SET {assignments}"
    )


# Marker for lot fields that must be present in the input dict (KeyError if not).
_REQUIRED = object()

# (column, default) pairs for ``save_lot``; the order defines both the INSERT
# column list and the parameter order, so the two can never drift apart.
# NOTE(review): auction_id defaults to '' although the column references
# auctions(auction_id); an empty string would violate the foreign key unless
# such an auction row exists. Kept as-is -- confirm whether NULL was intended.
_LOT_FIELDS = (
    ("lot_id", _REQUIRED),
    ("auction_id", ""),
    ("url", _REQUIRED),
    ("title", _REQUIRED),
    ("current_bid", ""),
    ("starting_bid", ""),
    ("minimum_bid", ""),
    ("bid_count", 0),
    ("closing_time", ""),
    ("viewing_time", ""),
    ("pickup_date", ""),
    ("location", ""),
    ("description", ""),
    ("category", ""),
    ("status", ""),
    ("brand", ""),
    ("model", ""),
    ("attributes_json", ""),
    ("first_bid_time", None),
    ("last_bid_time", None),
    ("bid_velocity", None),
    ("bid_increment", None),
    ("year_manufactured", None),
    ("condition_score", None),
    ("condition_description", ""),
    ("serial_number", ""),
    ("manufacturer", ""),
    ("damage_description", ""),
    ("followers_count", 0),
    ("estimated_min_price", None),
    ("estimated_max_price", None),
    ("lot_condition", ""),
    ("appearance", ""),
    ("scraped_at", _REQUIRED),
    ("api_data_json", None),
    ("next_scrape_at", None),
    ("scrape_priority", 0),
)

_CACHE_UPSERT = _upsert_sql(
    "cache", "url", ("url", "content", "timestamp", "status_code")
)

_AUCTION_UPSERT = _upsert_sql(
    "auctions",
    "auction_id",
    (
        "auction_id", "url", "title", "location", "lots_count",
        "first_lot_closing_time", "scraped_at", "city", "country", "type",
        "lot_count", "closing_time", "discovered_at",
    ),
)

_LOT_UPSERT = _upsert_sql(
    "lots", "lot_id", tuple(name for name, _ in _LOT_FIELDS)
)

_RESOURCE_UPSERT = _upsert_sql(
    "resource_cache",
    "url",
    (
        "url", "content", "content_type", "status_code", "headers",
        "timestamp", "size_bytes", "local_path",
    ),
)

# Schema bootstrap statements, executed in order by CacheManager._init_db.
# Order matters: tables must exist before their indexes and before tables
# that reference them.
_SCHEMA_DDL = (
    # Auctions
    """
    CREATE TABLE IF NOT EXISTS auctions (
        auction_id TEXT PRIMARY KEY,
        url TEXT UNIQUE,
        title TEXT,
        location TEXT,
        lots_count INTEGER,
        first_lot_closing_time TEXT,
        scraped_at TEXT,
        city TEXT,
        country TEXT,
        type TEXT,
        lot_count INTEGER DEFAULT 0,
        closing_time TEXT,
        discovered_at BIGINT
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_auctions_country ON auctions(country)",
    # Cache
    """
    CREATE TABLE IF NOT EXISTS cache (
        url TEXT PRIMARY KEY,
        content BYTEA,
        timestamp DOUBLE PRECISION,
        status_code INTEGER
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_timestamp ON cache(timestamp)",
    # Lots
    """
    CREATE TABLE IF NOT EXISTS lots (
        lot_id TEXT PRIMARY KEY,
        auction_id TEXT REFERENCES auctions(auction_id),
        url TEXT UNIQUE,
        title TEXT,
        current_bid TEXT,
        bid_count INTEGER,
        closing_time TEXT,
        viewing_time TEXT,
        pickup_date TEXT,
        location TEXT,
        description TEXT,
        category TEXT,
        scraped_at TEXT,
        sale_id INTEGER,
        manufacturer TEXT,
        type TEXT,
        year INTEGER,
        currency TEXT DEFAULT 'EUR',
        closing_notified INTEGER DEFAULT 0,
        starting_bid TEXT,
        minimum_bid TEXT,
        status TEXT,
        brand TEXT,
        model TEXT,
        attributes_json TEXT,
        first_bid_time TEXT,
        last_bid_time TEXT,
        bid_velocity DOUBLE PRECISION,
        bid_increment DOUBLE PRECISION,
        year_manufactured INTEGER,
        condition_score DOUBLE PRECISION,
        condition_description TEXT,
        serial_number TEXT,
        damage_description TEXT,
        followers_count INTEGER DEFAULT 0,
        estimated_min_price DOUBLE PRECISION,
        estimated_max_price DOUBLE PRECISION,
        lot_condition TEXT,
        appearance TEXT,
        estimated_min DOUBLE PRECISION,
        estimated_max DOUBLE PRECISION,
        next_bid_step_cents INTEGER,
        condition TEXT,
        category_path TEXT,
        city_location TEXT,
        country_code TEXT,
        bidding_status TEXT,
        packaging TEXT,
        quantity INTEGER,
        vat DOUBLE PRECISION,
        buyer_premium_percentage DOUBLE PRECISION,
        remarks TEXT,
        reserve_price DOUBLE PRECISION,
        reserve_met INTEGER,
        view_count INTEGER,
        api_data_json TEXT,
        next_scrape_at BIGINT,
        scrape_priority INTEGER DEFAULT 0
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_lots_sale_id ON lots(sale_id)",
    "CREATE INDEX IF NOT EXISTS idx_lots_closing_time ON lots(closing_time)",
    "CREATE INDEX IF NOT EXISTS idx_lots_next_scrape ON lots(next_scrape_at)",
    "CREATE INDEX IF NOT EXISTS idx_lots_priority ON lots(scrape_priority DESC)",
    # Images
    """
    CREATE TABLE IF NOT EXISTS images (
        id SERIAL PRIMARY KEY,
        lot_id TEXT REFERENCES lots(lot_id),
        url TEXT,
        local_path TEXT,
        downloaded INTEGER DEFAULT 0,
        labels TEXT,
        processed_at BIGINT
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_images_lot_id ON images(lot_id)",
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_lot_url ON images(lot_id, url)",
    # Bid history
    """
    CREATE TABLE IF NOT EXISTS bid_history (
        id SERIAL PRIMARY KEY,
        lot_id TEXT REFERENCES lots(lot_id),
        bid_amount DOUBLE PRECISION NOT NULL,
        bid_time TEXT NOT NULL,
        is_autobid INTEGER DEFAULT 0,
        bidder_id TEXT,
        bidder_number INTEGER,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_bid_history_bidder ON bid_history(bidder_id)",
    "CREATE INDEX IF NOT EXISTS idx_bid_history_lot_time ON bid_history(lot_id, bid_time)",
    # Resource cache
    """
    CREATE TABLE IF NOT EXISTS resource_cache (
        url TEXT PRIMARY KEY,
        content BYTEA,
        content_type TEXT,
        status_code INTEGER,
        headers TEXT,
        timestamp DOUBLE PRECISION,
        size_bytes INTEGER,
        local_path TEXT
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_resource_timestamp ON resource_cache(timestamp)",
    "CREATE INDEX IF NOT EXISTS idx_resource_content_type ON resource_cache(content_type)",
)


class _ConnectionPool:
    """Very small, thread-safe connection pool for psycopg (sync) connections.

    Avoids creating a new TCP connection for every DB access, which on Windows
    can quickly exhaust ephemeral ports and cause WSAEADDRINUSE (10048).

    ``_created`` counts every connection the pool is responsible for (idle or
    checked out) and is only touched while holding ``_cond``.
    """

    def __init__(self, dsn: str, min_size: int = 1, max_size: int = 6,
                 connect_fn=None, timeout: int = 30):
        """Create the pool and eagerly open ``min_size`` connections.

        Args:
            dsn: Connection string handed to ``connect_fn``.
            min_size: Number of connections opened up front (clamped to >= 0
                and to ``max_size``).
            max_size: Upper bound on live connections (clamped to >= 1).
            connect_fn: Callable taking the DSN and returning a connection;
                defaults to ``psycopg.connect``.
            timeout: Default number of seconds ``acquire`` waits (>= 1).
        """
        self._dsn = dsn
        self._max = max(1, int(max_size))
        # Never pre-warm more connections than the pool is allowed to hold.
        self._min = min(max(0, int(min_size)), self._max)
        self._timeout = max(1, int(timeout))
        self._connect = connect_fn or psycopg.connect
        self._lock = threading.Lock()
        self._cond = threading.Condition(self._lock)
        self._idle: list = []
        self._created = 0
        # Pre-warm pool
        for _ in range(self._min):
            self._idle.append(self._new_connection_with_retry())
            self._created += 1

    def _new_connection_with_retry(self):
        """Open a connection, retrying with short exponential back-off.

        Makes up to six attempts; the exception from the last attempt
        propagates to the caller.
        """
        for delay in (0.05, 0.1, 0.2, 0.4, 0.8):
            try:
                return self._connect(self._dsn)
            except Exception:
                time.sleep(delay)
        # Final attempt without sleeping afterwards; failure propagates.
        return self._connect(self._dsn)

    @staticmethod
    def _is_closed(conn) -> bool:
        """Return True if ``conn`` reports closed or cannot be inspected."""
        try:
            return bool(getattr(conn, "closed", False))
        except Exception:
            # Consider it broken
            return True

    def acquire(self, timeout: Optional[float] = None):
        """Check a connection out of the pool.

        Reuses an idle connection when one is available, opens a new one
        while below ``max_size``, and otherwise waits for a release.

        Args:
            timeout: Seconds to wait; ``None`` uses the pool default.

        Raises:
            TimeoutError: No connection became available in time.
        """
        wait_for = timeout if timeout is not None else self._timeout
        # Monotonic clock: the deadline must not move with wall-clock changes.
        deadline = time.monotonic() + wait_for
        with self._cond:
            while True:
                # Reuse idle, discarding connections that died while parked.
                while self._idle:
                    conn = self._idle.pop()
                    if self._is_closed(conn):
                        self._created -= 1
                        continue
                    return conn
                # Create new if capacity: reserve the slot now, connect later.
                if self._created < self._max:
                    self._created += 1
                    break
                # Wait for release
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError("Timed out waiting for database connection from pool")
                self._cond.wait(remaining)
        # Connect outside the lock so retries/back-off sleeps do not stall
        # every other thread that wants to acquire or release.
        try:
            return self._new_connection_with_retry()
        except BaseException:
            with self._cond:
                self._created -= 1  # give the reserved slot back
                self._cond.notify()
            raise

    def release(self, conn):
        """Return ``conn`` to the pool, or drop it if it is no longer usable.

        The connection is rolled back first so no transaction state leaks to
        the next user. A connection whose rollback fails or that reports
        closed is closed (best effort) and removed from the pool's count.
        """
        usable = True
        try:
            conn.rollback()
        except Exception:
            usable = False
        if usable and self._is_closed(conn):
            usable = False
        if not usable:
            try:
                conn.close()
            except Exception:
                pass  # best effort; the connection is being discarded anyway
        with self._cond:
            if usable:
                self._idle.append(conn)
            else:
                self._created = max(0, self._created - 1)
            self._cond.notify()

    @contextmanager
    def connection(self, timeout: Optional[float] = None):
        """Context manager yielding a pooled connection and releasing it on exit."""
        conn = self.acquire(timeout)
        try:
            yield conn
        finally:
            self.release(conn)

    def closeall(self):
        """Close every idle connection.

        Connections currently checked out stay counted and are handled by
        ``release`` when they come back.
        """
        with self._cond:
            idle, self._idle = self._idle, []
            self._created = max(0, self._created - len(idle))
            self._cond.notify_all()
        for conn in idle:
            try:
                conn.close()
            except Exception:
                pass  # best effort on shutdown


class CacheManager:
    """Manages page caching and data storage using PostgreSQL."""

    def __init__(self):
        """Validate ``config.DATABASE_URL``, build the pool and ensure the schema.

        Raises:
            RuntimeError: ``DATABASE_URL`` does not start with ``postgresql``.
        """
        self.database_url = (config.DATABASE_URL or '').strip()
        if not self.database_url.lower().startswith('postgresql'):
            raise RuntimeError("DATABASE_URL must be a PostgreSQL URL, e.g., postgresql://user:pass@host:5432/db")
        # Initialize a small connection pool to prevent excessive short-lived TCP connections
        self._pool = _ConnectionPool(
            self.database_url,
            min_size=getattr(config, 'DB_POOL_MIN', 1),
            max_size=getattr(config, 'DB_POOL_MAX', 6),
            timeout=getattr(config, 'DB_POOL_TIMEOUT', 30),
        )
        self._init_db()

    # ------------------------
    # Connection helpers
    # ------------------------
    def _pg(self):
        """Return a context manager yielding a pooled connection."""
        return self._pool.connection()

    def _init_db(self):
        """Initialize database schema if missing.

        - Create tables with IF NOT EXISTS for PostgreSQL.
        """
        with self._pg() as conn, conn.cursor() as cur:
            for statement in _SCHEMA_DDL:
                cur.execute(statement)
            conn.commit()
        # SQLite migrations removed; PostgreSQL uses IF NOT EXISTS DDL above

    def get(self, url: str, max_age_hours: int = 24) -> Optional[Dict]:
        """Get cached page if it exists and is not too old.

        Returns:
            A dict with ``content`` (decompressed text), ``timestamp``,
            ``status_code`` and ``cached=True``; ``None`` when the URL is not
            cached, the entry is older than ``max_age_hours``, or the stored
            payload cannot be decompressed/decoded.
        """
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute("SELECT content, timestamp, status_code FROM cache WHERE url = %s", (url,))
            row = cur.fetchone()
        if not row:
            return None
        blob, timestamp, status_code = row
        age_hours = (time.time() - timestamp) / 3600
        if age_hours > max_age_hours:
            return None
        try:
            text = zlib.decompress(blob).decode('utf-8')
        except Exception as e:
            print(f" ⚠️ Failed to decompress cache for {url}: {e}")
            return None
        return {
            'content': text,
            'timestamp': timestamp,
            'status_code': status_code,
            'cached': True
        }

    def set(self, url: str, content: str, status_code: int = 200):
        """Cache a page with compression (zlib level 9 over UTF-8 bytes)."""
        raw = content.encode('utf-8')  # encode once; reused for the size ratio
        compressed_content = zlib.compress(raw, level=9)
        ratio = (1 - len(compressed_content) / len(raw)) * 100 if raw else 0
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute(
                _CACHE_UPSERT,
                (url, compressed_content, time.time(), status_code),
            )
            conn.commit()
        print(f" -> Cached: {url} (compressed {ratio:.1f}%)")

    def clear_old(self, max_age_hours: int = 168):
        """Clear old cache entries to prevent database bloat."""
        cutoff_time = time.time() - (max_age_hours * 3600)
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute("DELETE FROM cache WHERE timestamp < %s", (cutoff_time,))
            deleted = cur.rowcount or 0
            conn.commit()
        if deleted > 0:
            print(f" → Cleared {deleted} old cache entries")

    def save_auction(self, auction_data: Dict):
        """Insert or update one auction row.

        ``auction_data`` must contain ``auction_id``, ``url``, ``title`` and
        ``scraped_at``; ``location``, ``lots_count`` and
        ``first_lot_closing_time`` are optional. City and country are taken
        from the first and last comma-separated parts of ``location`` when it
        has at least two parts.
        """
        # Parse location into city and country
        location = auction_data.get('location', '')
        city = None
        country = None
        if location:
            parts = [p.strip() for p in location.split(',')]
            if len(parts) >= 2:
                city, country = parts[0], parts[-1]
        lots_count = auction_data.get('lots_count', 0)
        first_closing = auction_data.get('first_lot_closing_time', '')
        params = (
            auction_data['auction_id'],
            auction_data['url'],
            auction_data['title'],
            location,
            lots_count,
            first_closing,
            auction_data['scraped_at'],
            city,
            country,
            'online',
            lots_count,      # lot_count mirrors lots_count
            first_closing,   # closing_time mirrors first_lot_closing_time
            int(time.time()),
        )
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute(_AUCTION_UPSERT, params)
            conn.commit()

    def save_lot(self, lot_data: Dict):
        """Insert or update one lot row.

        ``lot_data`` must contain ``lot_id``, ``url``, ``title`` and
        ``scraped_at`` (``KeyError`` otherwise); every other column falls back
        to the default listed in ``_LOT_FIELDS``.
        """
        params = tuple(
            lot_data[name] if default is _REQUIRED else lot_data.get(name, default)
            for name, default in _LOT_FIELDS
        )
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute(_LOT_UPSERT, params)
            conn.commit()

    def save_bid_history(self, lot_id: str, bid_records: List[Dict]):
        """Replace the stored bid history of ``lot_id`` with ``bid_records``.

        Does nothing when ``bid_records`` is empty (existing rows are kept).
        Each record needs ``bid_amount``, ``bid_time``, ``is_autobid``,
        ``bidder_id`` and ``bidder_number``; its ``lot_id`` is optional and
        defaults to the ``lot_id`` argument.
        """
        if not bid_records:
            return
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute("DELETE FROM bid_history WHERE lot_id = %s", (lot_id,))
            for record in bid_records:
                cur.execute(
                    """
                    INSERT INTO bid_history
                        (lot_id, bid_amount, bid_time, is_autobid, bidder_id, bidder_number)
                    VALUES (%s, %s, %s, %s, %s, %s)
                    """,
                    (
                        # Fall back to the argument so records without their
                        # own lot_id land under the lot being replaced.
                        record.get('lot_id', lot_id),
                        record['bid_amount'],
                        record['bid_time'],
                        1 if record['is_autobid'] else 0,
                        record['bidder_id'],
                        record['bidder_number'],
                    ),
                )
            conn.commit()

    def save_images(self, lot_id: str, image_urls: List[str]):
        """Save image URLs for a lot (prevents duplicates via unique constraint)."""
        with self._pg() as conn, conn.cursor() as cur:
            for image_url in image_urls:
                cur.execute(
                    """
                    INSERT INTO images (lot_id, url, downloaded)
                    VALUES (%s, %s, 0)
                    ON CONFLICT (lot_id, url) DO NOTHING
                    """,
                    (lot_id, image_url),
                )
            conn.commit()

    def update_image_local_path(self, lot_id: str, url: str, local_path: str):
        """Record the local path of an image and mark it as downloaded."""
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute(
                "UPDATE images SET local_path = %s, downloaded = 1 WHERE lot_id = %s AND url = %s",
                (local_path, lot_id, url),
            )
            conn.commit()

    def save_resource(self, url: str, content: bytes, content_type: str,
                      status_code: int = 200, headers: Optional[Dict] = None,
                      local_path: Optional[str] = None,
                      cache_key: Optional[str] = None):
        """Save a web resource (JS, CSS, image, font, etc.) to cache.

        Args:
            cache_key: Optional composite key (url + body hash for POST
                requests); stored in the ``url`` column instead of ``url``
                when given.
        """
        headers_json = json.dumps(headers) if headers else None
        size_bytes = len(content) if content else 0
        key = cache_key if cache_key else url
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute(
                _RESOURCE_UPSERT,
                (key, content, content_type, status_code, headers_json,
                 time.time(), size_bytes, local_path),
            )
            conn.commit()

    def get_resource(self, url: str, cache_key: Optional[str] = None) -> Optional[Dict]:
        """Get a cached resource.

        Args:
            cache_key: Optional composite key to lookup instead of ``url``.

        Returns:
            A dict describing the stored resource (``headers`` decoded from
            JSON, ``{}`` when none were stored), or ``None`` if not cached.
        """
        key = cache_key if cache_key else url
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT content, content_type, status_code, headers, timestamp, size_bytes, local_path "
                "FROM resource_cache WHERE url = %s",
                (key,),
            )
            row = cur.fetchone()
        if not row:
            return None
        return {
            'content': row[0],
            'content_type': row[1],
            'status_code': row[2],
            'headers': json.loads(row[3]) if row[3] else {},
            'timestamp': row[4],
            'size_bytes': row[5],
            'local_path': row[6],
            'cached': True
        }

    # ------------------------
    # Query helpers for scraper/monitor/export
    # ------------------------
    def get_counts(self) -> Dict[str, int]:
        """Return the row counts of the ``auctions`` and ``lots`` tables."""
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM auctions")
            auctions = cur.fetchone()[0]
            cur.execute("SELECT COUNT(*) FROM lots")
            lots = cur.fetchone()[0]
        return {"auctions": auctions, "lots": lots}

    def get_lot_api_fields(self, lot_id: str) -> Optional[Tuple]:
        """Return (followers_count, estimated_min_price, current_bid, bid_count,
        closing_time, status) for a lot, or ``None`` if the lot is unknown."""
        sql = (
            "SELECT followers_count, estimated_min_price, current_bid, bid_count, closing_time, status "
            "FROM lots WHERE lot_id = %s"
        )
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute(sql, (lot_id,))
            return cur.fetchone()

    def get_page_record_by_url(self, url: str) -> Optional[Dict]:
        """Look up the lot or auction stored for ``url``.

        Lots are checked first. The returned dict holds the row's columns with
        ``"type"`` set to ``"lot"`` or ``"auction"``; this discriminator takes
        precedence over the tables' own ``type`` column. Returns ``None`` when
        neither table has the URL.
        """
        with self._pg() as conn, conn.cursor() as cur:
            for kind, table_sql in (
                ("lot", "SELECT * FROM lots WHERE url = %s"),
                ("auction", "SELECT * FROM auctions WHERE url = %s"),
            ):
                cur.execute(table_sql, (url,))
                row = cur.fetchone()
                if row:
                    col_names = [desc.name for desc in cur.description]
                    # Discriminator goes last: both tables have a "type"
                    # column that would otherwise overwrite it.
                    return {**dict(zip(col_names, row)), "type": kind}
        return None

    def fetch_all(self, table: str) -> List[Dict]:
        """Return every row of ``auctions`` or ``lots`` as a list of dicts.

        Raises:
            ValueError: ``table`` is not one of the two allowed names. (An
                explicit raise rather than ``assert`` so the check survives
                ``python -O``; the name is interpolated into SQL.)
        """
        if table not in {"auctions", "lots"}:
            raise ValueError(f"Unsupported table: {table!r}")
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute(f"SELECT * FROM {table}")
            rows = cur.fetchall()
            col_names = [desc.name for desc in cur.description]
        return [dict(zip(col_names, r)) for r in rows]

    def get_lot_times(self, lot_id: str) -> Tuple[Optional[str], Optional[str]]:
        """Return (viewing_time, pickup_date) for a lot, or (None, None) if unknown."""
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT viewing_time, pickup_date FROM lots WHERE lot_id = %s",
                (lot_id,),
            )
            row = cur.fetchone()
        if not row:
            return None, None
        return row[0], row[1]

    def has_bid_history(self, lot_id: str) -> bool:
        """Return True if at least one bid_history row exists for the lot."""
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM bid_history WHERE lot_id = %s", (lot_id,))
            cnt = cur.fetchone()[0]
        return cnt > 0

    def get_downloaded_image_urls(self, lot_id: str) -> List[str]:
        """Return the URLs of the lot's images that are marked as downloaded."""
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT url FROM images WHERE lot_id = %s AND downloaded = 1",
                (lot_id,),
            )
            return [r[0] for r in cur.fetchall()]

    # ------------------------
    # Aggregation helpers for scraper
    # ------------------------
    def get_distinct_urls(self) -> Dict[str, List[str]]:
        """Return the distinct non-empty URLs stored for auctions and lots."""
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute("SELECT DISTINCT url FROM auctions")
            auctions = [r[0] for r in cur.fetchall() if r and r[0]]
            cur.execute("SELECT DISTINCT url FROM lots")
            lots = [r[0] for r in cur.fetchall() if r and r[0]]
        return {"auctions": auctions, "lots": lots}

    def get_lot_priority_info(self, lot_id: str, url: str) -> Tuple[Optional[str], Optional[str], Optional[int], Optional[int]]:
        """Return (closing_time, scraped_at, scrape_priority, next_scrape_at).

        The lot is matched by ``lot_id`` or by ``url``; the first matching row
        is used. Returns four ``None`` values when nothing matches.
        """
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute(
                """
                SELECT closing_time, scraped_at, scrape_priority, next_scrape_at
                FROM lots
                WHERE lot_id = %s OR url = %s
                """,
                (lot_id, url),
            )
            row = cur.fetchone()
        if not row:
            return None, None, None, None
        return row[0], row[1], row[2], row[3]

    def get_recent_cached_urls(self, limit: int = 10) -> List[str]:
        """Return up to ``limit`` cached page URLs, most recently stored first."""
        with self._pg() as conn, conn.cursor() as cur:
            cur.execute("SELECT url FROM cache ORDER BY timestamp DESC LIMIT %s", (limit,))
            return [r[0] for r in cur.fetchall()]