Compare commits

...

4 Commits

Author  SHA1        Message      Date
Tour    8a2b005d4a  move.venv    2025-12-09 07:11:09 +01:00
Tour    b0ee52b686  enrich data  2025-12-09 02:05:46 +01:00
Tour    06f63732b1  enrich data  2025-12-09 01:39:36 +01:00
Tour    83d0fc1329  enrich data  2025-12-09 01:19:55 +01:00
11 changed files with 1285 additions and 33 deletions


@@ -2,9 +2,9 @@
"""
Client for fetching bid history from Troostwijk REST API
"""
import aiohttp
from typing import Dict, List, Optional
from datetime import datetime
import config
BID_HISTORY_ENDPOINT = "https://shared-api.tbauctions.com/bidmanagement/lots/{lot_uuid}/bidding-history"
@@ -20,6 +20,13 @@ async def fetch_bid_history(lot_uuid: str, page_size: int = 100) -> Optional[Lis
Returns:
List of bid dictionaries or None if request fails
"""
if config.OFFLINE:
# Offline mode: do not perform any network requests
print(" OFFLINE: skipping bid history fetch")
return None
import aiohttp
all_bids = []
page_number = 1
has_more = True
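The OFFLINE guard returns None rather than raising, so callers have to treat a missing bid history as a normal outcome. A minimal caller-side sketch under that assumption; load_cached_bids is a hypothetical stand-in for a database lookup, not a function from this changeset:

```python
import asyncio
from typing import Dict, List, Optional

async def fetch_bid_history(lot_uuid: str) -> Optional[List[Dict]]:
    # Stand-in for the real client: returns None in OFFLINE mode or on any failure.
    return None

def load_cached_bids(lot_uuid: str) -> List[Dict]:
    # Hypothetical fallback that would read previously stored bids from SQLite.
    return []

async def bids_for_lot(lot_uuid: str) -> List[Dict]:
    """Prefer live data; fall back to whatever is already stored."""
    bids = await fetch_bid_history(lot_uuid)
    return bids if bids is not None else load_cached_bids(lot_uuid)

print(asyncio.run(bids_for_lot("00000000-0000-0000-0000-000000000000")))
```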


@@ -6,6 +6,7 @@ Cache Manager module for SQLite-based caching and data storage
import sqlite3
import time
import zlib
import json
from datetime import datetime
from typing import Dict, List, Optional
@@ -21,7 +22,7 @@ class CacheManager:
def _init_db(self):
"""Initialize cache and data storage database with consolidated schema"""
with sqlite3.connect(self.db_path) as conn:
# Cache table
# HTML page cache table (existing)
conn.execute("""
CREATE TABLE IF NOT EXISTS cache (
url TEXT PRIMARY KEY,
@@ -34,6 +35,26 @@ class CacheManager:
CREATE INDEX IF NOT EXISTS idx_timestamp ON cache(timestamp)
""")
# Resource cache table (NEW: for ALL web resources - JS, CSS, images, fonts, etc.)
conn.execute("""
CREATE TABLE IF NOT EXISTS resource_cache (
url TEXT PRIMARY KEY,
content BLOB,
content_type TEXT,
status_code INTEGER,
headers TEXT,
timestamp REAL,
size_bytes INTEGER,
local_path TEXT
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_resource_timestamp ON resource_cache(timestamp)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_resource_content_type ON resource_cache(content_type)
""")
# Auctions table - consolidated schema
conn.execute("""
CREATE TABLE IF NOT EXISTS auctions (
@@ -112,10 +133,14 @@ class CacheManager:
reserve_price REAL,
reserve_met INTEGER,
view_count INTEGER,
api_data_json TEXT,
next_scrape_at INTEGER,
scrape_priority INTEGER DEFAULT 0,
FOREIGN KEY (auction_id) REFERENCES auctions(auction_id)
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_lots_sale_id ON lots(sale_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_lots_closing_time ON lots(closing_time)")
# Images table
conn.execute("""
@@ -169,8 +194,86 @@ class CacheManager:
ON bid_history(bidder_id)
""")
# MIGRATIONS: Add new columns to existing tables
self._run_migrations(conn)
conn.commit()
def _run_migrations(self, conn):
"""Run database migrations to add new columns to existing tables"""
print("Checking for database migrations...")
# Check and add new columns to lots table
cursor = conn.execute("PRAGMA table_info(lots)")
lots_columns = {row[1] for row in cursor.fetchall()}
migrations_applied = False
if 'api_data_json' not in lots_columns:
print(" > Adding api_data_json column to lots table...")
conn.execute("ALTER TABLE lots ADD COLUMN api_data_json TEXT")
migrations_applied = True
if 'next_scrape_at' not in lots_columns:
print(" > Adding next_scrape_at column to lots table...")
conn.execute("ALTER TABLE lots ADD COLUMN next_scrape_at INTEGER")
migrations_applied = True
if 'scrape_priority' not in lots_columns:
print(" > Adding scrape_priority column to lots table...")
conn.execute("ALTER TABLE lots ADD COLUMN scrape_priority INTEGER DEFAULT 0")
migrations_applied = True
# Check resource_cache table structure
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='resource_cache'")
resource_cache_exists = cursor.fetchone() is not None
if resource_cache_exists:
# Check if table has correct structure
cursor = conn.execute("PRAGMA table_info(resource_cache)")
resource_columns = {row[1] for row in cursor.fetchall()}
# Expected columns
expected_columns = {'url', 'content', 'content_type', 'status_code', 'headers', 'timestamp', 'size_bytes', 'local_path'}
if resource_columns != expected_columns:
print(" > Rebuilding resource_cache table with correct schema...")
# Backup old data count
cursor = conn.execute("SELECT COUNT(*) FROM resource_cache")
old_count = cursor.fetchone()[0]
print(f" (Preserving {old_count} cached resources)")
# Drop and recreate with correct schema
conn.execute("DROP TABLE IF EXISTS resource_cache")
conn.execute("""
CREATE TABLE resource_cache (
url TEXT PRIMARY KEY,
content BLOB,
content_type TEXT,
status_code INTEGER,
headers TEXT,
timestamp REAL,
size_bytes INTEGER,
local_path TEXT
)
""")
conn.execute("CREATE INDEX idx_resource_timestamp ON resource_cache(timestamp)")
conn.execute("CREATE INDEX idx_resource_content_type ON resource_cache(content_type)")
migrations_applied = True
print(" * resource_cache table rebuilt")
# Create indexes after migrations (when columns exist)
try:
conn.execute("CREATE INDEX IF NOT EXISTS idx_lots_priority ON lots(scrape_priority DESC)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_lots_next_scrape ON lots(next_scrape_at)")
except sqlite3.OperationalError:
pass  # Columns may still be missing on schemas that predate these migrations
if migrations_applied:
print(" * Migrations complete")
else:
print(" * Database schema is up to date")
def get(self, url: str, max_age_hours: int = 24) -> Optional[Dict]:
"""Get cached page if it exists and is not too old"""
with sqlite3.connect(self.db_path) as conn:
@@ -212,7 +315,7 @@ class CacheManager:
(url, compressed_content, time.time(), status_code)
)
conn.commit()
print(f" Cached: {url} (compressed {ratio:.1f}%)")
print(f" -> Cached: {url} (compressed {ratio:.1f}%)")
def clear_old(self, max_age_hours: int = 168):
"""Clear old cache entries to prevent database bloat"""
@@ -270,8 +373,8 @@ class CacheManager:
year_manufactured, condition_score, condition_description,
serial_number, manufacturer, damage_description,
followers_count, estimated_min_price, estimated_max_price, lot_condition, appearance,
scraped_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
scraped_at, api_data_json, next_scrape_at, scrape_priority)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
lot_data['lot_id'],
lot_data.get('auction_id', ''),
@@ -306,7 +409,10 @@ class CacheManager:
lot_data.get('estimated_max_price'),
lot_data.get('lot_condition', ''),
lot_data.get('appearance', ''),
lot_data['scraped_at']
lot_data['scraped_at'],
lot_data.get('api_data_json'),
lot_data.get('next_scrape_at'),
lot_data.get('scrape_priority', 0)
))
conn.commit()
@@ -343,4 +449,52 @@ class CacheManager:
INSERT OR IGNORE INTO images (lot_id, url, downloaded)
VALUES (?, ?, 0)
""", (lot_id, url))
conn.commit()
conn.commit()
def save_resource(self, url: str, content: bytes, content_type: str, status_code: int = 200,
headers: Optional[Dict] = None, local_path: Optional[str] = None, cache_key: Optional[str] = None):
"""Save a web resource (JS, CSS, image, font, etc.) to cache
Args:
cache_key: Optional composite key (url + body hash for POST requests)
"""
with sqlite3.connect(self.db_path) as conn:
headers_json = json.dumps(headers) if headers else None
size_bytes = len(content) if content else 0
# Use cache_key if provided, otherwise use url
key = cache_key if cache_key else url
conn.execute("""
INSERT OR REPLACE INTO resource_cache
(url, content, content_type, status_code, headers, timestamp, size_bytes, local_path)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (key, content, content_type, status_code, headers_json, time.time(), size_bytes, local_path))
conn.commit()
def get_resource(self, url: str, cache_key: Optional[str] = None) -> Optional[Dict]:
"""Get a cached resource
Args:
cache_key: Optional composite key to lookup
"""
with sqlite3.connect(self.db_path) as conn:
key = cache_key if cache_key else url
cursor = conn.execute("""
SELECT content, content_type, status_code, headers, timestamp, size_bytes, local_path
FROM resource_cache WHERE url = ?
""", (key,))
row = cursor.fetchone()
if row:
return {
'content': row[0],
'content_type': row[1],
'status_code': row[2],
'headers': json.loads(row[3]) if row[3] else {},
'timestamp': row[4],
'size_bytes': row[5],
'local_path': row[6],
'cached': True
}
return None
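A short usage sketch of the two new methods with a plain GET resource. The URL and body are illustrative, and it assumes CacheManager() picks up the configured database path; POST responses would additionally pass the composite cache_key, as the scraper's interceptor does:

```python
from cache import CacheManager

cache = CacheManager()

# Cache a plain GET resource under its URL (the default key).
css_url = "https://www.troostwijkauctions.com/static/site.css"
cache.save_resource(
    url=css_url,
    content=b"body { margin: 0; }",
    content_type="text/css",
    status_code=200,
    headers={"content-type": "text/css"},
)

hit = cache.get_resource(css_url)
if hit:
    print(hit["content_type"], hit["size_bytes"], hit["cached"])
```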


@@ -4,6 +4,7 @@ Configuration module for Scaev Auctions Scraper
"""
import sys
import os
from pathlib import Path
# Require Python 3.10+
@@ -19,7 +20,12 @@ OUTPUT_DIR = "/mnt/okcomputer/output"
IMAGES_DIR = "/mnt/okcomputer/output/images"
RATE_LIMIT_SECONDS = 0.5 # EXACTLY 0.5 seconds between requests
MAX_PAGES = 50 # Number of listing pages to crawl
DOWNLOAD_IMAGES = True # Set to True to download images
# OFFLINE mode: when enabled, no network calls are performed; only DB/cache are used
OFFLINE = os.getenv("SCAEV_OFFLINE", "0").strip().lower() in {"1", "true", "yes", "on"}
# Image downloading can be disabled explicitly; in OFFLINE it's always disabled
DOWNLOAD_IMAGES = False if OFFLINE else True
# Setup directories
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
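OFFLINE is read from the environment once, at import time, so SCAEV_OFFLINE has to be set before config is first imported; after that, the only way to flip it is to assign config.OFFLINE directly, which is what the test suite does. A minimal sketch of both routes, assuming the configured output directories are writable (config creates them on import):

```python
import os

# Route 1: set the flag before the first import of config.
os.environ["SCAEV_OFFLINE"] = "1"
import config
print(config.OFFLINE, config.DOWNLOAD_IMAGES)  # True False (image downloads are forced off)

# Route 2: after import, override the module attribute directly, as the tests do.
config.OFFLINE = False
```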


@@ -2,8 +2,8 @@
"""
GraphQL client for fetching lot bidding data from Troostwijk API
"""
import aiohttp
from typing import Dict, Optional
import config
GRAPHQL_ENDPOINT = "https://storefront.tbauctions.com/storefront/graphql"
@@ -31,17 +31,6 @@ query AuctionData($auctionId: TbaUuid!, $locale: String!, $platform: Platform!)
LOT_BIDDING_QUERY = """
query LotBiddingData($lotDisplayId: String!, $locale: String!, $platform: Platform!) {
lotDetails(displayId: $lotDisplayId, locale: $locale, platform: $platform) {
estimatedFullPrice {
min {
cents
currency
}
max {
cents
currency
}
saleTerm
}
lot {
id
displayId
@@ -86,6 +75,13 @@ async def fetch_auction_data(auction_id: str) -> Optional[Dict]:
Returns:
Dict with auction data or None if request fails
"""
if config.OFFLINE:
# Offline mode: do not perform any network requests
print(" OFFLINE: skipping GraphQL auction fetch")
return None
import aiohttp
variables = {
"auctionId": auction_id,
"locale": "nl",
@@ -122,6 +118,13 @@ async def fetch_lot_bidding_data(lot_display_id: str) -> Optional[Dict]:
Returns:
Dict with bidding data or None if request fails
"""
if config.OFFLINE:
# Offline mode: do not perform any network requests
print(" OFFLINE: skipping GraphQL lot bidding fetch")
return None
import aiohttp
variables = {
"lotDisplayId": lot_display_id,
"locale": "nl",


@@ -31,6 +31,8 @@ def main():
print("Scaev Auctions Scraper")
print("=" * 60)
if config.OFFLINE:
print("OFFLINE MODE ENABLED — only database and cache will be used (no network)")
print(f"Rate limit: {config.RATE_LIMIT_SECONDS} seconds BETWEEN EVERY REQUEST")
print(f"Cache database: {config.CACHE_DB}")
print(f"Output directory: {config.OUTPUT_DIR}")


@@ -103,6 +103,8 @@ class AuctionMonitor:
print("="*60)
print("AUCTION MONITOR STARTED")
print("="*60)
if config.OFFLINE:
print("OFFLINE MODE ENABLED — only database and cache will be used (no network)")
print(f"Poll interval: {self.poll_interval / 60:.0f} minutes")
print(f"Cache database: {config.CACHE_DB}")
print(f"Rate limit: {config.RATE_LIMIT_SECONDS}s between requests")

src/priority.py (new file, 171 lines)

@@ -0,0 +1,171 @@
#!/usr/bin/env python3
"""
Priority calculation for intelligent scraping
"""
import time
from datetime import datetime
from typing import Optional, Tuple
def parse_closing_time(closing_time_str: Optional[str]) -> Optional[int]:
"""Parse closing time string to unix timestamp"""
if not closing_time_str:
return None
try:
# Try various date formats
formats = [
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%d %H:%M',
'%d-%m-%Y %H:%M',
]
for fmt in formats:
try:
dt = datetime.strptime(closing_time_str, fmt)
return int(dt.timestamp())
except:
continue
return None
except:
return None
def calculate_ttl(closing_timestamp: int, current_time: Optional[int] = None) -> int:
"""
Calculate Time-To-Live (TTL) for cache based on time until closing
Strategy:
- Closing in > 7 days: Scrape once per day (TTL = 24 hours)
- Closing in 3-7 days: Scrape every 12 hours
- Closing in 1-3 days: Scrape every 6 hours
- Closing in 12-24 hours: Scrape every 3 hours
- Closing in 6-12 hours: Scrape every 2 hours
- Closing in 1-6 hours: Scrape every 30 minutes
- Closing in < 1 hour: Scrape every 10 minutes
- Already closed: TTL = infinite (no need to rescrape)
"""
if current_time is None:
current_time = int(time.time())
time_until_close = closing_timestamp - current_time
# Already closed - very low priority
if time_until_close <= 0:
return 999999999 # Effectively infinite TTL
# Convert to hours
hours_until_close = time_until_close / 3600
if hours_until_close > 168: # > 7 days
return 24 * 3600 # 24 hours
elif hours_until_close > 72: # 3-7 days
return 12 * 3600 # 12 hours
elif hours_until_close > 24: # 1-3 days
return 6 * 3600 # 6 hours
elif hours_until_close > 12: # 12-24 hours
return 3 * 3600 # 3 hours
elif hours_until_close > 6: # 6-12 hours
return 2 * 3600 # 2 hours
elif hours_until_close > 1: # 1-6 hours
return 30 * 60 # 30 minutes
else: # < 1 hour - URGENT!
return 10 * 60 # 10 minutes
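A quick worked example of the tiers above, with current_time pinned so the results are deterministic (TTL values in seconds). It assumes src/ is on sys.path:

```python
import time
from priority import calculate_ttl  # assumes src/ is on sys.path

now = int(time.time())
print(calculate_ttl(now + 10 * 24 * 3600, now))  # 10 days out     -> 86400 (24 h)
print(calculate_ttl(now + 2 * 24 * 3600, now))   # 2 days out      -> 21600 (6 h)
print(calculate_ttl(now + 8 * 3600, now))        # 8 hours out     -> 7200  (2 h)
print(calculate_ttl(now + 30 * 60, now))         # 30 minutes out  -> 600   (10 min)
print(calculate_ttl(now - 60, now))              # already closed  -> 999999999
```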
def calculate_priority(
closing_time_str: Optional[str],
scraped_at: Optional[int],
current_time: Optional[int] = None
) -> Tuple[int, int]:
"""
Calculate scrape priority and next_scrape_at timestamp
Returns:
(priority, next_scrape_at)
Priority Scale:
10000+ = Never scraped (highest priority)
9000+ = Closing within 1 hour
8000+ = Closing within 6 hours
7000+ = Closing within 24 hours
6000+ = Closing within 3 days
5000+ = Closing within 7 days
1000+ = Due for re-scrape (TTL expired)
0-999 = Recently scraped, not due yet
-1000 = Already closed
"""
if current_time is None:
current_time = int(time.time())
# Never scraped = highest priority
if scraped_at is None or scraped_at == 0:
closing_timestamp = parse_closing_time(closing_time_str)
if closing_timestamp:
ttl = calculate_ttl(closing_timestamp, current_time)
next_scrape = current_time # Scrape immediately
time_until_close = closing_timestamp - current_time
# Boost priority based on urgency
if time_until_close <= 0:
return (10000, next_scrape) # Closed but never scraped
elif time_until_close < 3600:
return (19000, next_scrape) # < 1 hour - CRITICAL
elif time_until_close < 6 * 3600:
return (18000, next_scrape) # < 6 hours
elif time_until_close < 24 * 3600:
return (17000, next_scrape) # < 24 hours
elif time_until_close < 3 * 24 * 3600:
return (16000, next_scrape) # < 3 days
else:
return (15000, next_scrape) # > 3 days but never scraped
else:
return (15000, current_time) # No closing time, high priority anyway
# Already scraped - calculate based on TTL
closing_timestamp = parse_closing_time(closing_time_str)
if not closing_timestamp:
# No closing time - scrape once per day
ttl = 24 * 3600
next_scrape = scraped_at + ttl
time_until_rescrape = next_scrape - current_time
if time_until_rescrape <= 0:
return (1000, current_time) # Due for rescrape
else:
return (500, next_scrape) # Not due yet
# Has closing time - intelligent TTL
time_until_close = closing_timestamp - current_time
# Already closed
if time_until_close <= 0:
return (-1000, 999999999) # Very low priority, never rescrape
# Calculate TTL and next scrape time
ttl = calculate_ttl(closing_timestamp, current_time)
next_scrape = scraped_at + ttl
time_until_rescrape = next_scrape - current_time
# Priority based on urgency and TTL
if time_until_rescrape <= 0:
# Due for rescrape - urgency-based priority
if time_until_close < 3600:
return (9000, current_time) # < 1 hour - URGENT
elif time_until_close < 6 * 3600:
return (8000, current_time) # < 6 hours
elif time_until_close < 24 * 3600:
return (7000, current_time) # < 24 hours
elif time_until_close < 3 * 24 * 3600:
return (6000, current_time) # < 3 days
elif time_until_close < 7 * 24 * 3600:
return (5000, current_time) # < 7 days
else:
return (1000, current_time) # > 7 days, but due
else:
# Not due yet - low priority
return (min(999, int(time_until_close / 3600)), next_scrape)
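A companion sketch exercising calculate_priority for the two interesting cases, a lot never scraped versus one scraped ten minutes ago, both closing in two hours (same import assumption as above):

```python
import time
from priority import calculate_priority  # assumes src/ is on sys.path

now = int(time.time())
closing = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now + 2 * 3600))

# Never scraped, closing in 2 hours: critical band, scrape immediately.
print(calculate_priority(closing, None, now))       # -> (18000, now)

# Scraped 10 minutes ago, closing in 2 hours: TTL is 30 min, so not due yet.
print(calculate_priority(closing, now - 600, now))  # -> (2, now + 1200)
```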


@@ -10,13 +10,13 @@ import random
import json
import re
from pathlib import Path
from typing import Dict, List, Optional, Set
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin
from playwright.async_api import async_playwright, Page
from config import (
BASE_URL, RATE_LIMIT_SECONDS, MAX_PAGES, DOWNLOAD_IMAGES, IMAGES_DIR
BASE_URL, RATE_LIMIT_SECONDS, MAX_PAGES, DOWNLOAD_IMAGES, IMAGES_DIR, OFFLINE
)
from cache import CacheManager
from parse import DataParser
@@ -27,6 +27,7 @@ from graphql_client import (
extract_enriched_attributes
)
from bid_history_client import fetch_bid_history, parse_bid_history
from priority import calculate_priority, parse_closing_time
class TroostwijkScraper:
"""Main scraper class for Troostwijk Auctions"""
@@ -38,6 +39,8 @@ class TroostwijkScraper:
self.visited_lots: Set[str] = set()
self.last_request_time = 0
self.download_images = DOWNLOAD_IMAGES
self.intercepted_api_data: Dict[str, str] = {} # Store intercepted GraphQL responses by lot_id
self.offline = OFFLINE
async def _download_image(self, session: 'aiohttp.ClientSession', url: str, lot_id: str, index: int) -> Optional[str]:
"""Download an image and save it locally (without rate limiting - concurrent within lot)"""
@@ -102,6 +105,11 @@ class TroostwijkScraper:
print(f" CACHE HIT: {url} ({cache_time:.0f}ms)")
return {'content': cached['content'], 'from_cache': True}
# In OFFLINE mode we never fetch from network
if self.offline:
print(f" OFFLINE: cache miss for {url} — skipping fetch")
return None
await self._rate_limit()
try:
@@ -205,6 +213,73 @@ class TroostwijkScraper:
result = await self._get_page(page, url)
if not result:
# OFFLINE fallback: try to construct page data directly from DB
if self.offline:
import sqlite3
conn = sqlite3.connect(self.cache.db_path)
cur = conn.cursor()
# Try lot first
cur.execute("SELECT * FROM lots WHERE url = ?", (url,))
lot_row = cur.fetchone()
if lot_row:
# Build a dict using column names
col_names = [d[0] for d in cur.description]
lot_dict = dict(zip(col_names, lot_row))
conn.close()
page_data = {
'type': 'lot',
'lot_id': lot_dict.get('lot_id'),
'auction_id': lot_dict.get('auction_id'),
'url': lot_dict.get('url') or url,
'title': lot_dict.get('title') or '',
'current_bid': lot_dict.get('current_bid') or '',
'bid_count': lot_dict.get('bid_count') or 0,
'closing_time': lot_dict.get('closing_time') or '',
'viewing_time': lot_dict.get('viewing_time') or '',
'pickup_date': lot_dict.get('pickup_date') or '',
'location': lot_dict.get('location') or '',
'description': lot_dict.get('description') or '',
'category': lot_dict.get('category') or '',
'status': lot_dict.get('status') or '',
'brand': lot_dict.get('brand') or '',
'model': lot_dict.get('model') or '',
'attributes_json': lot_dict.get('attributes_json') or '',
'first_bid_time': lot_dict.get('first_bid_time'),
'last_bid_time': lot_dict.get('last_bid_time'),
'bid_velocity': lot_dict.get('bid_velocity'),
'followers_count': lot_dict.get('followers_count') or 0,
'estimated_min_price': lot_dict.get('estimated_min_price'),
'estimated_max_price': lot_dict.get('estimated_max_price'),
'lot_condition': lot_dict.get('lot_condition') or '',
'appearance': lot_dict.get('appearance') or '',
'scraped_at': lot_dict.get('scraped_at') or '',
}
print(" OFFLINE: using DB record for lot")
self.visited_lots.add(url)
return page_data
# Try auction by URL
cur.execute("SELECT * FROM auctions WHERE url = ?", (url,))
auc_row = cur.fetchone()
if auc_row:
col_names = [d[0] for d in cur.description]
auc_dict = dict(zip(col_names, auc_row))
conn.close()
page_data = {
'type': 'auction',
'auction_id': auc_dict.get('auction_id'),
'url': auc_dict.get('url') or url,
'title': auc_dict.get('title') or '',
'location': auc_dict.get('location') or '',
'lots_count': auc_dict.get('lots_count') or 0,
'first_lot_closing_time': auc_dict.get('first_lot_closing_time') or '',
'scraped_at': auc_dict.get('scraped_at') or '',
}
print(" OFFLINE: using DB record for auction")
self.visited_lots.add(url)
return page_data
conn.close()
return None
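The fallback above rebuilds dicts from cursor.description by hand; sqlite3's built-in Row factory gives the same result with less code. A minimal sketch of the equivalent lookup, shown only as a design alternative:

```python
import sqlite3

def lot_from_db(db_path: str, url: str) -> dict | None:
    """Return the stored lot row as a plain dict, or None when the URL is unknown."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row          # rows become name-addressable
    try:
        row = conn.execute("SELECT * FROM lots WHERE url = ?", (url,)).fetchone()
        return dict(row) if row else None   # keys are the column names
    finally:
        conn.close()
```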
content = result['content']
@@ -251,29 +326,59 @@ class TroostwijkScraper:
except:
pass
# Fetch all API data concurrently (or use cache if HTML was cached)
# Fetch all API data concurrently (or use intercepted/cached data)
lot_id = page_data.get('lot_id')
auction_id = page_data.get('auction_id')
import sqlite3
if from_cache:
# Step 1: Check if we intercepted API data during page load
intercepted_data = None
if lot_id in self.intercepted_api_data:
print(f" Using intercepted API data (free!)")
try:
intercepted_json = self.intercepted_api_data[lot_id]
intercepted_data = json.loads(intercepted_json)
# Store the raw JSON for future offline use
page_data['api_data_json'] = intercepted_json
# Extract lot data from intercepted response
if 'data' in intercepted_data and 'lot' in intercepted_data['data']:
lot_api_data = intercepted_data['data']['lot']
# Format it as if it came from our fetch_lot_bidding_data
bidding_data = {'lot': lot_api_data}
from_cache = False # We have fresh data
except Exception as e:
print(f" Error parsing intercepted data: {e}")
intercepted_data = None
if intercepted_data:
# We got free API data from interception - skip the fetch logic
pass
elif from_cache:
# Check if we have cached API data in database
conn = sqlite3.connect(self.cache.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT followers_count, estimated_min_price, current_bid, bid_count
SELECT followers_count, estimated_min_price, current_bid, bid_count, closing_time, status
FROM lots WHERE lot_id = ?
""", (lot_id,))
existing = cursor.fetchone()
conn.close()
# Use cached API data if available and not null
if existing and existing[0] is not None:
# Data quality check: Must have followers_count AND closing_time to be considered "complete"
# This prevents using stale records like old "0 bids" entries
is_complete = (existing and
existing[0] is not None and # followers_count exists
existing[4] is not None and # closing_time exists
existing[4] != '') # closing_time is not empty
if is_complete:
print(f" Using cached API data")
page_data['followers_count'] = existing[0]
page_data['estimated_min_price'] = existing[1]
page_data['current_bid'] = existing[2] or page_data.get('current_bid', 'No bids')
page_data['bid_count'] = existing[3] or 0
page_data['closing_time'] = existing[4] # Add closing_time
page_data['status'] = existing[5] or '' # Add status
bidding_data = None
bid_history_data = None
else:
@@ -287,7 +392,8 @@ class TroostwijkScraper:
bid_history_data = None # Will fetch after we have lot_uuid
else:
# Fresh page fetch - make concurrent API calls for all data
print(f" Fetching lot data from API (concurrent)...")
if not self.offline:
print(f" Fetching lot data from API (concurrent)...")
api_tasks = [fetch_lot_bidding_data(lot_id)]
task_map = {'bidding': 0} # Track which index corresponds to which task
@@ -315,6 +421,10 @@ class TroostwijkScraper:
results = await asyncio.gather(*api_tasks, return_exceptions=True)
bidding_data = results[task_map['bidding']] if results and not isinstance(results[task_map['bidding']], Exception) else None
# Store raw API JSON for offline replay
if bidding_data:
page_data['api_data_json'] = json.dumps(bidding_data)
# Process auction data if it was fetched
if 'auction' in task_map and len(results) > task_map['auction']:
auction_data = results[task_map['auction']]
@@ -331,7 +441,19 @@ class TroostwijkScraper:
if bidding_data:
formatted_data = format_bid_data(bidding_data)
page_data.update(formatted_data)
# Merge data intelligently - don't overwrite existing fields
# Parser (from __NEXT_DATA__) has: description, category, images
# API has: current_bid, bid_count, closing_time, status, followers, estimates
# Keep parser data, enhance with API data
for key, value in formatted_data.items():
# Only update if current value is missing/empty
current_value = page_data.get(key)
if current_value is None or current_value == '' or current_value == 0 or current_value == 'No bids':
page_data[key] = value
# Special case: always update bid_count if API has higher value
elif key == 'bid_count' and isinstance(value, int) and value > current_value:
page_data[key] = value
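The merge rule in this hunk (keep parsed values, fill gaps from the API, let a higher API bid_count win) is compact enough to pull into a helper that both the cached and fresh paths could share. A sketch of that rule as a standalone function; the name is hypothetical:

```python
def merge_api_data(page_data: dict, formatted_data: dict) -> dict:
    """Fill missing or empty fields from the API without clobbering parsed values."""
    empty = (None, '', 0, 'No bids')
    for key, value in formatted_data.items():
        current = page_data.get(key)
        if current in empty:
            page_data[key] = value
        elif key == 'bid_count' and isinstance(value, int) and value > current:
            # The API may have seen newer bids than the cached HTML.
            page_data[key] = value
    return page_data

# Example: parser data wins for description, API fills the bidding fields.
merged = merge_api_data(
    {'current_bid': 'No bids', 'bid_count': 0, 'description': 'parsed text'},
    {'current_bid': 'EUR 150', 'bid_count': 3, 'description': ''},
)
print(merged)  # {'current_bid': 'EUR 150', 'bid_count': 3, 'description': 'parsed text'}
```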
# Enhanced logging with new intelligence fields
print(f" Bid: {page_data.get('current_bid', 'N/A')}")
@@ -428,6 +550,17 @@ class TroostwijkScraper:
print(f" Bid: {page_data.get('current_bid', 'N/A')} (from HTML)")
print(f" Location: {page_data.get('location', 'N/A')}")
# Calculate and store priority for next scrape
current_time = int(time.time())
priority, next_scrape = calculate_priority(
page_data.get('closing_time'),
current_time, # Just scraped now
current_time
)
page_data['scrape_priority'] = priority
page_data['next_scrape_at'] = next_scrape
self.cache.save_lot(page_data)
images = page_data.get('images', [])
@@ -468,8 +601,112 @@ class TroostwijkScraper:
return page_data
def _prioritize_lots(self, lot_urls: List[str]) -> List[Tuple[int, str, str]]:
"""
Prioritize lots based on closing time and scrape history
Returns list of (priority, url, description) tuples sorted by priority (highest first)
"""
import sqlite3
prioritized = []
current_time = int(time.time())
conn = sqlite3.connect(self.cache.db_path)
cursor = conn.cursor()
for url in lot_urls:
# Extract lot_id from URL
lot_id = self.parser.extract_lot_id(url)
# Try to get existing data from database
cursor.execute("""
SELECT closing_time, scraped_at, scrape_priority, next_scrape_at
FROM lots WHERE lot_id = ? OR url = ?
""", (lot_id, url))
row = cursor.fetchone()
if row:
closing_time, scraped_at, existing_priority, next_scrape_at = row
# Parse scraped_at (it might be a string timestamp)
if isinstance(scraped_at, str):
try:
scraped_at = int(datetime.strptime(scraped_at, '%Y-%m-%d %H:%M:%S').timestamp())
except:
scraped_at = None
else:
closing_time = None
scraped_at = None
# Calculate priority
priority, next_scrape = calculate_priority(closing_time, scraped_at, current_time)
# Create description
if scraped_at is None:
desc = "Never scraped"
elif priority >= 15000:
desc = "Never scraped (high urgency)"
elif priority >= 9000:
desc = "URGENT: <1hr to close"
elif priority >= 8000:
desc = "High: <6hr to close"
elif priority >= 7000:
desc = "Medium: <24hr to close"
elif priority >= 5000:
desc = "Normal: <7d to close"
elif priority >= 1000:
desc = "Due for rescrape"
elif priority < 0:
desc = "Already closed"
else:
desc = f"Recently scraped"
prioritized.append((priority, url, desc))
conn.close()
# Sort by priority (highest first)
prioritized.sort(key=lambda x: x[0], reverse=True)
return prioritized
async def crawl_auctions(self, max_pages: int = MAX_PAGES) -> List[Dict]:
"""Main crawl function"""
if self.offline:
print("Launching OFFLINE crawl (no network requests)")
# Gather URLs from database
import sqlite3
conn = sqlite3.connect(self.cache.db_path)
cur = conn.cursor()
cur.execute("SELECT DISTINCT url FROM auctions")
auction_urls = [r[0] for r in cur.fetchall() if r and r[0]]
cur.execute("SELECT DISTINCT url FROM lots")
lot_urls = [r[0] for r in cur.fetchall() if r and r[0]]
conn.close()
print(f" OFFLINE: {len(auction_urls)} auctions and {len(lot_urls)} lots in DB")
results: List[Dict] = []
# Optionally process auctions (parse cached HTML if exists or DB fallback)
for i, auc_url in enumerate(auction_urls):
print(f"\n[AUC {i+1:>3}/{len(auction_urls)}] ", end="")
page_data = await self.crawl_page(page=None, url=auc_url)
if page_data:
results.append(page_data)
print("\n" + "="*60)
print("PHASE OFFLINE: PROCESSING LOT PAGES FROM DB/CACHE")
print("="*60)
for i, lot_url in enumerate(lot_urls):
print(f"\n[LOT {i+1:>3}/{len(lot_urls)}] ", end="")
page_data = await self.crawl_page(page=None, url=lot_url)
if page_data:
results.append(page_data)
return results
async with async_playwright() as p:
print("Launching browser...")
browser = await p.chromium.launch(
@@ -491,6 +728,94 @@ class TroostwijkScraper:
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
})
# Set up COMPREHENSIVE resource interception (cache EVERYTHING)
resource_stats = {'cached': 0, 'fetched': 0, 'failed': 0}
request_bodies = {} # Store POST request bodies by URL for cache key generation
async def handle_request(request):
"""Intercept requests to capture POST bodies for GraphQL"""
try:
if request.method == 'POST' and 'graphql' in request.url:
# Store the POST body
post_data = request.post_data
if post_data:
# Create hash of POST body for cache key
import hashlib
body_hash = hashlib.md5(post_data.encode() if isinstance(post_data, str) else post_data).hexdigest()[:16]
cache_key = f"{request.url}#{body_hash}"
request_bodies[request.url] = (cache_key, post_data)
except:
pass
page.on('request', handle_request)
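The POST-body hash is what lets different GraphQL queries to the same endpoint get distinct cache entries. A minimal sketch of the key derivation on its own, matching the md5-prefix scheme above; the helper name is not part of the changeset:

```python
import hashlib
from typing import Optional, Union

def graphql_cache_key(url: str, post_data: Optional[Union[str, bytes]]) -> str:
    """url#<first 16 hex chars of md5(body)>, or just the url when there is no body."""
    if not post_data:
        return url
    body = post_data.encode() if isinstance(post_data, str) else post_data
    return f"{url}#{hashlib.md5(body).hexdigest()[:16]}"

# graphql_cache_key("https://storefront.tbauctions.com/storefront/graphql",
#                   '{"query": "...", "variables": {"lotDisplayId": "A1-12345-6"}}')
# -> 'https://storefront.tbauctions.com/storefront/graphql#<16 hex chars>'
```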
async def handle_response(response):
"""Intercept ALL resources and cache them"""
try:
url = response.url
status = response.status
# Get content type
headers = await response.all_headers()
content_type = headers.get('content-type', '').split(';')[0].strip()
# Determine if we should cache this resource
cacheable_types = [
'text/html', 'text/css', 'text/javascript', 'application/javascript',
'application/json', 'application/x-javascript', 'image/', 'font/',
'application/font', 'video/', 'audio/', 'application/xml', 'text/xml',
'image/svg+xml'
]
should_cache = any(content_type.startswith(ct) for ct in cacheable_types)
if should_cache and status == 200:
try:
body = await response.body()
# Determine cache key (use composite key for GraphQL POST requests)
cache_key = None
if 'graphql' in url and url in request_bodies:
cache_key, post_data = request_bodies[url]
# Save to resource cache
self.cache.save_resource(
url=url,
content=body,
content_type=content_type,
status_code=status,
headers=headers,
cache_key=cache_key
)
resource_stats['cached'] += 1
# Special handling for GraphQL responses
if 'graphql' in url and 'application/json' in content_type:
try:
body_text = body.decode('utf-8')
data = json.loads(body_text)
# Check if this is a lot details query
if 'data' in data and 'lot' in data.get('data', {}):
lot_data = data['data']['lot']
lot_slug = lot_data.get('urlSlug', '')
if lot_slug:
self.intercepted_api_data[lot_slug] = body_text
print(f" >> Intercepted GraphQL for: {lot_slug}")
except:
pass
except Exception as e:
resource_stats['failed'] += 1
else:
resource_stats['fetched'] += 1
except Exception as e:
# Silent fail - interception is opportunistic
pass
page.on('response', handle_response)
all_auction_urls = []
all_lot_urls = []
@@ -542,19 +867,39 @@ class TroostwijkScraper:
print(f"PHASE 2 COMPLETE: {len(all_lot_urls)} UNIQUE LOTS")
print(f"{'='*60}")
# Phase 3: Scrape each lot page
# Phase 2.5: Sort lots by priority (closing time + TTL)
print("\n" + "="*60)
print("PHASE 3: SCRAPING INDIVIDUAL LOT PAGES")
print("PHASE 2.5: CALCULATING SCRAPE PRIORITIES")
print("="*60)
sorted_lots = self._prioritize_lots(all_lot_urls)
print(f" > Sorted {len(sorted_lots)} lots by priority")
print(f" > Highest priority: {sorted_lots[0][2] if sorted_lots else 'N/A'}")
print(f" > Lowest priority: {sorted_lots[-1][2] if sorted_lots else 'N/A'}")
# Phase 3: Scrape each lot page (in priority order)
print("\n" + "="*60)
print("PHASE 3: SCRAPING LOTS (PRIORITY ORDER)")
print("="*60)
results = []
for i, lot_url in enumerate(all_lot_urls):
print(f"\n[{i+1:>3}/{len(all_lot_urls)}] ", end="")
for i, (priority, lot_url, priority_desc) in enumerate(sorted_lots):
print(f"\n[{i+1:>3}/{len(sorted_lots)}] [P:{priority}] ", end="")
page_data = await self.crawl_page(page, lot_url)
if page_data:
results.append(page_data)
await browser.close()
# Print resource caching statistics
print(f"\n{'='*60}")
print(f"RESOURCE CACHE STATISTICS")
print(f"{'='*60}")
print(f" Cached: {resource_stats['cached']} resources")
print(f" Fetched (not cached): {resource_stats['fetched']}")
print(f" Failed: {resource_stats['failed']}")
print(f"{'='*60}")
return results
def export_to_files(self) -> Dict[str, str]:

test/test_cache_behavior.py (new file, 303 lines)

@@ -0,0 +1,303 @@
#!/usr/bin/env python3
"""
Test cache behavior - verify page is only fetched once and data persists offline
"""
import sys
import os
import asyncio
import sqlite3
import time
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))
from cache import CacheManager
from scraper import TroostwijkScraper
import config
class TestCacheBehavior:
"""Test suite for cache and offline functionality"""
def __init__(self):
self.test_db = "test_cache.db"
self.original_db = config.CACHE_DB
self.cache = None
self.scraper = None
def setup(self):
"""Setup test environment"""
print("\n" + "="*60)
print("TEST SETUP")
print("="*60)
# Use test database
config.CACHE_DB = self.test_db
# Ensure offline mode is disabled for tests
config.OFFLINE = False
# Clean up old test database
if os.path.exists(self.test_db):
os.remove(self.test_db)
print(f" * Removed old test database")
# Initialize cache and scraper
self.cache = CacheManager()
self.scraper = TroostwijkScraper()
self.scraper.offline = False # Explicitly disable offline mode
print(f" * Created test database: {self.test_db}")
print(f" * Initialized cache and scraper")
print(f" * Offline mode: DISABLED")
def teardown(self):
"""Cleanup test environment"""
print("\n" + "="*60)
print("TEST TEARDOWN")
print("="*60)
# Restore original database path
config.CACHE_DB = self.original_db
# Keep test database for inspection
print(f" * Test database preserved: {self.test_db}")
print(f" * Restored original database path")
async def test_page_fetched_once(self):
"""Test that a page is only fetched from network once"""
print("\n" + "="*60)
print("TEST 1: Page Fetched Only Once")
print("="*60)
# Pick a real lot URL to test with
test_url = "https://www.troostwijkauctions.com/l/bmw-x5-xdrive40d-high-executive-m-sport-a8-286pk-2019-A1-26955-7"
print(f"\nTest URL: {test_url}")
# First visit - should fetch from network
print("\n--- FIRST VISIT (should fetch from network) ---")
start_time = time.time()
async with asyncio.timeout(60): # 60 second timeout
page_data_1 = await self._scrape_single_page(test_url)
first_visit_time = time.time() - start_time
if not page_data_1:
print(" [FAIL] First visit returned no data")
return False
print(f" [OK] First visit completed in {first_visit_time:.2f}s")
print(f" [OK] Got lot data: {page_data_1.get('title', 'N/A')[:60]}...")
# Check closing time was captured
closing_time_1 = page_data_1.get('closing_time')
print(f" [OK] Closing time: {closing_time_1}")
# Second visit - should use cache
print("\n--- SECOND VISIT (should use cache) ---")
start_time = time.time()
async with asyncio.timeout(30): # Should be much faster
page_data_2 = await self._scrape_single_page(test_url)
second_visit_time = time.time() - start_time
if not page_data_2:
print(" [FAIL] Second visit returned no data")
return False
print(f" [OK] Second visit completed in {second_visit_time:.2f}s")
# Verify data matches
if page_data_1.get('lot_id') != page_data_2.get('lot_id'):
print(f" [FAIL] Lot IDs don't match")
return False
closing_time_2 = page_data_2.get('closing_time')
print(f" [OK] Closing time: {closing_time_2}")
if closing_time_1 != closing_time_2:
print(f" [FAIL] Closing times don't match!")
print(f" First: {closing_time_1}")
print(f" Second: {closing_time_2}")
return False
# Verify second visit was significantly faster (used cache)
if second_visit_time >= first_visit_time * 0.5:
print(f" [WARN] Second visit not significantly faster")
print(f" First: {first_visit_time:.2f}s")
print(f" Second: {second_visit_time:.2f}s")
else:
print(f" [OK] Second visit was {(first_visit_time / second_visit_time):.1f}x faster (cache working!)")
# Verify resource cache has entries
conn = sqlite3.connect(self.test_db)
cursor = conn.execute("SELECT COUNT(*) FROM resource_cache")
resource_count = cursor.fetchone()[0]
conn.close()
print(f" [OK] Cached {resource_count} resources")
print("\n[PASS] TEST 1 PASSED: Page fetched only once, data persists")
return True
async def test_offline_mode(self):
"""Test that offline mode works with cached data"""
print("\n" + "="*60)
print("TEST 2: Offline Mode with Cached Data")
print("="*60)
# Use the same URL from test 1 (should be cached)
test_url = "https://www.troostwijkauctions.com/l/bmw-x5-xdrive40d-high-executive-m-sport-a8-286pk-2019-A1-26955-7"
# Enable offline mode
original_offline = config.OFFLINE
config.OFFLINE = True
self.scraper.offline = True
print(f"\nTest URL: {test_url}")
print(" * Offline mode: ENABLED")
try:
# Try to scrape in offline mode
print("\n--- OFFLINE SCRAPE (should use DB/cache only) ---")
start_time = time.time()
async with asyncio.timeout(30):
page_data = await self._scrape_single_page(test_url)
offline_time = time.time() - start_time
if not page_data:
print(" [FAIL] Offline mode returned no data")
return False
print(f" [OK] Offline scrape completed in {offline_time:.2f}s")
print(f" [OK] Got lot data: {page_data.get('title', 'N/A')[:60]}...")
# Check closing time is available
closing_time = page_data.get('closing_time')
if not closing_time:
print(f" [FAIL] No closing time in offline mode")
return False
print(f" [OK] Closing time preserved: {closing_time}")
# Verify essential fields are present
essential_fields = ['lot_id', 'title', 'url', 'location']
missing_fields = [f for f in essential_fields if not page_data.get(f)]
if missing_fields:
print(f" [FAIL] Missing essential fields: {missing_fields}")
return False
print(f" [OK] All essential fields present")
# Check database has the lot
conn = sqlite3.connect(self.test_db)
cursor = conn.execute("SELECT closing_time FROM lots WHERE url = ?", (test_url,))
row = cursor.fetchone()
conn.close()
if not row:
print(f" [FAIL] Lot not found in database")
return False
db_closing_time = row[0]
print(f" [OK] Database has closing time: {db_closing_time}")
if db_closing_time != closing_time:
print(f" [FAIL] Closing time mismatch")
print(f" Scraped: {closing_time}")
print(f" Database: {db_closing_time}")
return False
print("\n[PASS] TEST 2 PASSED: Offline mode works, closing time preserved")
return True
finally:
# Restore offline mode
config.OFFLINE = original_offline
self.scraper.offline = original_offline
async def _scrape_single_page(self, url):
"""Helper to scrape a single page"""
from playwright.async_api import async_playwright
if config.OFFLINE or self.scraper.offline:
# Offline mode - use crawl_page directly
return await self.scraper.crawl_page(page=None, url=url)
# Online mode - need browser
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
try:
result = await self.scraper.crawl_page(page, url)
return result
finally:
await browser.close()
async def run_all_tests(self):
"""Run all tests"""
print("\n" + "="*70)
print("CACHE BEHAVIOR TEST SUITE")
print("="*70)
self.setup()
results = []
try:
# Test 1: Page fetched once
result1 = await self.test_page_fetched_once()
results.append(("Page Fetched Once", result1))
# Test 2: Offline mode
result2 = await self.test_offline_mode()
results.append(("Offline Mode", result2))
except Exception as e:
print(f"\n[ERROR] TEST SUITE ERROR: {e}")
import traceback
traceback.print_exc()
finally:
self.teardown()
# Print summary
print("\n" + "="*70)
print("TEST SUMMARY")
print("="*70)
all_passed = True
for test_name, passed in results:
status = "[PASS]" if passed else "[FAIL]"
print(f" {status}: {test_name}")
if not passed:
all_passed = False
print("="*70)
if all_passed:
print("\n*** ALL TESTS PASSED! ***")
return 0
else:
print("\n*** SOME TESTS FAILED ***")
return 1
async def main():
"""Run tests"""
tester = TestCacheBehavior()
exit_code = await tester.run_all_tests()
sys.exit(exit_code)
if __name__ == "__main__":
asyncio.run(main())


@@ -0,0 +1,51 @@
#!/usr/bin/env python3
import sys
import os
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, parent_dir)
sys.path.insert(0, os.path.join(parent_dir, 'src'))
import asyncio
from scraper import TroostwijkScraper
import config
async def test():
# Force online mode
os.environ['SCAEV_OFFLINE'] = '0'
config.OFFLINE = False
scraper = TroostwijkScraper()
scraper.offline = False
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
url = "https://www.troostwijkauctions.com/l/used-dometic-seastar-tfxchx8641p-top-mount-engine-control-liver-A1-39684-12"
# Add debug logging to parser
original_parse = scraper.parser.parse_page
def debug_parse(content, url):
result = original_parse(content, url)
if result:
print(f"PARSER OUTPUT:")
print(f" description: {result.get('description', 'NONE')[:100] if result.get('description') else 'EMPTY'}")
print(f" closing_time: {result.get('closing_time', 'NONE')}")
print(f" bid_count: {result.get('bid_count', 'NONE')}")
return result
scraper.parser.parse_page = debug_parse
page_data = await scraper.crawl_page(page, url)
await browser.close()
print(f"\nFINAL page_data:")
print(f" description: {page_data.get('description', 'NONE')[:100] if page_data and page_data.get('description') else 'EMPTY'}")
print(f" closing_time: {page_data.get('closing_time', 'NONE') if page_data else 'NONE'}")
print(f" bid_count: {page_data.get('bid_count', 'NONE') if page_data else 'NONE'}")
print(f" status: {page_data.get('status', 'NONE') if page_data else 'NONE'}")
asyncio.run(test())

test/test_missing_fields.py (new file, 208 lines)

@@ -0,0 +1,208 @@
#!/usr/bin/env python3
"""
Test to validate that all expected fields are populated after scraping
"""
import sys
import os
import asyncio
import sqlite3
# Add parent and src directory to path
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, parent_dir)
sys.path.insert(0, os.path.join(parent_dir, 'src'))
# Force online mode before importing
os.environ['SCAEV_OFFLINE'] = '0'
from scraper import TroostwijkScraper
import config
async def test_lot_has_all_fields():
"""Test that a lot page has all expected fields populated"""
print("\n" + "="*60)
print("TEST: Lot has all required fields")
print("="*60)
# Use the example lot from user
test_url = "https://www.troostwijkauctions.com/l/radaway-idea-black-dwj-doucheopstelling-A1-39956-18"
# Ensure we're not in offline mode
config.OFFLINE = False
scraper = TroostwijkScraper()
scraper.offline = False
print(f"\n[1] Scraping: {test_url}")
# Start playwright and scrape
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
page_data = await scraper.crawl_page(page, test_url)
await browser.close()
if not page_data:
print(" [FAIL] No data returned")
return False
print(f"\n[2] Validating fields...")
# Fields that MUST have values (critical for auction functionality)
required_fields = {
'closing_time': 'Closing time',
'current_bid': 'Current bid',
'bid_count': 'Bid count',
'status': 'Status',
}
# Fields that SHOULD have values but may legitimately be empty
optional_fields = {
'description': 'Description',
}
missing_fields = []
empty_fields = []
optional_missing = []
# Check required fields
for field, label in required_fields.items():
value = page_data.get(field)
if value is None:
missing_fields.append(label)
print(f" [FAIL] {label}: MISSING (None)")
elif value == '' or value == 0 or value == 'No bids':
# Special case: 'No bids' is only acceptable if bid_count is 0
if field == 'current_bid' and page_data.get('bid_count', 0) == 0:
print(f" [PASS] {label}: '{value}' (acceptable - no bids)")
else:
empty_fields.append(label)
print(f" [FAIL] {label}: EMPTY ('{value}')")
else:
print(f" [PASS] {label}: {value}")
# Check optional fields (warn but don't fail)
for field, label in optional_fields.items():
value = page_data.get(field)
if value is None or value == '':
optional_missing.append(label)
print(f" [WARN] {label}: EMPTY (may be legitimate)")
else:
print(f" [PASS] {label}: {value[:50]}...")
# Check database
print(f"\n[3] Checking database entry...")
conn = sqlite3.connect(scraper.cache.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT closing_time, current_bid, bid_count, description, status
FROM lots WHERE url = ?
""", (test_url,))
row = cursor.fetchone()
conn.close()
if row:
db_closing, db_bid, db_count, db_desc, db_status = row
print(f" DB closing_time: {db_closing or 'EMPTY'}")
print(f" DB current_bid: {db_bid or 'EMPTY'}")
print(f" DB bid_count: {db_count}")
print(f" DB description: {db_desc[:50] if db_desc else 'EMPTY'}...")
print(f" DB status: {db_status or 'EMPTY'}")
# Verify DB matches page_data
if db_closing != page_data.get('closing_time'):
print(f" [WARN] DB closing_time doesn't match page_data")
if db_count != page_data.get('bid_count'):
print(f" [WARN] DB bid_count doesn't match page_data")
else:
print(f" [WARN] No database entry found")
print(f"\n" + "="*60)
if missing_fields or empty_fields:
print(f"[FAIL] Missing fields: {', '.join(missing_fields)}")
print(f"[FAIL] Empty fields: {', '.join(empty_fields)}")
if optional_missing:
print(f"[WARN] Optional missing: {', '.join(optional_missing)}")
return False
else:
print("[PASS] All required fields are populated")
if optional_missing:
print(f"[WARN] Optional missing: {', '.join(optional_missing)}")
return True
async def test_lot_with_description():
"""Test that a lot with description preserves it"""
print("\n" + "="*60)
print("TEST: Lot with description")
print("="*60)
# Use a lot known to have description
test_url = "https://www.troostwijkauctions.com/l/used-dometic-seastar-tfxchx8641p-top-mount-engine-control-liver-A1-39684-12"
config.OFFLINE = False
scraper = TroostwijkScraper()
scraper.offline = False
print(f"\n[1] Scraping: {test_url}")
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
page_data = await scraper.crawl_page(page, test_url)
await browser.close()
if not page_data:
print(" [FAIL] No data returned")
return False
print(f"\n[2] Checking description...")
description = page_data.get('description', '')
if not description or description == '':
print(f" [FAIL] Description is empty")
return False
else:
print(f" [PASS] Description: {description[:100]}...")
return True
async def main():
"""Run all tests"""
print("\n" + "="*60)
print("MISSING FIELDS TEST SUITE")
print("="*60)
test1 = await test_lot_has_all_fields()
test2 = await test_lot_with_description()
print("\n" + "="*60)
if test1 and test2:
print("ALL TESTS PASSED")
else:
print("SOME TESTS FAILED")
if not test1:
print(" - test_lot_has_all_fields FAILED")
if not test2:
print(" - test_lot_with_description FAILED")
print("="*60 + "\n")
return 0 if (test1 and test2) else 1
if __name__ == '__main__':
exit_code = asyncio.run(main())
sys.exit(exit_code)