#!/usr/bin/env python3
"""
Test cache behavior: verify that a page is fetched only once and that its data persists offline.
"""
|
|
|
|
import sys
|
|
import os
|
|
import asyncio
|
|
import sqlite3
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# The project modules imported below (cache, scraper, config) are looked up in
# the src/ directory next to this file's parent directory, so put it first on
# the module search path.
sys.path.insert(0, str(Path(__file__).parent.parent.joinpath('src')))
|
|
|
|
from cache import CacheManager
|
|
from scraper import TroostwijkScraper
|
|
import config
|
|
|
|
|
|
class TestCacheBehavior:
    """Test suite for cache and offline functionality.

    The suite is driven by :meth:`run_all_tests`, which points ``config`` at a
    throw-away SQLite database, runs the individual tests in order and prints
    a summary.  Test 2 reuses the URL that test 1 scrapes, so it expects test 1
    to have populated the cache first.
    """

    # Real lot URL shared by both tests (test 2 expects test 1 to have cached it).
    TEST_URL = "https://www.troostwijkauctions.com/l/bmw-x5-xdrive40d-high-executive-m-sport-a8-286pk-2019-A1-26955-7"

    def __init__(self):
        self.test_db = "test_cache.db"
        # Remember the global settings that setup() overwrites so that
        # teardown() can put them back.
        self.original_db = config.CACHE_DB
        self.original_offline = getattr(config, "OFFLINE", False)
        self.cache = None
        self.scraper = None

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _banner(title, width=60):
        """Print *title* framed by two ``=`` rules of the given width."""
        print("\n" + "=" * width)
        print(title)
        print("=" * width)

    @staticmethod
    def _title_preview(page_data):
        """Return at most 60 characters of the lot title, or ``'N/A'`` if missing/empty."""
        # `or` also covers a title key that is present but None, which would
        # make a plain .get(..., 'N/A')[:60] raise TypeError.
        return str(page_data.get('title') or 'N/A')[:60]

    @staticmethod
    def _report_error(exc):
        """Print *exc* and the traceback of the exception currently being handled."""
        import traceback

        print(f"\n[ERROR] TEST SUITE ERROR: {exc}")
        traceback.print_exc()

    def _fetch_one(self, sql, params=()):
        """Run *sql* against the test database and return the first row (or ``None``).

        The connection is closed even when the query raises (e.g. the table
        does not exist), so a failing check never leaks a handle.
        """
        conn = sqlite3.connect(self.test_db)
        try:
            return conn.execute(sql, params).fetchone()
        finally:
            conn.close()

    async def _timed_scrape(self, url, timeout):
        """Scrape *url* and return ``(page_data, elapsed_seconds)``.

        Raises ``TimeoutError`` if the scrape takes longer than *timeout*
        seconds.  ``time.perf_counter`` is used because it is monotonic and
        high resolution, unlike ``time.time``.
        """
        started = time.perf_counter()
        async with asyncio.timeout(timeout):
            page_data = await self._scrape_single_page(url)
        return page_data, time.perf_counter() - started

    # ------------------------------------------------------------------
    # Fixture handling
    # ------------------------------------------------------------------

    def setup(self):
        """Setup test environment"""
        self._banner("TEST SETUP")

        # Use test database
        config.CACHE_DB = self.test_db

        # Ensure offline mode is disabled for tests
        config.OFFLINE = False

        # Clean up old test database
        try:
            os.remove(self.test_db)
        except FileNotFoundError:
            pass
        else:
            print(" * Removed old test database")

        # Initialize cache and scraper
        self.cache = CacheManager()
        self.scraper = TroostwijkScraper()
        self.scraper.offline = False  # Explicitly disable offline mode

        print(f" * Created test database: {self.test_db}")
        print(" * Initialized cache and scraper")
        print(" * Offline mode: DISABLED")

    def teardown(self):
        """Cleanup test environment"""
        self._banner("TEST TEARDOWN")

        # Restore every global setting that setup() changed.
        config.CACHE_DB = self.original_db
        config.OFFLINE = self.original_offline

        # Keep test database for inspection
        print(f" * Test database preserved: {self.test_db}")
        print(" * Restored original database path")

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    async def test_page_fetched_once(self):
        """Test that a page is only fetched from network once.

        Scrapes the same URL twice and checks that both visits return data
        with the same lot id and closing time.  Timing and the number of
        cached resources are reported but only as warnings.

        Returns:
            ``True`` when all checks pass, ``False`` otherwise.
        """
        self._banner("TEST 1: Page Fetched Only Once")

        test_url = self.TEST_URL
        print(f"\nTest URL: {test_url}")

        # First visit - should fetch from network
        print("\n--- FIRST VISIT (should fetch from network) ---")
        page_data_1, first_visit_time = await self._timed_scrape(test_url, timeout=60)

        if not page_data_1:
            print(" [FAIL] First visit returned no data")
            return False

        print(f" [OK] First visit completed in {first_visit_time:.2f}s")
        print(f" [OK] Got lot data: {self._title_preview(page_data_1)}...")

        # Check closing time was captured
        closing_time_1 = page_data_1.get('closing_time')
        print(f" [OK] Closing time: {closing_time_1}")

        # Second visit - should use cache, hence the shorter timeout
        print("\n--- SECOND VISIT (should use cache) ---")
        page_data_2, second_visit_time = await self._timed_scrape(test_url, timeout=30)

        if not page_data_2:
            print(" [FAIL] Second visit returned no data")
            return False

        print(f" [OK] Second visit completed in {second_visit_time:.2f}s")

        # Verify data matches
        if page_data_1.get('lot_id') != page_data_2.get('lot_id'):
            print(" [FAIL] Lot IDs don't match")
            return False

        closing_time_2 = page_data_2.get('closing_time')
        print(f" [OK] Closing time: {closing_time_2}")

        if closing_time_1 != closing_time_2:
            print(" [FAIL] Closing times don't match!")
            print(f" First: {closing_time_1}")
            print(f" Second: {closing_time_2}")
            return False

        # Verify second visit was significantly faster (used cache)
        if second_visit_time >= first_visit_time * 0.5:
            print(" [WARN] Second visit not significantly faster")
            print(f" First: {first_visit_time:.2f}s")
            print(f" Second: {second_visit_time:.2f}s")
        elif second_visit_time > 0:
            speedup = first_visit_time / second_visit_time
            print(f" [OK] Second visit was {speedup:.1f}x faster (cache working!)")
        else:
            # A zero elapsed time would make the ratio a division by zero.
            print(" [OK] Second visit was effectively instantaneous (cache working!)")

        # Verify resource cache has entries
        resource_count = self._fetch_one("SELECT COUNT(*) FROM resource_cache")[0]
        if resource_count:
            print(f" [OK] Cached {resource_count} resources")
        else:
            print(" [WARN] Resource cache is empty")

        print("\n[PASS] TEST 1 PASSED: Page fetched only once, data persists")
        return True

    async def test_offline_mode(self):
        """Test that offline mode works with cached data.

        Switches both ``config.OFFLINE`` and the scraper into offline mode,
        scrapes the shared test URL and checks that the essential fields and
        the closing time are present and agree with the ``lots`` table.  The
        previous offline settings are restored on every exit path.

        Returns:
            ``True`` when all checks pass, ``False`` otherwise.
        """
        self._banner("TEST 2: Offline Mode with Cached Data")

        # Use the same URL from test 1 (should be cached)
        test_url = self.TEST_URL

        # Enable offline mode, remembering each flag separately so that each
        # one is restored to its own previous value.
        original_offline = config.OFFLINE
        original_scraper_offline = self.scraper.offline
        config.OFFLINE = True
        self.scraper.offline = True

        print(f"\nTest URL: {test_url}")
        print(" * Offline mode: ENABLED")

        try:
            # Try to scrape in offline mode
            print("\n--- OFFLINE SCRAPE (should use DB/cache only) ---")
            page_data, offline_time = await self._timed_scrape(test_url, timeout=30)

            if not page_data:
                print(" [FAIL] Offline mode returned no data")
                return False

            print(f" [OK] Offline scrape completed in {offline_time:.2f}s")
            print(f" [OK] Got lot data: {self._title_preview(page_data)}...")

            # Check closing time is available
            closing_time = page_data.get('closing_time')
            if not closing_time:
                print(" [FAIL] No closing time in offline mode")
                return False

            print(f" [OK] Closing time preserved: {closing_time}")

            # Verify essential fields are present
            essential_fields = ['lot_id', 'title', 'url', 'location']
            missing_fields = [f for f in essential_fields if not page_data.get(f)]

            if missing_fields:
                print(f" [FAIL] Missing essential fields: {missing_fields}")
                return False

            print(" [OK] All essential fields present")

            # Check database has the lot
            row = self._fetch_one(
                "SELECT closing_time FROM lots WHERE url = ?", (test_url,)
            )

            if not row:
                print(" [FAIL] Lot not found in database")
                return False

            db_closing_time = row[0]
            print(f" [OK] Database has closing time: {db_closing_time}")

            if db_closing_time != closing_time:
                print(" [FAIL] Closing time mismatch")
                print(f" Scraped: {closing_time}")
                print(f" Database: {db_closing_time}")
                return False

            print("\n[PASS] TEST 2 PASSED: Offline mode works, closing time preserved")
            return True

        finally:
            # Restore offline mode
            config.OFFLINE = original_offline
            self.scraper.offline = original_scraper_offline

    async def _scrape_single_page(self, url):
        """Helper to scrape a single page.

        In offline mode the scraper is called without a browser page;
        otherwise a headless Chromium instance is started for the call and
        closed afterwards.  Returns whatever ``scraper.crawl_page`` returns.
        """
        if config.OFFLINE or self.scraper.offline:
            # Offline mode - use crawl_page directly
            return await self.scraper.crawl_page(page=None, url=url)

        # Online mode - need browser.  Imported here so that the offline path
        # does not require playwright to be importable.
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                page = await browser.new_page()
                return await self.scraper.crawl_page(page, url)
            finally:
                await browser.close()

    # ------------------------------------------------------------------
    # Runner
    # ------------------------------------------------------------------

    async def run_all_tests(self):
        """Run all tests and print a summary.

        An exception raised by setup or by an individual test is reported and
        counted as a failure, so a crash can never be summarised as success.
        ``teardown`` always runs.

        Returns:
            ``0`` when every test passed, ``1`` otherwise.
        """
        self._banner("CACHE BEHAVIOR TEST SUITE", width=70)

        tests = (
            ("Page Fetched Once", self.test_page_fetched_once),
            ("Offline Mode", self.test_offline_mode),
        )
        results = []

        try:
            try:
                self.setup()
            except Exception as e:
                self._report_error(e)
                results.append(("Test Setup", False))
            else:
                for test_name, test_func in tests:
                    try:
                        passed = bool(await test_func())
                    except Exception as e:
                        self._report_error(e)
                        passed = False
                    results.append((test_name, passed))
        finally:
            self.teardown()

        # Print summary
        self._banner("TEST SUMMARY", width=70)

        for test_name, passed in results:
            status = "[PASS]" if passed else "[FAIL]"
            print(f" {status}: {test_name}")

        print("=" * 70)

        # An empty result list means nothing ran, which is not a success.
        all_passed = bool(results) and all(passed for _, passed in results)

        if all_passed:
            print("\n*** ALL TESTS PASSED! ***")
            return 0

        print("\n*** SOME TESTS FAILED ***")
        return 1
|
|
|
|
|
|
async def main():
    """Run the whole cache-behavior suite and exit with its status code."""
    # run_all_tests() returns the process exit status (0 or 1).
    sys.exit(await TestCacheBehavior().run_all_tests())


if __name__ == "__main__":
    # Script entry point: run main() to completion on a new event loop.
    asyncio.run(main())
|