diff --git a/.aiignore b/.aiignore
index 4a4c93d..af059f3 100644
--- a/.aiignore
+++ b/.aiignore
@@ -21,3 +21,5 @@ node_modules/
 .git
 .github
 scripts
+.pytest_cache/
+__pycache__
\ No newline at end of file
diff --git a/README.md b/README.md
index 87e827d..246fe35 100644
--- a/README.md
+++ b/README.md
@@ -43,6 +43,29 @@ playwright install chromium
 
 ---
 
+## Database Configuration (PostgreSQL)
+
+The scraper now uses PostgreSQL (no more SQLite files). Configure via `DATABASE_URL`:
+
+- Default (baked in):
+  `postgresql://auction:heel-goed-wachtwoord@192.168.1.159:5432/auctiondb`
+- Override for your environment:
+
+```bash
+# Windows PowerShell
+$env:DATABASE_URL = "postgresql://user:pass@host:5432/dbname"
+
+# Linux/macOS
+export DATABASE_URL="postgresql://user:pass@host:5432/dbname"
+```
+
+Packages used:
+- Driver: `psycopg[binary]`
+
+Nothing is written to local `.db` files anymore.
+
+---
+
 ## Verify
 
 ```bash
@@ -117,9 +140,9 @@ tasklist | findstr python
 
 # Troubleshooting
 
-- Wrong interpreter → Set Python 3.10+
-- Multiple monitors running → kill extra processes
-- SQLite locked → ensure one instance only
+- Wrong interpreter → Set Python 3.10+
+- Multiple monitors running → kill extra processes
+- PostgreSQL connectivity → verify `DATABASE_URL`, network/firewall, and credentials
 - Service fails → check `journalctl -u scaev-monitor`
 
 ---
@@ -149,11 +172,6 @@ Enable native access (IntelliJ → VM Options):
 
 ---
 
-## Cache
-
-- Path: `cache/page_cache.db`
-- Clear: delete the file
-
 ---
 
 This file keeps everything compact, Python‑focused, and ready for onboarding.
diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md
index d1eba4a..f9b04a9 100644
--- a/docs/ARCHITECTURE.md
+++ b/docs/ARCHITECTURE.md
@@ -321,13 +321,13 @@ Lot Page Parsed
 
 ## Key Configuration
 
-| Setting              | Value                             | Purpose                          |
-|----------------------|-----------------------------------|----------------------------------|
-| `CACHE_DB`           | `/mnt/okcomputer/output/cache.db` | SQLite database path             |
-| `IMAGES_DIR`         | `/mnt/okcomputer/output/images`   | Downloaded images storage        |
-| `RATE_LIMIT_SECONDS` | `0.5`                             | Delay between requests           |
-| `DOWNLOAD_IMAGES`    | `False`                           | Toggle image downloading         |
-| `MAX_PAGES`          | `50`                              | Number of listing pages to crawl |
+| Setting              | Value                                                                    | Purpose                          |
+|----------------------|--------------------------------------------------------------------------|----------------------------------|
+| `DATABASE_URL`       | `postgresql://auction:heel-goed-wachtwoord@192.168.1.159:5432/auctiondb` | PostgreSQL connection string     |
+| `IMAGES_DIR`         | `/mnt/okcomputer/output/images`                                          | Downloaded images storage        |
+| `RATE_LIMIT_SECONDS` | `0.5`                                                                    | Delay between requests           |
+| `DOWNLOAD_IMAGES`    | `False`                                                                  | Toggle image downloading         |
+| `MAX_PAGES`          | `50`                                                                     | Number of listing pages to crawl |
 
 ## Output Files
 
@@ -376,7 +376,7 @@ For each lot, the data “tunnels through” the following stages:
 
 1. HTML page → parse `__NEXT_DATA__` for core lot fields and lot UUID.
 2. GraphQL `lotDetails` → bidding data (current/starting/minimum bid, bid count, bid step, close time, status).
 3. Optional REST bid history → complete timeline of bids; derive first/last bid time and bid velocity.
-4. Persist to DB (SQLite for now) and export; image URLs are captured and optionally downloaded concurrently per lot.
+4. Persist to DB (PostgreSQL) and export; image URLs are captured and optionally downloaded concurrently per lot.
 
 Each stage is recorded by the TTY progress reporter with timing and byte size for transparency and diagnostics.
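The new README section above can be sanity-checked against a live server before running the scraper. The snippet below is an illustrative sketch, not part of the patch: it assumes psycopg 3 (`psycopg[binary]`) is installed and that `DATABASE_URL` points at a reachable PostgreSQL instance, and it simply opens one connection and prints the server version.

```python
# Illustrative connectivity check (sketch, not part of this patch).
# Assumes: psycopg 3 installed, DATABASE_URL reachable from this machine.
import os
import psycopg

dsn = os.getenv(
    "DATABASE_URL",
    # Same default as src/config.py in this patch.
    "postgresql://auction:heel-goed-wachtwoord@192.168.1.159:5432/auctiondb",
)

with psycopg.connect(dsn, connect_timeout=5) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT version()")
        print(cur.fetchone()[0])
```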
diff --git a/src/cache.py b/src/cache.py index d8533f9..64989f6 100644 --- a/src/cache.py +++ b/src/cache.py @@ -2,241 +2,164 @@ """ Cache Manager module for database-backed caching and data storage. -Primary backend: PostgreSQL (psycopg) -Fallback (dev/tests only): SQLite +Backend: PostgreSQL (psycopg) """ -import sqlite3 import psycopg import time +import threading +from contextlib import contextmanager +from typing import Dict, List, Optional, Tuple import zlib import json from datetime import datetime -from typing import Dict, List, Optional, Tuple import config -class CacheManager: - """Manages page caching and data storage using PostgreSQL (preferred) or SQLite.""" - def __init__(self, db_path: str = None): - # Decide backend +class _ConnectionPool: + """Very small, thread-safe connection pool for psycopg (sync) connections. + + Avoids creating a new TCP connection for every DB access, which on Windows + can quickly exhaust ephemeral ports and cause WSAEADDRINUSE (10048). + """ + + def __init__(self, dsn: str, min_size: int = 1, max_size: int = 6, connect_fn=None, timeout: int = 30): + self._dsn = dsn + self._min = max(0, int(min_size)) + self._max = max(1, int(max_size)) + self._timeout = max(1, int(timeout)) + self._connect = connect_fn or psycopg.connect + + self._lock = threading.Lock() + self._cond = threading.Condition(self._lock) + self._idle: list = [] + self._created = 0 + + # Pre-warm pool + for _ in range(self._min): + conn = self._new_connection_with_retry() + self._idle.append(conn) + self._created += 1 + + def _new_connection_with_retry(self): + last_exc = None + backoffs = [0.05, 0.1, 0.2, 0.4, 0.8] + for delay in backoffs: + try: + return self._connect(self._dsn) + except Exception as e: + last_exc = e + time.sleep(delay) + # Final attempt without sleeping after loop + try: + return self._connect(self._dsn) + except Exception as e: + last_exc = e + raise last_exc + + def acquire(self, timeout: Optional[float] = None): + deadline = time.time() + (timeout if timeout is not None else self._timeout) + with self._cond: + while True: + # Reuse idle + while self._idle: + conn = self._idle.pop() + try: + if getattr(conn, "closed", False): + self._created -= 1 + continue + return conn + except Exception: + # Consider it broken + self._created -= 1 + continue + + # Create new if capacity + if self._created < self._max: + conn = self._new_connection_with_retry() + self._created += 1 + return conn + + # Wait for release + remaining = deadline - time.time() + if remaining <= 0: + raise TimeoutError("Timed out waiting for database connection from pool") + self._cond.wait(remaining) + + def release(self, conn): + try: + try: + conn.rollback() + except Exception: + pass + if getattr(conn, "closed", False): + with self._cond: + self._created -= 1 + self._cond.notify() + return + with self._cond: + self._idle.append(conn) + self._cond.notify() + except Exception: + # Drop silently on unexpected errors + with self._cond: + try: + self._created -= 1 + except Exception: + pass + self._cond.notify() + + @contextmanager + def connection(self, timeout: Optional[float] = None): + conn = self.acquire(timeout) + try: + yield conn + finally: + self.release(conn) + + def closeall(self): + with self._cond: + for c in self._idle: + try: + c.close() + except Exception: + pass + self._idle.clear() + self._created = 0 + +class CacheManager: + """Manages page caching and data storage using PostgreSQL.""" + + def __init__(self): self.database_url = (config.DATABASE_URL or '').strip() - self.use_postgres 
= self.database_url.lower().startswith('postgresql') - # Legacy sqlite path retained only for fallback/testing - self.db_path = db_path or config.CACHE_DB + if not self.database_url.lower().startswith('postgresql'): + raise RuntimeError("DATABASE_URL must be a PostgreSQL URL, e.g., postgresql://user:pass@host:5432/db") + # Initialize a small connection pool to prevent excessive short-lived TCP connections + self._pool = _ConnectionPool( + self.database_url, + min_size=getattr(config, 'DB_POOL_MIN', 1), + max_size=getattr(config, 'DB_POOL_MAX', 6), + timeout=getattr(config, 'DB_POOL_TIMEOUT', 30), + ) self._init_db() # ------------------------ # Connection helpers # ------------------------ def _pg(self): - return psycopg.connect(self.database_url) + # Return a context manager yielding a pooled connection + return self._pool.connection() def _init_db(self): """Initialize database schema if missing. - - For PostgreSQL: create tables with IF NOT EXISTS. - - For SQLite: retain legacy schema and migrations. + - Create tables with IF NOT EXISTS for PostgreSQL. """ - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - # Auctions - cur.execute( - """ - CREATE TABLE IF NOT EXISTS auctions ( - auction_id TEXT PRIMARY KEY, - url TEXT UNIQUE, - title TEXT, - location TEXT, - lots_count INTEGER, - first_lot_closing_time TEXT, - scraped_at TEXT, - city TEXT, - country TEXT, - type TEXT, - lot_count INTEGER DEFAULT 0, - closing_time TEXT, - discovered_at BIGINT - ) - """ - ) - cur.execute("CREATE INDEX IF NOT EXISTS idx_auctions_country ON auctions(country)") - - # Cache - cur.execute( - """ - CREATE TABLE IF NOT EXISTS cache ( - url TEXT PRIMARY KEY, - content BYTEA, - timestamp DOUBLE PRECISION, - status_code INTEGER - ) - """ - ) - cur.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON cache(timestamp)") - - # Lots - cur.execute( - """ - CREATE TABLE IF NOT EXISTS lots ( - lot_id TEXT PRIMARY KEY, - auction_id TEXT REFERENCES auctions(auction_id), - url TEXT UNIQUE, - title TEXT, - current_bid TEXT, - bid_count INTEGER, - closing_time TEXT, - viewing_time TEXT, - pickup_date TEXT, - location TEXT, - description TEXT, - category TEXT, - scraped_at TEXT, - sale_id INTEGER, - manufacturer TEXT, - type TEXT, - year INTEGER, - currency TEXT DEFAULT 'EUR', - closing_notified INTEGER DEFAULT 0, - starting_bid TEXT, - minimum_bid TEXT, - status TEXT, - brand TEXT, - model TEXT, - attributes_json TEXT, - first_bid_time TEXT, - last_bid_time TEXT, - bid_velocity DOUBLE PRECISION, - bid_increment DOUBLE PRECISION, - year_manufactured INTEGER, - condition_score DOUBLE PRECISION, - condition_description TEXT, - serial_number TEXT, - damage_description TEXT, - followers_count INTEGER DEFAULT 0, - estimated_min_price DOUBLE PRECISION, - estimated_max_price DOUBLE PRECISION, - lot_condition TEXT, - appearance TEXT, - estimated_min DOUBLE PRECISION, - estimated_max DOUBLE PRECISION, - next_bid_step_cents INTEGER, - condition TEXT, - category_path TEXT, - city_location TEXT, - country_code TEXT, - bidding_status TEXT, - packaging TEXT, - quantity INTEGER, - vat DOUBLE PRECISION, - buyer_premium_percentage DOUBLE PRECISION, - remarks TEXT, - reserve_price DOUBLE PRECISION, - reserve_met INTEGER, - view_count INTEGER, - api_data_json TEXT, - next_scrape_at BIGINT, - scrape_priority INTEGER DEFAULT 0 - ) - """ - ) - cur.execute("CREATE INDEX IF NOT EXISTS idx_lots_sale_id ON lots(sale_id)") - cur.execute("CREATE INDEX IF NOT EXISTS idx_lots_closing_time ON lots(closing_time)") - cur.execute("CREATE 
INDEX IF NOT EXISTS idx_lots_next_scrape ON lots(next_scrape_at)") - cur.execute("CREATE INDEX IF NOT EXISTS idx_lots_priority ON lots(scrape_priority DESC)") - - # Images - cur.execute( - """ - CREATE TABLE IF NOT EXISTS images ( - id SERIAL PRIMARY KEY, - lot_id TEXT REFERENCES lots(lot_id), - url TEXT, - local_path TEXT, - downloaded INTEGER DEFAULT 0, - labels TEXT, - processed_at BIGINT - ) - """ - ) - cur.execute("CREATE INDEX IF NOT EXISTS idx_images_lot_id ON images(lot_id)") - cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_lot_url ON images(lot_id, url)") - - # Bid history - cur.execute( - """ - CREATE TABLE IF NOT EXISTS bid_history ( - id SERIAL PRIMARY KEY, - lot_id TEXT REFERENCES lots(lot_id), - bid_amount DOUBLE PRECISION NOT NULL, - bid_time TEXT NOT NULL, - is_autobid INTEGER DEFAULT 0, - bidder_id TEXT, - bidder_number INTEGER, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """ - ) - cur.execute("CREATE INDEX IF NOT EXISTS idx_bid_history_bidder ON bid_history(bidder_id)") - cur.execute("CREATE INDEX IF NOT EXISTS idx_bid_history_lot_time ON bid_history(lot_id, bid_time)") - # Resource cache - cur.execute( - """ - CREATE TABLE IF NOT EXISTS resource_cache ( - url TEXT PRIMARY KEY, - content BYTEA, - content_type TEXT, - status_code INTEGER, - headers TEXT, - timestamp DOUBLE PRECISION, - size_bytes INTEGER, - local_path TEXT - ) - """ - ) - cur.execute("CREATE INDEX IF NOT EXISTS idx_resource_timestamp ON resource_cache(timestamp)") - cur.execute("CREATE INDEX IF NOT EXISTS idx_resource_content_type ON resource_cache(content_type)") - conn.commit() - return - - # SQLite legacy path - with sqlite3.connect(self.db_path) as conn: - # HTML page cache table (existing) - conn.execute(""" - CREATE TABLE IF NOT EXISTS cache ( - url TEXT PRIMARY KEY, - content BLOB, - timestamp REAL, - status_code INTEGER - ) - """) - conn.execute(""" - CREATE INDEX IF NOT EXISTS idx_timestamp ON cache(timestamp) - """) - - # Resource cache table (NEW: for ALL web resources - JS, CSS, images, fonts, etc.) 
- conn.execute(""" - CREATE TABLE IF NOT EXISTS resource_cache ( - url TEXT PRIMARY KEY, - content BLOB, - content_type TEXT, - status_code INTEGER, - headers TEXT, - timestamp REAL, - size_bytes INTEGER, - local_path TEXT - ) - """) - conn.execute(""" - CREATE INDEX IF NOT EXISTS idx_resource_timestamp ON resource_cache(timestamp) - """) - conn.execute(""" - CREATE INDEX IF NOT EXISTS idx_resource_content_type ON resource_cache(content_type) - """) - - # Auctions table - consolidated schema - conn.execute(""" + with self._pg() as conn, conn.cursor() as cur: + # Auctions + cur.execute( + """ CREATE TABLE IF NOT EXISTS auctions ( auction_id TEXT PRIMARY KEY, url TEXT UNIQUE, @@ -250,16 +173,31 @@ class CacheManager: type TEXT, lot_count INTEGER DEFAULT 0, closing_time TEXT, - discovered_at INTEGER + discovered_at BIGINT ) - """) - conn.execute("CREATE INDEX IF NOT EXISTS idx_auctions_country ON auctions(country)") + """ + ) + cur.execute("CREATE INDEX IF NOT EXISTS idx_auctions_country ON auctions(country)") - # Lots table - consolidated schema with all fields from working database - conn.execute(""" + # Cache + cur.execute( + """ + CREATE TABLE IF NOT EXISTS cache ( + url TEXT PRIMARY KEY, + content BYTEA, + timestamp DOUBLE PRECISION, + status_code INTEGER + ) + """ + ) + cur.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON cache(timestamp)") + + # Lots + cur.execute( + """ CREATE TABLE IF NOT EXISTS lots ( lot_id TEXT PRIMARY KEY, - auction_id TEXT, + auction_id TEXT REFERENCES auctions(auction_id), url TEXT UNIQUE, title TEXT, current_bid TEXT, @@ -285,20 +223,20 @@ class CacheManager: attributes_json TEXT, first_bid_time TEXT, last_bid_time TEXT, - bid_velocity REAL, - bid_increment REAL, + bid_velocity DOUBLE PRECISION, + bid_increment DOUBLE PRECISION, year_manufactured INTEGER, - condition_score REAL, + condition_score DOUBLE PRECISION, condition_description TEXT, serial_number TEXT, damage_description TEXT, followers_count INTEGER DEFAULT 0, - estimated_min_price REAL, - estimated_max_price REAL, + estimated_min_price DOUBLE PRECISION, + estimated_max_price DOUBLE PRECISION, lot_condition TEXT, appearance TEXT, - estimated_min REAL, - estimated_max REAL, + estimated_min DOUBLE PRECISION, + estimated_max DOUBLE PRECISION, next_bid_step_cents INTEGER, condition TEXT, category_path TEXT, @@ -307,166 +245,84 @@ class CacheManager: bidding_status TEXT, packaging TEXT, quantity INTEGER, - vat REAL, - buyer_premium_percentage REAL, + vat DOUBLE PRECISION, + buyer_premium_percentage DOUBLE PRECISION, remarks TEXT, - reserve_price REAL, + reserve_price DOUBLE PRECISION, reserve_met INTEGER, view_count INTEGER, api_data_json TEXT, - next_scrape_at INTEGER, - scrape_priority INTEGER DEFAULT 0, - FOREIGN KEY (auction_id) REFERENCES auctions(auction_id) + next_scrape_at BIGINT, + scrape_priority INTEGER DEFAULT 0 ) - """) - conn.execute("CREATE INDEX IF NOT EXISTS idx_lots_sale_id ON lots(sale_id)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_lots_closing_time ON lots(closing_time)") + """ + ) + cur.execute("CREATE INDEX IF NOT EXISTS idx_lots_sale_id ON lots(sale_id)") + cur.execute("CREATE INDEX IF NOT EXISTS idx_lots_closing_time ON lots(closing_time)") + cur.execute("CREATE INDEX IF NOT EXISTS idx_lots_next_scrape ON lots(next_scrape_at)") + cur.execute("CREATE INDEX IF NOT EXISTS idx_lots_priority ON lots(scrape_priority DESC)") - # Images table - conn.execute(""" + # Images + cur.execute( + """ CREATE TABLE IF NOT EXISTS images ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - lot_id 
TEXT, + id SERIAL PRIMARY KEY, + lot_id TEXT REFERENCES lots(lot_id), url TEXT, local_path TEXT, downloaded INTEGER DEFAULT 0, labels TEXT, - processed_at INTEGER, - FOREIGN KEY (lot_id) REFERENCES lots(lot_id) + processed_at BIGINT ) - """) - conn.execute("CREATE INDEX IF NOT EXISTS idx_images_lot_id ON images(lot_id)") + """ + ) + cur.execute("CREATE INDEX IF NOT EXISTS idx_images_lot_id ON images(lot_id)") + cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_lot_url ON images(lot_id, url)") - # Remove duplicates before creating unique index - conn.execute(""" - DELETE FROM images - WHERE id NOT IN ( - SELECT MIN(id) - FROM images - GROUP BY lot_id, url - ) - """) - conn.execute(""" - CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_lot_url - ON images(lot_id, url) - """) - - # Bid history table - conn.execute(""" + # Bid history + cur.execute( + """ CREATE TABLE IF NOT EXISTS bid_history ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - lot_id TEXT NOT NULL, - bid_amount REAL NOT NULL, + id SERIAL PRIMARY KEY, + lot_id TEXT REFERENCES lots(lot_id), + bid_amount DOUBLE PRECISION NOT NULL, bid_time TEXT NOT NULL, is_autobid INTEGER DEFAULT 0, bidder_id TEXT, bidder_number INTEGER, - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (lot_id) REFERENCES lots(lot_id) + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) - """) - conn.execute(""" - CREATE INDEX IF NOT EXISTS idx_bid_history_lot_time - ON bid_history(lot_id, bid_time) - """) - conn.execute(""" - CREATE INDEX IF NOT EXISTS idx_bid_history_bidder - ON bid_history(bidder_id) - """) - - # MIGRATIONS: Add new columns to existing tables - self._run_migrations(conn) - + """ + ) + cur.execute("CREATE INDEX IF NOT EXISTS idx_bid_history_bidder ON bid_history(bidder_id)") + cur.execute("CREATE INDEX IF NOT EXISTS idx_bid_history_lot_time ON bid_history(lot_id, bid_time)") + # Resource cache + cur.execute( + """ + CREATE TABLE IF NOT EXISTS resource_cache ( + url TEXT PRIMARY KEY, + content BYTEA, + content_type TEXT, + status_code INTEGER, + headers TEXT, + timestamp DOUBLE PRECISION, + size_bytes INTEGER, + local_path TEXT + ) + """ + ) + cur.execute("CREATE INDEX IF NOT EXISTS idx_resource_timestamp ON resource_cache(timestamp)") + cur.execute("CREATE INDEX IF NOT EXISTS idx_resource_content_type ON resource_cache(content_type)") conn.commit() + return - def _run_migrations(self, conn): - """Run database migrations to add new columns to existing tables""" - print("Checking for database migrations...") - - # Check and add new columns to lots table - cursor = conn.execute("PRAGMA table_info(lots)") - lots_columns = {row[1] for row in cursor.fetchall()} - - migrations_applied = False - - if 'api_data_json' not in lots_columns: - print(" > Adding api_data_json column to lots table...") - conn.execute("ALTER TABLE lots ADD COLUMN api_data_json TEXT") - migrations_applied = True - - if 'next_scrape_at' not in lots_columns: - print(" > Adding next_scrape_at column to lots table...") - conn.execute("ALTER TABLE lots ADD COLUMN next_scrape_at INTEGER") - migrations_applied = True - - if 'scrape_priority' not in lots_columns: - print(" > Adding scrape_priority column to lots table...") - conn.execute("ALTER TABLE lots ADD COLUMN scrape_priority INTEGER DEFAULT 0") - migrations_applied = True - - # Check resource_cache table structure - cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='resource_cache'") - resource_cache_exists = cursor.fetchone() is not None - - if resource_cache_exists: - # Check if table has 
correct structure - cursor = conn.execute("PRAGMA table_info(resource_cache)") - resource_columns = {row[1] for row in cursor.fetchall()} - - # Expected columns - expected_columns = {'url', 'content', 'content_type', 'status_code', 'headers', 'timestamp', 'size_bytes', 'local_path'} - - if resource_columns != expected_columns: - print(" > Rebuilding resource_cache table with correct schema...") - # Backup old data count - cursor = conn.execute("SELECT COUNT(*) FROM resource_cache") - old_count = cursor.fetchone()[0] - print(f" (Preserving {old_count} cached resources)") - - # Drop and recreate with correct schema - conn.execute("DROP TABLE IF EXISTS resource_cache") - conn.execute(""" - CREATE TABLE resource_cache ( - url TEXT PRIMARY KEY, - content BLOB, - content_type TEXT, - status_code INTEGER, - headers TEXT, - timestamp REAL, - size_bytes INTEGER, - local_path TEXT - ) - """) - conn.execute("CREATE INDEX idx_resource_timestamp ON resource_cache(timestamp)") - conn.execute("CREATE INDEX idx_resource_content_type ON resource_cache(content_type)") - migrations_applied = True - print(" * resource_cache table rebuilt") - - # Create indexes after migrations (when columns exist) - try: - conn.execute("CREATE INDEX IF NOT EXISTS idx_lots_priority ON lots(scrape_priority DESC)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_lots_next_scrape ON lots(next_scrape_at)") - except: - pass # Indexes might already exist - - if migrations_applied: - print(" * Migrations complete") - else: - print(" * Database schema is up to date") + # SQLite migrations removed; PostgreSQL uses IF NOT EXISTS DDL above def get(self, url: str, max_age_hours: int = 24) -> Optional[Dict]: """Get cached page if it exists and is not too old""" - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute("SELECT content, timestamp, status_code FROM cache WHERE url = %s", (url,)) - row = cur.fetchone() - else: - with sqlite3.connect(self.db_path) as conn: - cursor = conn.execute( - "SELECT content, timestamp, status_code FROM cache WHERE url = ?", - (url,) - ) - row = cursor.fetchone() + with self._pg() as conn, conn.cursor() as cur: + cur.execute("SELECT content, timestamp, status_code FROM cache WHERE url = %s", (url,)) + row = cur.fetchone() if row: content, timestamp, status_code = row @@ -494,41 +350,28 @@ class CacheManager: compressed_size = len(compressed_content) ratio = (1 - compressed_size / original_size) * 100 if original_size > 0 else 0 - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute( - """ - INSERT INTO cache (url, content, timestamp, status_code) - VALUES (%s, %s, %s, %s) - ON CONFLICT (url) - DO UPDATE SET content = EXCLUDED.content, - timestamp = EXCLUDED.timestamp, - status_code = EXCLUDED.status_code - """, - (url, compressed_content, time.time(), status_code), - ) - conn.commit() - else: - with sqlite3.connect(self.db_path) as conn: - conn.execute( - "INSERT OR REPLACE INTO cache (url, content, timestamp, status_code) VALUES (?, ?, ?, ?)", - (url, compressed_content, time.time(), status_code) - ) - conn.commit() + with self._pg() as conn, conn.cursor() as cur: + cur.execute( + """ + INSERT INTO cache (url, content, timestamp, status_code) + VALUES (%s, %s, %s, %s) + ON CONFLICT (url) + DO UPDATE SET content = EXCLUDED.content, + timestamp = EXCLUDED.timestamp, + status_code = EXCLUDED.status_code + """, + (url, compressed_content, time.time(), status_code), + ) + conn.commit() print(f" -> Cached: {url} (compressed {ratio:.1f}%)") def 
clear_old(self, max_age_hours: int = 168): """Clear old cache entries to prevent database bloat""" cutoff_time = time.time() - (max_age_hours * 3600) - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute("DELETE FROM cache WHERE timestamp < %s", (cutoff_time,)) - deleted = cur.rowcount or 0 - conn.commit() - else: - with sqlite3.connect(self.db_path) as conn: - deleted = conn.execute("DELETE FROM cache WHERE timestamp < ?", (cutoff_time,)).rowcount - conn.commit() + with self._pg() as conn, conn.cursor() as cur: + cur.execute("DELETE FROM cache WHERE timestamp < %s", (cutoff_time,)) + deleted = cur.rowcount or 0 + conn.commit() if (deleted or 0) > 0: print(f" → Cleared {deleted} old cache entries") @@ -544,8 +387,7 @@ class CacheManager: city = parts[0] country = parts[-1] - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: + with self._pg() as conn, conn.cursor() as cur: cur.execute( """ INSERT INTO auctions @@ -583,32 +425,6 @@ class CacheManager: ), ) conn.commit() - else: - with sqlite3.connect(self.db_path) as conn: - conn.execute( - """ - INSERT OR REPLACE INTO auctions - (auction_id, url, title, location, lots_count, first_lot_closing_time, scraped_at, - city, country, type, lot_count, closing_time, discovered_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - auction_data['auction_id'], - auction_data['url'], - auction_data['title'], - location, - auction_data.get('lots_count', 0), - auction_data.get('first_lot_closing_time', ''), - auction_data['scraped_at'], - city, - country, - 'online', - auction_data.get('lots_count', 0), - auction_data.get('first_lot_closing_time', ''), - int(time.time()), - ), - ) - conn.commit() def save_lot(self, lot_data: Dict): """Save lot data to database""" @@ -651,8 +467,7 @@ class CacheManager: lot_data.get('next_scrape_at'), lot_data.get('scrape_priority', 0), ) - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: + with self._pg() as conn, conn.cursor() as cur: cur.execute( """ INSERT INTO lots @@ -706,112 +521,53 @@ class CacheManager: params, ) conn.commit() - else: - with sqlite3.connect(self.db_path) as conn: - conn.execute( - """ - INSERT OR REPLACE INTO lots - (lot_id, auction_id, url, title, current_bid, starting_bid, minimum_bid, - bid_count, closing_time, viewing_time, pickup_date, location, description, - category, status, brand, model, attributes_json, - first_bid_time, last_bid_time, bid_velocity, bid_increment, - year_manufactured, condition_score, condition_description, - serial_number, manufacturer, damage_description, - followers_count, estimated_min_price, estimated_max_price, lot_condition, appearance, - scraped_at, api_data_json, next_scrape_at, scrape_priority) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- """, - params, - ) - conn.commit() def save_bid_history(self, lot_id: str, bid_records: List[Dict]): """Save bid history records to database""" if not bid_records: return - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute("DELETE FROM bid_history WHERE lot_id = %s", (lot_id,)) - for record in bid_records: - cur.execute( - """ - INSERT INTO bid_history - (lot_id, bid_amount, bid_time, is_autobid, bidder_id, bidder_number) - VALUES (%s, %s, %s, %s, %s, %s) - """, - ( - record['lot_id'], - record['bid_amount'], - record['bid_time'], - 1 if record['is_autobid'] else 0, - record['bidder_id'], - record['bidder_number'], - ), - ) - conn.commit() - else: - with sqlite3.connect(self.db_path) as conn: - conn.execute("DELETE FROM bid_history WHERE lot_id = ?", (lot_id,)) - for record in bid_records: - conn.execute( - """ - INSERT INTO bid_history - (lot_id, bid_amount, bid_time, is_autobid, bidder_id, bidder_number) - VALUES (?, ?, ?, ?, ?, ?) - """, - ( - record['lot_id'], - record['bid_amount'], - record['bid_time'], - 1 if record['is_autobid'] else 0, - record['bidder_id'], - record['bidder_number'], - ), - ) - conn.commit() + with self._pg() as conn, conn.cursor() as cur: + cur.execute("DELETE FROM bid_history WHERE lot_id = %s", (lot_id,)) + for record in bid_records: + cur.execute( + """ + INSERT INTO bid_history + (lot_id, bid_amount, bid_time, is_autobid, bidder_id, bidder_number) + VALUES (%s, %s, %s, %s, %s, %s) + """, + ( + record['lot_id'], + record['bid_amount'], + record['bid_time'], + 1 if record['is_autobid'] else 0, + record['bidder_id'], + record['bidder_number'], + ), + ) + conn.commit() def save_images(self, lot_id: str, image_urls: List[str]): """Save image URLs for a lot (prevents duplicates via unique constraint)""" - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - for url in image_urls: - cur.execute( - """ - INSERT INTO images (lot_id, url, downloaded) - VALUES (%s, %s, 0) - ON CONFLICT (lot_id, url) DO NOTHING - """, - (lot_id, url), - ) - conn.commit() - else: - with sqlite3.connect(self.db_path) as conn: - for url in image_urls: - conn.execute( - """ - INSERT OR IGNORE INTO images (lot_id, url, downloaded) - VALUES (?, ?, 0) - """, - (lot_id, url), - ) - conn.commit() + with self._pg() as conn, conn.cursor() as cur: + for url in image_urls: + cur.execute( + """ + INSERT INTO images (lot_id, url, downloaded) + VALUES (%s, %s, 0) + ON CONFLICT (lot_id, url) DO NOTHING + """, + (lot_id, url), + ) + conn.commit() def update_image_local_path(self, lot_id: str, url: str, local_path: str): - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute( - "UPDATE images SET local_path = %s, downloaded = 1 WHERE lot_id = %s AND url = %s", - (local_path, lot_id, url), - ) - conn.commit() - else: - with sqlite3.connect(self.db_path) as conn: - conn.execute( - "UPDATE images SET local_path = ?, downloaded = 1 WHERE lot_id = ? 
AND url = ?", - (local_path, lot_id, url), - ) - conn.commit() + with self._pg() as conn, conn.cursor() as cur: + cur.execute( + "UPDATE images SET local_path = %s, downloaded = 1 WHERE lot_id = %s AND url = %s", + (local_path, lot_id, url), + ) + conn.commit() def save_resource(self, url: str, content: bytes, content_type: str, status_code: int = 200, headers: Optional[Dict] = None, local_path: Optional[str] = None, cache_key: Optional[str] = None): @@ -824,36 +580,24 @@ class CacheManager: size_bytes = len(content) if content else 0 key = cache_key if cache_key else url - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute( - """ - INSERT INTO resource_cache - (url, content, content_type, status_code, headers, timestamp, size_bytes, local_path) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (url) - DO UPDATE SET content = EXCLUDED.content, - content_type = EXCLUDED.content_type, - status_code = EXCLUDED.status_code, - headers = EXCLUDED.headers, - timestamp = EXCLUDED.timestamp, - size_bytes = EXCLUDED.size_bytes, - local_path = EXCLUDED.local_path - """, - (key, content, content_type, status_code, headers_json, time.time(), size_bytes, local_path), - ) - conn.commit() - else: - with sqlite3.connect(self.db_path) as conn: - conn.execute( - """ - INSERT OR REPLACE INTO resource_cache - (url, content, content_type, status_code, headers, timestamp, size_bytes, local_path) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - """, - (key, content, content_type, status_code, headers_json, time.time(), size_bytes, local_path), - ) - conn.commit() + with self._pg() as conn, conn.cursor() as cur: + cur.execute( + """ + INSERT INTO resource_cache + (url, content, content_type, status_code, headers, timestamp, size_bytes, local_path) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (url) + DO UPDATE SET content = EXCLUDED.content, + content_type = EXCLUDED.content_type, + status_code = EXCLUDED.status_code, + headers = EXCLUDED.headers, + timestamp = EXCLUDED.timestamp, + size_bytes = EXCLUDED.size_bytes, + local_path = EXCLUDED.local_path + """, + (key, content, content_type, status_code, headers_json, time.time(), size_bytes, local_path), + ) + conn.commit() def get_resource(self, url: str, cache_key: Optional[str] = None) -> Optional[Dict]: """Get a cached resource @@ -862,23 +606,12 @@ class CacheManager: cache_key: Optional composite key to lookup """ key = cache_key if cache_key else url - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute( - "SELECT content, content_type, status_code, headers, timestamp, size_bytes, local_path FROM resource_cache WHERE url = %s", - (key,), - ) - row = cur.fetchone() - else: - with sqlite3.connect(self.db_path) as conn: - cursor = conn.execute( - """ - SELECT content, content_type, status_code, headers, timestamp, size_bytes, local_path - FROM resource_cache WHERE url = ? 
- """, - (key,), - ) - row = cursor.fetchone() + with self._pg() as conn, conn.cursor() as cur: + cur.execute( + "SELECT content, content_type, status_code, headers, timestamp, size_bytes, local_path FROM resource_cache WHERE url = %s", + (key,), + ) + row = cur.fetchone() if row: return { @@ -897,135 +630,101 @@ class CacheManager: # Query helpers for scraper/monitor/export # ------------------------ def get_counts(self) -> Dict[str, int]: - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute("SELECT COUNT(*) FROM auctions") - auctions = cur.fetchone()[0] - cur.execute("SELECT COUNT(*) FROM lots") - lots = cur.fetchone()[0] - return {"auctions": auctions, "lots": lots} - else: - with sqlite3.connect(self.db_path) as conn: - cur = conn.cursor() - cur.execute("SELECT COUNT(*) FROM auctions") - auctions = cur.fetchone()[0] - cur.execute("SELECT COUNT(*) FROM lots") - lots = cur.fetchone()[0] - return {"auctions": auctions, "lots": lots} + with self._pg() as conn, conn.cursor() as cur: + cur.execute("SELECT COUNT(*) FROM auctions") + auctions = cur.fetchone()[0] + cur.execute("SELECT COUNT(*) FROM lots") + lots = cur.fetchone()[0] + return {"auctions": auctions, "lots": lots} def get_lot_api_fields(self, lot_id: str) -> Optional[Tuple]: sql = ( "SELECT followers_count, estimated_min_price, current_bid, bid_count, closing_time, status " - "FROM lots WHERE lot_id = %s" if self.use_postgres else - "SELECT followers_count, estimated_min_price, current_bid, bid_count, closing_time, status FROM lots WHERE lot_id = ?" + "FROM lots WHERE lot_id = %s" ) params = (lot_id,) - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute(sql, params) - return cur.fetchone() - else: - with sqlite3.connect(self.db_path) as conn: - cur = conn.cursor() - cur.execute(sql, params) - return cur.fetchone() + with self._pg() as conn, conn.cursor() as cur: + cur.execute(sql, params) + return cur.fetchone() def get_page_record_by_url(self, url: str) -> Optional[Dict]: # Try lot record first by URL - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute("SELECT * FROM lots WHERE url = %s", (url,)) - lot_row = cur.fetchone() - if lot_row: - col_names = [desc.name for desc in cur.description] - lot_dict = dict(zip(col_names, lot_row)) - return {"type": "lot", **lot_dict} - cur.execute("SELECT * FROM auctions WHERE url = %s", (url,)) - auc_row = cur.fetchone() - if auc_row: - col_names = [desc.name for desc in cur.description] - auc_dict = dict(zip(col_names, auc_row)) - return {"type": "auction", **auc_dict} - else: - with sqlite3.connect(self.db_path) as conn: - cur = conn.cursor() - cur.execute("SELECT * FROM lots WHERE url = ?", (url,)) - lot_row = cur.fetchone() - if lot_row: - col_names = [d[0] for d in cur.description] - lot_dict = dict(zip(col_names, lot_row)) - return {"type": "lot", **lot_dict} - cur.execute("SELECT * FROM auctions WHERE url = ?", (url,)) - auc_row = cur.fetchone() - if auc_row: - col_names = [d[0] for d in cur.description] - auc_dict = dict(zip(col_names, auc_row)) - return {"type": "auction", **auc_dict} + with self._pg() as conn, conn.cursor() as cur: + cur.execute("SELECT * FROM lots WHERE url = %s", (url,)) + lot_row = cur.fetchone() + if lot_row: + col_names = [desc.name for desc in cur.description] + lot_dict = dict(zip(col_names, lot_row)) + return {"type": "lot", **lot_dict} + cur.execute("SELECT * FROM auctions WHERE url = %s", (url,)) + auc_row = cur.fetchone() + if auc_row: + col_names = 
[desc.name for desc in cur.description] + auc_dict = dict(zip(col_names, auc_row)) + return {"type": "auction", **auc_dict} return None def fetch_all(self, table: str) -> List[Dict]: assert table in {"auctions", "lots"} - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute(f"SELECT * FROM {table}") - rows = cur.fetchall() - col_names = [desc.name for desc in cur.description] - return [dict(zip(col_names, r)) for r in rows] - else: - with sqlite3.connect(self.db_path) as conn: - conn.row_factory = sqlite3.Row - cur = conn.cursor() - cur.execute(f"SELECT * FROM {table}") - return [dict(row) for row in cur.fetchall()] + with self._pg() as conn, conn.cursor() as cur: + cur.execute(f"SELECT * FROM {table}") + rows = cur.fetchall() + col_names = [desc.name for desc in cur.description] + return [dict(zip(col_names, r)) for r in rows] def get_lot_times(self, lot_id: str) -> Tuple[Optional[str], Optional[str]]: sql = ( - "SELECT viewing_time, pickup_date FROM lots WHERE lot_id = %s" if self.use_postgres else - "SELECT viewing_time, pickup_date FROM lots WHERE lot_id = ?" + "SELECT viewing_time, pickup_date FROM lots WHERE lot_id = %s" ) params = (lot_id,) - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute(sql, params) - row = cur.fetchone() - else: - with sqlite3.connect(self.db_path) as conn: - cur = conn.cursor() - cur.execute(sql, params) - row = cur.fetchone() + with self._pg() as conn, conn.cursor() as cur: + cur.execute(sql, params) + row = cur.fetchone() if not row: return None, None return row[0], row[1] def has_bid_history(self, lot_id: str) -> bool: - sql = ( - "SELECT COUNT(*) FROM bid_history WHERE lot_id = %s" if self.use_postgres else - "SELECT COUNT(*) FROM bid_history WHERE lot_id = ?" - ) + sql = ("SELECT COUNT(*) FROM bid_history WHERE lot_id = %s") params = (lot_id,) - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute(sql, params) - cnt = cur.fetchone()[0] - else: - with sqlite3.connect(self.db_path) as conn: - cur = conn.cursor() - cur.execute(sql, params) - cnt = cur.fetchone()[0] + with self._pg() as conn, conn.cursor() as cur: + cur.execute(sql, params) + cnt = cur.fetchone()[0] return cnt > 0 def get_downloaded_image_urls(self, lot_id: str) -> List[str]: - sql = ( - "SELECT url FROM images WHERE lot_id = %s AND downloaded = 1" if self.use_postgres else - "SELECT url FROM images WHERE lot_id = ? 
AND downloaded = 1" - ) + sql = ("SELECT url FROM images WHERE lot_id = %s AND downloaded = 1") params = (lot_id,) - if self.use_postgres: - with self._pg() as conn, conn.cursor() as cur: - cur.execute(sql, params) - return [r[0] for r in cur.fetchall()] - else: - with sqlite3.connect(self.db_path) as conn: - cur = conn.cursor() - cur.execute(sql, params) - return [r[0] for r in cur.fetchall()] \ No newline at end of file + with self._pg() as conn, conn.cursor() as cur: + cur.execute(sql, params) + return [r[0] for r in cur.fetchall()] + + # ------------------------ + # Aggregation helpers for scraper + # ------------------------ + def get_distinct_urls(self) -> Dict[str, List[str]]: + with self._pg() as conn, conn.cursor() as cur: + cur.execute("SELECT DISTINCT url FROM auctions") + auctions = [r[0] for r in cur.fetchall() if r and r[0]] + cur.execute("SELECT DISTINCT url FROM lots") + lots = [r[0] for r in cur.fetchall() if r and r[0]] + return {"auctions": auctions, "lots": lots} + + def get_lot_priority_info(self, lot_id: str, url: str) -> Tuple[Optional[str], Optional[str], Optional[int], Optional[int]]: + with self._pg() as conn, conn.cursor() as cur: + cur.execute( + """ + SELECT closing_time, scraped_at, scrape_priority, next_scrape_at + FROM lots WHERE lot_id = %s OR url = %s + """, + (lot_id, url), + ) + row = cur.fetchone() + if not row: + return None, None, None, None + return row[0], row[1], row[2], row[3] + + def get_recent_cached_urls(self, limit: int = 10) -> List[str]: + with self._pg() as conn, conn.cursor() as cur: + cur.execute("SELECT url FROM cache ORDER BY timestamp DESC LIMIT %s", (limit,)) + return [r[0] for r in cur.fetchall()] \ No newline at end of file diff --git a/src/config.py b/src/config.py index 35aa0bc..b1adc51 100644 --- a/src/config.py +++ b/src/config.py @@ -16,8 +16,8 @@ if sys.version_info < (3, 10): # ==================== CONFIGURATION ==================== BASE_URL = "https://www.troostwijkauctions.com" -# Primary database: PostgreSQL -# You can override via environment variable DATABASE_URL +# Primary database: PostgreSQL only +# Override via environment variable DATABASE_URL # Example: postgresql://user:pass@host:5432/dbname DATABASE_URL = os.getenv( "DATABASE_URL", @@ -25,8 +25,17 @@ DATABASE_URL = os.getenv( "postgresql://auction:heel-goed-wachtwoord@192.168.1.159:5432/auctiondb", ).strip() -# Deprecated: legacy SQLite cache path (only used as fallback in dev/tests) -CACHE_DB = "/mnt/okcomputer/output/cache.db" +# Database connection pool controls (to avoid creating too many short-lived TCP connections) +# Environment overrides: SCAEV_DB_POOL_MIN, SCAEV_DB_POOL_MAX, SCAEV_DB_POOL_TIMEOUT +def _int_env(name: str, default: int) -> int: + try: + return int(os.getenv(name, str(default))) + except Exception: + return default + +DB_POOL_MIN = _int_env("SCAEV_DB_POOL_MIN", 1) +DB_POOL_MAX = _int_env("SCAEV_DB_POOL_MAX", 6) +DB_POOL_TIMEOUT = _int_env("SCAEV_DB_POOL_TIMEOUT", 30) # seconds to wait for a pooled connection OUTPUT_DIR = "/mnt/okcomputer/output" IMAGES_DIR = "/mnt/okcomputer/output/images" RATE_LIMIT_SECONDS = 0.5 # EXACTLY 0.5 seconds between requests diff --git a/src/db.py b/src/db.py index bf2419a..f708d0c 100644 --- a/src/db.py +++ b/src/db.py @@ -3,14 +3,12 @@ Database scaffolding for future SQLAlchemy 2.x usage. Notes: -- We keep using the current SQLite + raw SQL for operational code. -- This module prepares an engine/session bound to DATABASE_URL, defaulting to - SQLite file in config.CACHE_DB path (for local dev only). 
-- PostgreSQL can be enabled by setting DATABASE_URL, e.g.: - DATABASE_URL=postgresql+psycopg://user:pass@localhost:5432/scaev +- The application now uses PostgreSQL exclusively via `config.DATABASE_URL`. +- This module prepares an engine/session bound to `DATABASE_URL`. +- Example URL: `postgresql+psycopg://user:pass@host:5432/scaev` No runtime dependency from the scraper currently imports or uses this module. -It is present to bootstrap the gradual migration to SQLAlchemy 2.x. +It is present to bootstrap a possible future move to SQLAlchemy 2.x. """ from __future__ import annotations @@ -19,14 +17,11 @@ import os from typing import Optional -def get_database_url(sqlite_fallback_path: str) -> str: +def get_database_url() -> str: url = os.getenv("DATABASE_URL") - if url and url.strip(): - return url.strip() - # SQLite fallback - # Use a separate sqlite file when DATABASE_URL is not set; this does not - # alter the existing cache.db usage by raw SQL — it's just a dev convenience. - return f"sqlite:///{sqlite_fallback_path}" + if not url or not url.strip(): + raise RuntimeError("DATABASE_URL must be set for PostgreSQL connection") + return url.strip() def create_engine_and_session(database_url: str): @@ -44,16 +39,15 @@ def create_engine_and_session(database_url: str): return engine, SessionLocal -def get_sa(session_cached: dict, sqlite_fallback_path: str): +def get_sa(session_cached: dict): """Helper to lazily create and cache SQLAlchemy engine/session factory. session_cached: dict — a mutable dict, e.g., module-level {}, to store engine and factory - sqlite_fallback_path: path to a sqlite file for local development """ if 'engine' in session_cached and 'SessionLocal' in session_cached: return session_cached['engine'], session_cached['SessionLocal'] - url = get_database_url(sqlite_fallback_path) + url = get_database_url() engine, SessionLocal = create_engine_and_session(url) session_cached['engine'] = engine session_cached['SessionLocal'] = SessionLocal diff --git a/src/main.py b/src/main.py index a354ebc..2097c06 100644 --- a/src/main.py +++ b/src/main.py @@ -8,7 +8,6 @@ import sys import asyncio import json import csv -import sqlite3 from datetime import datetime from pathlib import Path @@ -16,6 +15,17 @@ import config from cache import CacheManager from scraper import TroostwijkScraper +def mask_db_url(url: str) -> str: + try: + from urllib.parse import urlparse + p = urlparse(url) + user = p.username or '' + host = p.hostname or '' + port = f":{p.port}" if p.port else '' + return f"{p.scheme}://{user}:***@{host}{port}{p.path or ''}" + except Exception: + return url + def main(): """Main execution""" # Check for test mode @@ -34,7 +44,7 @@ def main(): if config.OFFLINE: print("OFFLINE MODE ENABLED — only database and cache will be used (no network)") print(f"Rate limit: {config.RATE_LIMIT_SECONDS} seconds BETWEEN EVERY REQUEST") - print(f"Cache database: {config.CACHE_DB}") + print(f"Database URL: {mask_db_url(config.DATABASE_URL)}") print(f"Output directory: {config.OUTPUT_DIR}") print(f"Max listing pages: {config.MAX_PAGES}") print("=" * 60) diff --git a/src/scraper.py b/src/scraper.py index 6c9d191..c2b6ffa 100644 --- a/src/scraper.py +++ b/src/scraper.py @@ -723,25 +723,15 @@ class TroostwijkScraper: Returns list of (priority, url, description) tuples sorted by priority (highest first) """ - import sqlite3 - prioritized = [] current_time = int(time.time()) - conn = sqlite3.connect(self.cache.db_path) - cursor = conn.cursor() - for url in lot_urls: # Extract lot_id from URL lot_id = 
self.parser.extract_lot_id(url) # Try to get existing data from database - cursor.execute(""" - SELECT closing_time, scraped_at, scrape_priority, next_scrape_at - FROM lots WHERE lot_id = ? OR url = ? - """, (lot_id, url)) - - row = cursor.fetchone() + row = self.cache.get_lot_priority_info(lot_id, url) if row: closing_time, scraped_at, existing_priority, next_scrape_at = row @@ -781,8 +771,6 @@ class TroostwijkScraper: prioritized.append((priority, url, desc)) - conn.close() - # Sort by priority (highest first) prioritized.sort(key=lambda x: x[0], reverse=True) @@ -793,14 +781,9 @@ class TroostwijkScraper: if self.offline: print("Launching OFFLINE crawl (no network requests)") # Gather URLs from database - import sqlite3 - conn = sqlite3.connect(self.cache.db_path) - cur = conn.cursor() - cur.execute("SELECT DISTINCT url FROM auctions") - auction_urls = [r[0] for r in cur.fetchall() if r and r[0]] - cur.execute("SELECT DISTINCT url FROM lots") - lot_urls = [r[0] for r in cur.fetchall() if r and r[0]] - conn.close() + urls = self.cache.get_distinct_urls() + auction_urls = urls['auctions'] + lot_urls = urls['lots'] print(f" OFFLINE: {len(auction_urls)} auctions and {len(lot_urls)} lots in DB") diff --git a/src/test.py b/src/test.py index bbb813c..d8c8db8 100644 --- a/src/test.py +++ b/src/test.py @@ -4,7 +4,6 @@ Test module for debugging extraction patterns """ import sys -import sqlite3 import time import re import json @@ -27,10 +26,11 @@ def test_extraction( if not cached: print(f"ERROR: URL not found in cache: {test_url}") print(f"\nAvailable cached URLs:") - with sqlite3.connect(config.CACHE_DB) as conn: - cursor = conn.execute("SELECT url FROM cache ORDER BY timestamp DESC LIMIT 10") - for row in cursor.fetchall(): - print(f" - {row[0]}") + try: + for url in scraper.cache.get_recent_cached_urls(limit=10): + print(f" - {url}") + except Exception as e: + print(f" (failed to list recent cached URLs: {e})") return content = cached['content']
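Taken together, these changes route every database access through `CacheManager` and its small connection pool. The sketch below is illustrative only: the method names come from the diff above, the pool sizes are arbitrary, and it assumes you run it from `src/` so that `cache` and `config` import the same way `main.py` does.

```python
# Illustrative usage of the pooled CacheManager (sketch; pool sizes chosen arbitrarily).
import os

# Pool knobs must be set before config.py is imported, since it reads them at import time.
os.environ.setdefault("SCAEV_DB_POOL_MIN", "1")
os.environ.setdefault("SCAEV_DB_POOL_MAX", "4")
os.environ.setdefault("SCAEV_DB_POOL_TIMEOUT", "10")

from cache import CacheManager

cache = CacheManager()                 # raises RuntimeError unless DATABASE_URL is a postgresql:// URL
print(cache.get_counts())              # {'auctions': ..., 'lots': ...}
for url in cache.get_recent_cached_urls(limit=5):
    print(" -", url)
```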