This commit is contained in:
Tour
2025-12-09 08:04:16 +01:00
commit e69563d4d6
37 changed files with 7262 additions and 0 deletions

303
test/test_cache_behavior.py Normal file
View File

@@ -0,0 +1,303 @@
#!/usr/bin/env python3
"""
Test cache behavior - verify page is only fetched once and data persists offline
"""
import sys
import os
import asyncio
import sqlite3
import time
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))
from cache import CacheManager
from scraper import TroostwijkScraper
import config
class TestCacheBehavior:
"""Test suite for cache and offline functionality"""
def __init__(self):
self.test_db = "test_cache.db"
self.original_db = config.CACHE_DB
self.cache = None
self.scraper = None
def setup(self):
"""Setup test environment"""
print("\n" + "="*60)
print("TEST SETUP")
print("="*60)
# Use test database
config.CACHE_DB = self.test_db
# Ensure offline mode is disabled for tests
config.OFFLINE = False
# Clean up old test database
if os.path.exists(self.test_db):
os.remove(self.test_db)
print(f" * Removed old test database")
# Initialize cache and scraper
self.cache = CacheManager()
self.scraper = TroostwijkScraper()
self.scraper.offline = False # Explicitly disable offline mode
print(f" * Created test database: {self.test_db}")
print(f" * Initialized cache and scraper")
print(f" * Offline mode: DISABLED")
def teardown(self):
"""Cleanup test environment"""
print("\n" + "="*60)
print("TEST TEARDOWN")
print("="*60)
# Restore original database path
config.CACHE_DB = self.original_db
# Keep test database for inspection
print(f" * Test database preserved: {self.test_db}")
print(f" * Restored original database path")
async def test_page_fetched_once(self):
"""Test that a page is only fetched from network once"""
print("\n" + "="*60)
print("TEST 1: Page Fetched Only Once")
print("="*60)
# Pick a real lot URL to test with
test_url = "https://www.troostwijkauctions.com/l/bmw-x5-xdrive40d-high-executive-m-sport-a8-286pk-2019-A1-26955-7"
print(f"\nTest URL: {test_url}")
# First visit - should fetch from network
print("\n--- FIRST VISIT (should fetch from network) ---")
start_time = time.time()
async with asyncio.timeout(60): # 60 second timeout
page_data_1 = await self._scrape_single_page(test_url)
first_visit_time = time.time() - start_time
if not page_data_1:
print(" [FAIL] First visit returned no data")
return False
print(f" [OK] First visit completed in {first_visit_time:.2f}s")
print(f" [OK] Got lot data: {page_data_1.get('title', 'N/A')[:60]}...")
# Check closing time was captured
closing_time_1 = page_data_1.get('closing_time')
print(f" [OK] Closing time: {closing_time_1}")
# Second visit - should use cache
print("\n--- SECOND VISIT (should use cache) ---")
start_time = time.time()
async with asyncio.timeout(30): # Should be much faster
page_data_2 = await self._scrape_single_page(test_url)
second_visit_time = time.time() - start_time
if not page_data_2:
print(" [FAIL] Second visit returned no data")
return False
print(f" [OK] Second visit completed in {second_visit_time:.2f}s")
# Verify data matches
if page_data_1.get('lot_id') != page_data_2.get('lot_id'):
print(f" [FAIL] Lot IDs don't match")
return False
closing_time_2 = page_data_2.get('closing_time')
print(f" [OK] Closing time: {closing_time_2}")
if closing_time_1 != closing_time_2:
print(f" [FAIL] Closing times don't match!")
print(f" First: {closing_time_1}")
print(f" Second: {closing_time_2}")
return False
# Verify second visit was significantly faster (used cache)
if second_visit_time >= first_visit_time * 0.5:
print(f" [WARN] Second visit not significantly faster")
print(f" First: {first_visit_time:.2f}s")
print(f" Second: {second_visit_time:.2f}s")
else:
print(f" [OK] Second visit was {(first_visit_time / second_visit_time):.1f}x faster (cache working!)")
# Verify resource cache has entries
conn = sqlite3.connect(self.test_db)
cursor = conn.execute("SELECT COUNT(*) FROM resource_cache")
resource_count = cursor.fetchone()[0]
conn.close()
print(f" [OK] Cached {resource_count} resources")
print("\n[PASS] TEST 1 PASSED: Page fetched only once, data persists")
return True
async def test_offline_mode(self):
"""Test that offline mode works with cached data"""
print("\n" + "="*60)
print("TEST 2: Offline Mode with Cached Data")
print("="*60)
# Use the same URL from test 1 (should be cached)
test_url = "https://www.troostwijkauctions.com/l/bmw-x5-xdrive40d-high-executive-m-sport-a8-286pk-2019-A1-26955-7"
# Enable offline mode
original_offline = config.OFFLINE
config.OFFLINE = True
self.scraper.offline = True
print(f"\nTest URL: {test_url}")
print(" * Offline mode: ENABLED")
try:
# Try to scrape in offline mode
print("\n--- OFFLINE SCRAPE (should use DB/cache only) ---")
start_time = time.time()
async with asyncio.timeout(30):
page_data = await self._scrape_single_page(test_url)
offline_time = time.time() - start_time
if not page_data:
print(" [FAIL] Offline mode returned no data")
return False
print(f" [OK] Offline scrape completed in {offline_time:.2f}s")
print(f" [OK] Got lot data: {page_data.get('title', 'N/A')[:60]}...")
# Check closing time is available
closing_time = page_data.get('closing_time')
if not closing_time:
print(f" [FAIL] No closing time in offline mode")
return False
print(f" [OK] Closing time preserved: {closing_time}")
# Verify essential fields are present
essential_fields = ['lot_id', 'title', 'url', 'location']
missing_fields = [f for f in essential_fields if not page_data.get(f)]
if missing_fields:
print(f" [FAIL] Missing essential fields: {missing_fields}")
return False
print(f" [OK] All essential fields present")
# Check database has the lot
conn = sqlite3.connect(self.test_db)
cursor = conn.execute("SELECT closing_time FROM lots WHERE url = ?", (test_url,))
row = cursor.fetchone()
conn.close()
if not row:
print(f" [FAIL] Lot not found in database")
return False
db_closing_time = row[0]
print(f" [OK] Database has closing time: {db_closing_time}")
if db_closing_time != closing_time:
print(f" [FAIL] Closing time mismatch")
print(f" Scraped: {closing_time}")
print(f" Database: {db_closing_time}")
return False
print("\n[PASS] TEST 2 PASSED: Offline mode works, closing time preserved")
return True
finally:
# Restore offline mode
config.OFFLINE = original_offline
self.scraper.offline = original_offline
async def _scrape_single_page(self, url):
"""Helper to scrape a single page"""
from playwright.async_api import async_playwright
if config.OFFLINE or self.scraper.offline:
# Offline mode - use crawl_page directly
return await self.scraper.crawl_page(page=None, url=url)
# Online mode - need browser
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
try:
result = await self.scraper.crawl_page(page, url)
return result
finally:
await browser.close()
async def run_all_tests(self):
"""Run all tests"""
print("\n" + "="*70)
print("CACHE BEHAVIOR TEST SUITE")
print("="*70)
self.setup()
results = []
try:
# Test 1: Page fetched once
result1 = await self.test_page_fetched_once()
results.append(("Page Fetched Once", result1))
# Test 2: Offline mode
result2 = await self.test_offline_mode()
results.append(("Offline Mode", result2))
except Exception as e:
print(f"\n[ERROR] TEST SUITE ERROR: {e}")
import traceback
traceback.print_exc()
finally:
self.teardown()
# Print summary
print("\n" + "="*70)
print("TEST SUMMARY")
print("="*70)
all_passed = True
for test_name, passed in results:
status = "[PASS]" if passed else "[FAIL]"
print(f" {status}: {test_name}")
if not passed:
all_passed = False
print("="*70)
if all_passed:
print("\n*** ALL TESTS PASSED! ***")
return 0
else:
print("\n*** SOME TESTS FAILED ***")
return 1
async def main():
"""Run tests"""
tester = TestCacheBehavior()
exit_code = await tester.run_all_tests()
sys.exit(exit_code)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python3
import sys
import os
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, parent_dir)
sys.path.insert(0, os.path.join(parent_dir, 'src'))
import asyncio
from scraper import TroostwijkScraper
import config
import os
async def test():
# Force online mode
os.environ['SCAEV_OFFLINE'] = '0'
config.OFFLINE = False
scraper = TroostwijkScraper()
scraper.offline = False
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
url = "https://www.troostwijkauctions.com/l/used-dometic-seastar-tfxchx8641p-top-mount-engine-control-liver-A1-39684-12"
# Add debug logging to parser
original_parse = scraper.parser.parse_page
def debug_parse(content, url):
result = original_parse(content, url)
if result:
print(f"PARSER OUTPUT:")
print(f" description: {result.get('description', 'NONE')[:100] if result.get('description') else 'EMPTY'}")
print(f" closing_time: {result.get('closing_time', 'NONE')}")
print(f" bid_count: {result.get('bid_count', 'NONE')}")
return result
scraper.parser.parse_page = debug_parse
page_data = await scraper.crawl_page(page, url)
await browser.close()
print(f"\nFINAL page_data:")
print(f" description: {page_data.get('description', 'NONE')[:100] if page_data and page_data.get('description') else 'EMPTY'}")
print(f" closing_time: {page_data.get('closing_time', 'NONE') if page_data else 'NONE'}")
print(f" bid_count: {page_data.get('bid_count', 'NONE') if page_data else 'NONE'}")
print(f" status: {page_data.get('status', 'NONE') if page_data else 'NONE'}")
asyncio.run(test())

85
test/test_graphql_403.py Normal file
View File

@@ -0,0 +1,85 @@
import asyncio
import types
import sys
from pathlib import Path
import pytest
@pytest.mark.asyncio
async def test_fetch_lot_bidding_data_403(monkeypatch):
"""
Simulate a 403 from the GraphQL endpoint and verify:
- Function returns None (graceful handling)
- It attempts a retry and logs a clear 403 message
"""
# Load modules directly from src using importlib to avoid path issues
project_root = Path(__file__).resolve().parents[1]
src_path = project_root / 'src'
import importlib.util
def _load_module(name, file_path):
spec = importlib.util.spec_from_file_location(name, str(file_path))
module = importlib.util.module_from_spec(spec)
sys.modules[name] = module
spec.loader.exec_module(module) # type: ignore
return module
# Load config first because graphql_client imports it by module name
config = _load_module('config', src_path / 'config.py')
graphql_client = _load_module('graphql_client', src_path / 'graphql_client.py')
monkeypatch.setattr(config, "OFFLINE", False, raising=False)
log_messages = []
def fake_print(*args, **kwargs):
msg = " ".join(str(a) for a in args)
log_messages.append(msg)
import builtins
monkeypatch.setattr(builtins, "print", fake_print)
class MockResponse:
def __init__(self, status=403, text_body="Forbidden"):
self.status = status
self._text_body = text_body
async def json(self):
return {}
async def text(self):
return self._text_body
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return False
class MockSession:
def __init__(self, *args, **kwargs):
pass
def post(self, *args, **kwargs):
# Always return 403
return MockResponse(403, "Forbidden by WAF")
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return False
# Patch aiohttp.ClientSession to our mock
import types as _types
dummy_aiohttp = _types.SimpleNamespace()
dummy_aiohttp.ClientSession = MockSession
# Ensure that an `import aiohttp` inside the function resolves to our dummy
monkeypatch.setitem(sys.modules, 'aiohttp', dummy_aiohttp)
result = await graphql_client.fetch_lot_bidding_data("A1-40179-35")
# Should gracefully return None
assert result is None
# Should have logged a 403 at least once
assert any("GraphQL API error: 403" in m for m in log_messages)

208
test/test_missing_fields.py Normal file
View File

@@ -0,0 +1,208 @@
#!/usr/bin/env python3
"""
Test to validate that all expected fields are populated after scraping
"""
import sys
import os
import asyncio
import sqlite3
# Add parent and src directory to path
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, parent_dir)
sys.path.insert(0, os.path.join(parent_dir, 'src'))
# Force online mode before importing
os.environ['SCAEV_OFFLINE'] = '0'
from scraper import TroostwijkScraper
import config
async def test_lot_has_all_fields():
"""Test that a lot page has all expected fields populated"""
print("\n" + "="*60)
print("TEST: Lot has all required fields")
print("="*60)
# Use the example lot from user
test_url = "https://www.troostwijkauctions.com/l/radaway-idea-black-dwj-doucheopstelling-A1-39956-18"
# Ensure we're not in offline mode
config.OFFLINE = False
scraper = TroostwijkScraper()
scraper.offline = False
print(f"\n[1] Scraping: {test_url}")
# Start playwright and scrape
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
page_data = await scraper.crawl_page(page, test_url)
await browser.close()
if not page_data:
print(" [FAIL] No data returned")
return False
print(f"\n[2] Validating fields...")
# Fields that MUST have values (critical for auction functionality)
required_fields = {
'closing_time': 'Closing time',
'current_bid': 'Current bid',
'bid_count': 'Bid count',
'status': 'Status',
}
# Fields that SHOULD have values but may legitimately be empty
optional_fields = {
'description': 'Description',
}
missing_fields = []
empty_fields = []
optional_missing = []
# Check required fields
for field, label in required_fields.items():
value = page_data.get(field)
if value is None:
missing_fields.append(label)
print(f" [FAIL] {label}: MISSING (None)")
elif value == '' or value == 0 or value == 'No bids':
# Special case: 'No bids' is only acceptable if bid_count is 0
if field == 'current_bid' and page_data.get('bid_count', 0) == 0:
print(f" [PASS] {label}: '{value}' (acceptable - no bids)")
else:
empty_fields.append(label)
print(f" [FAIL] {label}: EMPTY ('{value}')")
else:
print(f" [PASS] {label}: {value}")
# Check optional fields (warn but don't fail)
for field, label in optional_fields.items():
value = page_data.get(field)
if value is None or value == '':
optional_missing.append(label)
print(f" [WARN] {label}: EMPTY (may be legitimate)")
else:
print(f" [PASS] {label}: {value[:50]}...")
# Check database
print(f"\n[3] Checking database entry...")
conn = sqlite3.connect(scraper.cache.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT closing_time, current_bid, bid_count, description, status
FROM lots WHERE url = ?
""", (test_url,))
row = cursor.fetchone()
conn.close()
if row:
db_closing, db_bid, db_count, db_desc, db_status = row
print(f" DB closing_time: {db_closing or 'EMPTY'}")
print(f" DB current_bid: {db_bid or 'EMPTY'}")
print(f" DB bid_count: {db_count}")
print(f" DB description: {db_desc[:50] if db_desc else 'EMPTY'}...")
print(f" DB status: {db_status or 'EMPTY'}")
# Verify DB matches page_data
if db_closing != page_data.get('closing_time'):
print(f" [WARN] DB closing_time doesn't match page_data")
if db_count != page_data.get('bid_count'):
print(f" [WARN] DB bid_count doesn't match page_data")
else:
print(f" [WARN] No database entry found")
print(f"\n" + "="*60)
if missing_fields or empty_fields:
print(f"[FAIL] Missing fields: {', '.join(missing_fields)}")
print(f"[FAIL] Empty fields: {', '.join(empty_fields)}")
if optional_missing:
print(f"[WARN] Optional missing: {', '.join(optional_missing)}")
return False
else:
print("[PASS] All required fields are populated")
if optional_missing:
print(f"[WARN] Optional missing: {', '.join(optional_missing)}")
return True
async def test_lot_with_description():
"""Test that a lot with description preserves it"""
print("\n" + "="*60)
print("TEST: Lot with description")
print("="*60)
# Use a lot known to have description
test_url = "https://www.troostwijkauctions.com/l/used-dometic-seastar-tfxchx8641p-top-mount-engine-control-liver-A1-39684-12"
config.OFFLINE = False
scraper = TroostwijkScraper()
scraper.offline = False
print(f"\n[1] Scraping: {test_url}")
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
page_data = await scraper.crawl_page(page, test_url)
await browser.close()
if not page_data:
print(" [FAIL] No data returned")
return False
print(f"\n[2] Checking description...")
description = page_data.get('description', '')
if not description or description == '':
print(f" [FAIL] Description is empty")
return False
else:
print(f" [PASS] Description: {description[:100]}...")
return True
async def main():
"""Run all tests"""
print("\n" + "="*60)
print("MISSING FIELDS TEST SUITE")
print("="*60)
test1 = await test_lot_has_all_fields()
test2 = await test_lot_with_description()
print("\n" + "="*60)
if test1 and test2:
print("ALL TESTS PASSED")
else:
print("SOME TESTS FAILED")
if not test1:
print(" - test_lot_has_all_fields FAILED")
if not test2:
print(" - test_lot_with_description FAILED")
print("="*60 + "\n")
return 0 if (test1 and test2) else 1
if __name__ == '__main__':
exit_code = asyncio.run(main())
sys.exit(exit_code)

335
test/test_scraper.py Normal file
View File

@@ -0,0 +1,335 @@
#!/usr/bin/env python3
"""
Test suite for Troostwijk Scraper
Tests both auction and lot parsing with cached data
Requires Python 3.10+
"""
import sys
# Require Python 3.10+
if sys.version_info < (3, 10):
print("ERROR: This script requires Python 3.10 or higher")
print(f"Current version: {sys.version}")
sys.exit(1)
import asyncio
import json
import sqlite3
from datetime import datetime
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))
from main import TroostwijkScraper, CacheManager, CACHE_DB
# Test URLs - these will use cached data to avoid overloading the server
TEST_AUCTIONS = [
"https://www.troostwijkauctions.com/a/online-auction-cnc-lathes-machining-centres-precision-measurement-romania-A7-39813",
"https://www.troostwijkauctions.com/a/faillissement-bab-shortlease-i-ii-b-v-%E2%80%93-2024-big-ass-energieopslagsystemen-A1-39557",
"https://www.troostwijkauctions.com/a/industriele-goederen-uit-diverse-bedrijfsbeeindigingen-A1-38675",
]
TEST_LOTS = [
"https://www.troostwijkauctions.com/l/%25282x%2529-duo-bureau-160x168-cm-A1-28505-5",
"https://www.troostwijkauctions.com/l/tos-sui-50-1000-universele-draaibank-A7-39568-9",
"https://www.troostwijkauctions.com/l/rolcontainer-%25282x%2529-A1-40191-101",
]
class TestResult:
def __init__(self, url, success, message, data=None):
self.url = url
self.success = success
self.message = message
self.data = data
class ScraperTester:
def __init__(self):
self.scraper = TroostwijkScraper()
self.results = []
def check_cache_exists(self, url):
"""Check if URL is cached"""
cached = self.scraper.cache.get(url, max_age_hours=999999) # Get even old cache
return cached is not None
def test_auction_parsing(self, url):
"""Test auction page parsing"""
print(f"\n{'='*70}")
print(f"Testing Auction: {url}")
print('='*70)
# Check cache
if not self.check_cache_exists(url):
return TestResult(
url,
False,
"❌ NOT IN CACHE - Please run scraper first to cache this URL",
None
)
# Get cached content
cached = self.scraper.cache.get(url, max_age_hours=999999)
content = cached['content']
print(f"✓ Cache hit (age: {(datetime.now().timestamp() - cached['timestamp']) / 3600:.1f} hours)")
# Parse
try:
data = self.scraper._parse_page(content, url)
if not data:
return TestResult(url, False, "❌ Parsing returned None", None)
if data.get('type') != 'auction':
return TestResult(
url,
False,
f"❌ Expected type='auction', got '{data.get('type')}'",
data
)
# Validate required fields
issues = []
required_fields = {
'auction_id': str,
'title': str,
'location': str,
'lots_count': int,
'first_lot_closing_time': str,
}
for field, expected_type in required_fields.items():
value = data.get(field)
if value is None or value == '':
issues.append(f"{field}: MISSING or EMPTY")
elif not isinstance(value, expected_type):
issues.append(f"{field}: Wrong type (expected {expected_type.__name__}, got {type(value).__name__})")
else:
# Pretty print value
display_value = str(value)[:60]
print(f"{field}: {display_value}")
if issues:
return TestResult(url, False, "\n".join(issues), data)
print(f" ✓ lots_count: {data.get('lots_count')}")
return TestResult(url, True, "✅ All auction fields validated successfully", data)
except Exception as e:
return TestResult(url, False, f"❌ Exception during parsing: {e}", None)
def test_lot_parsing(self, url):
"""Test lot page parsing"""
print(f"\n{'='*70}")
print(f"Testing Lot: {url}")
print('='*70)
# Check cache
if not self.check_cache_exists(url):
return TestResult(
url,
False,
"❌ NOT IN CACHE - Please run scraper first to cache this URL",
None
)
# Get cached content
cached = self.scraper.cache.get(url, max_age_hours=999999)
content = cached['content']
print(f"✓ Cache hit (age: {(datetime.now().timestamp() - cached['timestamp']) / 3600:.1f} hours)")
# Parse
try:
data = self.scraper._parse_page(content, url)
if not data:
return TestResult(url, False, "❌ Parsing returned None", None)
if data.get('type') != 'lot':
return TestResult(
url,
False,
f"❌ Expected type='lot', got '{data.get('type')}'",
data
)
# Validate required fields
issues = []
required_fields = {
'lot_id': (str, lambda x: x and len(x) > 0),
'title': (str, lambda x: x and len(x) > 3 and x not in ['...', 'N/A']),
'location': (str, lambda x: x and len(x) > 2 and x not in ['Locatie', 'Location']),
'current_bid': (str, lambda x: x and x not in ['€Huidig bod', 'Huidig bod']),
'closing_time': (str, lambda x: True), # Can be empty
'images': (list, lambda x: True), # Can be empty list
}
for field, (expected_type, validator) in required_fields.items():
value = data.get(field)
if value is None:
issues.append(f"{field}: MISSING (None)")
elif not isinstance(value, expected_type):
issues.append(f"{field}: Wrong type (expected {expected_type.__name__}, got {type(value).__name__})")
elif not validator(value):
issues.append(f"{field}: Invalid value: '{value}'")
else:
# Pretty print value
if field == 'images':
print(f"{field}: {len(value)} images")
for i, img in enumerate(value[:3], 1):
print(f" {i}. {img[:60]}...")
else:
display_value = str(value)[:60]
print(f"{field}: {display_value}")
# Additional checks
if data.get('bid_count') is not None:
print(f" ✓ bid_count: {data.get('bid_count')}")
if data.get('viewing_time'):
print(f" ✓ viewing_time: {data.get('viewing_time')}")
if data.get('pickup_date'):
print(f" ✓ pickup_date: {data.get('pickup_date')}")
if issues:
return TestResult(url, False, "\n".join(issues), data)
return TestResult(url, True, "✅ All lot fields validated successfully", data)
except Exception as e:
import traceback
return TestResult(url, False, f"❌ Exception during parsing: {e}\n{traceback.format_exc()}", None)
def run_all_tests(self):
"""Run all tests"""
print("\n" + "="*70)
print("TROOSTWIJK SCRAPER TEST SUITE")
print("="*70)
print("\nThis test suite uses CACHED data only - no live requests to server")
print("="*70)
# Test auctions
print("\n" + "="*70)
print("TESTING AUCTIONS")
print("="*70)
for url in TEST_AUCTIONS:
result = self.test_auction_parsing(url)
self.results.append(result)
# Test lots
print("\n" + "="*70)
print("TESTING LOTS")
print("="*70)
for url in TEST_LOTS:
result = self.test_lot_parsing(url)
self.results.append(result)
# Summary
self.print_summary()
def print_summary(self):
"""Print test summary"""
print("\n" + "="*70)
print("TEST SUMMARY")
print("="*70)
passed = sum(1 for r in self.results if r.success)
failed = sum(1 for r in self.results if not r.success)
total = len(self.results)
print(f"\nTotal tests: {total}")
print(f"Passed: {passed}")
print(f"Failed: {failed}")
print(f"Success rate: {passed/total*100:.1f}%")
if failed > 0:
print("\n" + "="*70)
print("FAILED TESTS:")
print("="*70)
for result in self.results:
if not result.success:
print(f"\n{result.url}")
print(result.message)
if result.data:
print("\nParsed data:")
for key, value in result.data.items():
if key != 'lots': # Don't print full lots array
print(f" {key}: {str(value)[:80]}")
print("\n" + "="*70)
return failed == 0
def check_cache_status():
"""Check cache compression status"""
print("\n" + "="*70)
print("CACHE STATUS CHECK")
print("="*70)
try:
with sqlite3.connect(CACHE_DB) as conn:
# Total entries
cursor = conn.execute("SELECT COUNT(*) FROM cache")
total = cursor.fetchone()[0]
# Compressed vs uncompressed
cursor = conn.execute("SELECT COUNT(*) FROM cache WHERE compressed = 1")
compressed = cursor.fetchone()[0]
cursor = conn.execute("SELECT COUNT(*) FROM cache WHERE compressed = 0 OR compressed IS NULL")
uncompressed = cursor.fetchone()[0]
print(f"Total cache entries: {total}")
print(f"Compressed: {compressed} ({compressed/total*100:.1f}%)")
print(f"Uncompressed: {uncompressed} ({uncompressed/total*100:.1f}%)")
if uncompressed > 0:
print(f"\n⚠️ Warning: {uncompressed} entries are still uncompressed")
print(" Run: python migrate_compress_cache.py")
else:
print("\n✓ All cache entries are compressed!")
# Check test URLs
print(f"\n{'='*70}")
print("TEST URL CACHE STATUS:")
print('='*70)
all_test_urls = TEST_AUCTIONS + TEST_LOTS
cached_count = 0
for url in all_test_urls:
cursor = conn.execute("SELECT url FROM cache WHERE url = ?", (url,))
if cursor.fetchone():
print(f"{url[:60]}...")
cached_count += 1
else:
print(f"{url[:60]}... (NOT CACHED)")
print(f"\n{cached_count}/{len(all_test_urls)} test URLs are cached")
if cached_count < len(all_test_urls):
print("\n⚠️ Some test URLs are not cached. Tests for those URLs will fail.")
print(" Run the main scraper to cache these URLs first.")
except Exception as e:
print(f"Error checking cache status: {e}")
if __name__ == "__main__":
# Check cache status first
check_cache_status()
# Run tests
tester = ScraperTester()
success = tester.run_all_tests()
# Exit with appropriate code
sys.exit(0 if success else 1)