#!/usr/bin/env python3
"""
Troostwijk Auctions Scraper
Focuses on extracting auction lots with caching and rate limiting
"""

import asyncio
import csv
import html
import json
import random
import re
import sqlite3
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
from urllib.parse import urljoin, urlparse

# Import Playwright - required for bypassing Cloudflare
from playwright.async_api import async_playwright, Browser, Page

# ==================== CONFIGURATION ====================
BASE_URL = "https://www.troostwijkauctions.com"
CACHE_DB = "/mnt/okcomputer/output/cache.db"
OUTPUT_DIR = "/mnt/okcomputer/output"
RATE_LIMIT_SECONDS = 0.5  # Minimum gap between requests, in seconds
MAX_PAGES = 50            # Number of listing pages to crawl (adjust as needed)

# Setup directories
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)


class CacheManager:
    """Manages page caching using SQLite - EVERY PAGE IS CACHED"""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Initialize cache database"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS cache (
                    url TEXT PRIMARY KEY,
                    content TEXT,
                    timestamp REAL,
                    status_code INTEGER
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_timestamp ON cache(timestamp)
            """)
            conn.commit()

    def get(self, url: str, max_age_hours: int = 24) -> Optional[Dict]:
        """Get cached page if it exists and is not too old"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT content, timestamp, status_code FROM cache WHERE url = ?",
                (url,)
            )
            row = cursor.fetchone()

            if row:
                content, timestamp, status_code = row
                age_hours = (time.time() - timestamp) / 3600

                if age_hours <= max_age_hours:
                    return {
                        'content': content,
                        'timestamp': timestamp,
                        'status_code': status_code,
                        'cached': True
                    }
        return None

    def set(self, url: str, content: str, status_code: int = 200):
        """Cache a page - EVERY SUCCESSFUL REQUEST IS CACHED"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT OR REPLACE INTO cache (url, content, timestamp, status_code) VALUES (?, ?, ?, ?)",
                (url, content, time.time(), status_code)
            )
            conn.commit()
        print(f"  → Cached: {url}")

    def clear_old(self, max_age_hours: int = 168):  # Default: 1 week
        """Clear old cache entries to prevent database bloat"""
        cutoff_time = time.time() - (max_age_hours * 3600)
        with sqlite3.connect(self.db_path) as conn:
            deleted = conn.execute("DELETE FROM cache WHERE timestamp < ?", (cutoff_time,)).rowcount
            conn.commit()
        if deleted > 0:
            print(f"  → Cleared {deleted} old cache entries")
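

# A minimal sketch of how CacheManager behaves on its own. This helper is illustrative
# only (it is never called by the scraper), and the database path is a made-up example.
def _cache_manager_example() -> None:
    cache = CacheManager("/tmp/troostwijk_cache_demo.db")
    cache.set("https://www.example.com/demo", "<html>demo</html>")    # store a page
    hit = cache.get("https://www.example.com/demo", max_age_hours=1)  # fresh entry → cache hit
    if hit:
        print(hit['status_code'], len(hit['content']), hit['cached'])
    cache.clear_old(max_age_hours=0)  # every entry is now "old", so it gets purged

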
class TroostwijkScraper:
    """Main scraper class for Troostwijk Auctions"""

    def __init__(self):
        self.base_url = BASE_URL
        self.cache = CacheManager(CACHE_DB)
        self.visited_lots: Set[str] = set()
        self.output_data: List[Dict] = []
        self.last_request_time = 0

    async def _rate_limit(self):
        """Enforce a minimum of RATE_LIMIT_SECONDS between request starts"""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time

        if time_since_last < RATE_LIMIT_SECONDS:
            delay = RATE_LIMIT_SECONDS - time_since_last
            await asyncio.sleep(delay)

        self.last_request_time = time.time()

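    # Worked example of the pacing above: if the previous request started 0.2 s ago,
    # _rate_limit() sleeps for the remaining 0.3 s, so consecutive request starts are at
    # least RATE_LIMIT_SECONDS apart. Cache hits in _get_page() return before this check,
    # so they are not throttled.
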
    async def _get_page(self, page: Page, url: str, use_cache: bool = True) -> Optional[str]:
        """Get page content with caching and strict rate limiting"""
        # Check the cache first to avoid unnecessary requests
        if use_cache:
            cached = self.cache.get(url)
            if cached:
                print(f"  CACHE HIT: {url}")
                return cached['content']

        # Rate limit before making the request
        await self._rate_limit()

        try:
            print(f"  FETCHING: {url}")
            await page.goto(url, wait_until='networkidle', timeout=30000)

            # Small additional wait for dynamic content
            await asyncio.sleep(random.uniform(0.3, 0.7))

            content = await page.content()

            # Cache the successful result
            self.cache.set(url, content, 200)

            return content

        except Exception as e:
            print(f"  ERROR: {e}")
            # Cache the failure (empty content) so the URL is not retried too soon
            self.cache.set(url, "", 500)
            return None

    def _extract_lot_urls_from_listing(self, content: str) -> List[str]:
        """Extract lot URLs from an auction listing page"""
        # Pattern matches /a/<slug> auction links
        pattern = r'href=["\'](/a/[^"\']+)["\']'
        matches = re.findall(pattern, content, re.IGNORECASE)

        lot_urls = []
        for match in matches:
            full_url = urljoin(self.base_url, match)
            lot_urls.append(full_url)

        # Remove duplicates
        return list(set(lot_urls))

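    # Illustrative anchors the pattern above would pick up (the slugs are made up):
    #   <a href="/a/woodworking-machines-netherlands-A7-12345">  → https://www.troostwijkauctions.com/a/woodworking-machines-netherlands-A7-12345
    #   <a href='/a/forklift-trucks-A1-678'>                     → https://www.troostwijkauctions.com/a/forklift-trucks-A1-678
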
    def _extract_lot_id(self, url: str) -> str:
        """Extract lot ID from URL"""
        path = urlparse(url).path
        # Try /lots/ pattern first (legacy)
        match = re.search(r'/lots/(\d+)', path)
        if match:
            return match.group(1)
        # Try /a/ pattern (current format: /a/title-A7-12345)
        match = re.search(r'/a/.*?([A-Z]\d+-\d+)', path)
        if match:
            return match.group(1)
        # Fallback: return last part of path
        return path.split('/')[-1] if path else ""

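    # Examples of the URL → ID mapping implemented above (the paths are illustrative):
    #   /lots/123456                    → "123456"                     (legacy pattern)
    #   /a/some-lot-title-A7-35847      → "A7-35847"                   (current pattern)
    #   /a/path-without-a-display-id    → "path-without-a-display-id"  (fallback)
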
    def _parse_lot_page(self, content: str, url: str) -> Dict:
        """Parse individual lot page and extract data"""
        # First try to extract from __NEXT_DATA__ JSON (Next.js sites)
        next_data = self._extract_nextjs_data(content)
        if next_data:
            return next_data

        # Fallback to HTML parsing
        content = re.sub(r'\s+', ' ', content)

        data = {
            'url': url,
            'lot_id': self._extract_lot_id(url),
            'title': self._extract_meta_content(content, 'og:title'),
            'current_bid': self._extract_current_bid(content),
            'bid_count': self._extract_bid_count(content),
            'end_date': self._extract_end_date(content),
            'location': self._extract_location(content),
            'description': self._extract_description(content),
            'category': self._extract_category(content),
            'images': self._extract_images(content),
            'scraped_at': datetime.now().isoformat()
        }

        return data

    def _extract_nextjs_data(self, content: str) -> Optional[Dict]:
        """Extract data from Next.js __NEXT_DATA__ JSON"""
        try:
            # Find the __NEXT_DATA__ script tag
            match = re.search(r'<script[^>]*id="__NEXT_DATA__"[^>]*>(.+?)</script>', content, re.DOTALL)
            if not match:
                return None

            data = json.loads(match.group(1))

            # Navigate to pageProps
            page_props = data.get('props', {}).get('pageProps', {})

            # Check if this is an auction page (contains lot data)
            if 'auction' in page_props:
                # This is a single lot/auction page
                auction = page_props.get('auction', {})

                # Extract main data
                result = {
                    'url': self.base_url + '/a/' + auction.get('urlSlug', ''),
                    'lot_id': auction.get('displayId', ''),
                    'title': auction.get('name', ''),
                    'current_bid': '',  # Need to check if this has bid info
                    'bid_count': 0,
                    'end_date': self._format_timestamp(auction.get('minEndDate', '')),
                    'location': self._extract_location_from_json(auction),
                    'description': auction.get('description', ''),
                    'category': auction.get('category', {}).get('name', '') if isinstance(auction.get('category'), dict) else '',
                    'images': [auction['image']['url']] if auction.get('image') and auction['image'].get('url') else [],
                    'scraped_at': datetime.now().isoformat()
                }

                return result

            return None

        except Exception as e:
            print(f"  → Error parsing __NEXT_DATA__: {e}")
            return None

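    # Illustrative shape of the __NEXT_DATA__ payload the method above navigates. The
    # field names come from the extraction code; the concrete values are made up:
    #
    #   {"props": {"pageProps": {"auction": {
    #       "urlSlug": "woodworking-machines-A7-35847",
    #       "displayId": "A7-35847",
    #       "name": "Woodworking machines ...",
    #       "minEndDate": 1735689600,
    #       "description": "...",
    #       "category": {"name": "Machinery"},
    #       "image": {"url": "https://media.example.com/lot.jpg"},
    #       "viewingDays": [{"city": "Amsterdam", "countryCode": "nl"}],
    #       "collectionDays": [{"city": "Amsterdam", "countryCode": "nl"}]
    #   }}}}
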
    def _format_timestamp(self, timestamp: Any) -> str:
        """Convert Unix timestamp to readable date"""
        try:
            if isinstance(timestamp, (int, float)) and timestamp > 0:
                return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
            return str(timestamp) if timestamp else ''
        except (ValueError, OverflowError, OSError):
            return str(timestamp) if timestamp else ''

    def _extract_location_from_json(self, auction_data: Dict) -> str:
        """Extract location from auction JSON data"""
        # Try viewingDays first, then collectionDays
        for key in ('viewingDays', 'collectionDays'):
            days = auction_data.get(key, [])
            if days:
                first_location = days[0]
                city = first_location.get('city', '')
                country = first_location.get('countryCode', '').upper()
                if city:
                    return f"{city}, {country}" if country else city

        return ''

    def _extract_meta_content(self, content: str, property_name: str) -> str:
        """Extract content from meta tags"""
        pattern = rf'<meta[^>]*property=["\']{property_name}["\'][^>]*content=["\']([^"\']+)["\']'
        match = re.search(pattern, content, re.IGNORECASE)
        if match:
            return self._clean_text(match.group(1))
        return ""

    def _extract_current_bid(self, content: str) -> str:
        """Extract current bid amount"""
        patterns = [
            # JSON data patterns (most reliable)
            r'"currentBid"\s*:\s*"([^"]+)"',
            r'"currentBid"\s*:\s*(\d+(?:\.\d+)?)',
            r'currentBid["\']?\s*:\s*["\']?([€\d,.\s]+)["\']?',
            # HTML patterns - look for the bid amount AFTER the label
            r'(?:Current bid|Huidig bod)[:\s]*</?\w*>\s*(€[\d,.\s]+)',
            r'(?:Current bid|Huidig bod)[:\s]+(€[\d,.\s]+)',
            r'<[^>]*bid-amount[^>]*>[\s]*(€[\d,.\s]+)',
            # Meta tags
            r'<meta[^>]*property=["\']auction:currentBid["\'][^>]*content=["\']([^"\']+)["\']',
            # Structured data
            r'"price"\s*:\s*"([€\d,.\s]+)"',
        ]

        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE | re.DOTALL)
            if match:
                bid = match.group(1).strip()
                # Validate it's not just the label
                if bid and bid.lower() not in ['huidig bod', 'current bid', 'locatie', 'location']:
                    # Clean up the bid value
                    if not bid.startswith('€'):
                        bid = f"€{bid}"
                    return bid

        return "€0"

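    # Illustrative snippets the patterns above are written to catch (all made up):
    #   '"currentBid": "€1.250"'            → €1.250
    #   '"currentBid": 1250'                → €1250
    #   'Current bid:</span> €1.250,00'     → €1.250,00
    #   'Huidig bod: €950'                  → €950
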
    def _extract_bid_count(self, content: str) -> int:
        """Extract number of bids"""
        patterns = [
            r'(\d+)\s*bids?',
            r'bidCount["\']:\s*["\']?(\d+)["\']?'
        ]

        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                try:
                    return int(match.group(1))
                except ValueError:
                    return 0

        return 0

    def _extract_end_date(self, content: str) -> str:
        """Extract auction end date"""
        patterns = [
            r'Ends?[:\s]+([A-Za-z0-9,:\s]+)',
            r'endTime["\']:\s*["\']([^"\']+)["\']',
            r'class="[^"]*end[^"]*".*?>([A-Za-z0-9,:\s]+)<'
        ]

        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                return match.group(1).strip()

        return ""

    def _extract_location(self, content: str) -> str:
        """Extract location"""
        patterns = [
            # JSON data patterns (most reliable)
            r'"location"\s*:\s*"([^"]+)"',
            r'"address"\s*:\s*"([^"]+)"',
            r'"addressLocality"\s*:\s*"([^"]+)"',
            # HTML patterns - look for the location AFTER the label
            r'(?:Location|Locatie)[:\s]*</?\w*>\s*([A-Za-zÀ-ÿ0-9\s,.-]+?)(?:<|$)',
            r'(?:Location|Locatie)[:\s]+([A-Za-zÀ-ÿ0-9\s,.-]+?)(?:<br|</|$)',
            r'<[^>]*location[^>]*>[\s]*([A-Za-zÀ-ÿ0-9\s,.-]+?)</[^>]*>',
            # Icon or label based
            r'<i[^>]*location[^>]*></i>\s*([A-Za-zÀ-ÿ0-9\s,.-]+)',
            # Meta tags
            r'<meta[^>]*property=["\']auction:location["\'][^>]*content=["\']([^"\']+)["\']',
        ]

        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE | re.DOTALL)
            if match:
                location = self._clean_text(match.group(1))
                # Validate it's not just the label
                if location and location.lower() not in ['locatie', 'location', 'huidig bod', 'current bid']:
                    # Remove trailing punctuation and whitespace
                    location = re.sub(r'[,.\s]+$', '', location)
                    if len(location) > 2:  # Must be more than 2 chars
                        return location

        return ""

    def _extract_description(self, content: str) -> str:
        """Extract description"""
        patterns = [
            r'<meta[^>]*name=["\']description["\'][^>]*content=["\']([^"\']+)["\']',
            r'class="[^"]*description[^"]*".*?>([^<]+)<'
        ]

        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE | re.DOTALL)
            if match:
                return self._clean_text(match.group(1))[:500]

        return ""

    def _extract_category(self, content: str) -> str:
        """Extract category from breadcrumb or meta tags"""
        # Try breadcrumb first
        pattern = r'class="breadcrumb[^"]*".*?>([A-Za-z\s]+)</a>'
        match = re.search(pattern, content, re.IGNORECASE)
        if match:
            return self._clean_text(match.group(1))

        # Try meta
        return self._extract_meta_content(content, 'category')

    def _extract_images(self, content: str) -> List[str]:
        """Extract image URLs"""
        pattern = r'<img[^>]*src=["\']([^"\']+\.jpe?g|[^"\']+\.png)["\'][^>]*>'
        matches = re.findall(pattern, content, re.IGNORECASE)

        images = []
        for match in matches:
            if any(skip in match.lower() for skip in ['logo', 'icon', 'placeholder', 'banner']):
                continue
            full_url = urljoin(self.base_url, match)
            images.append(full_url)

        return images[:5]  # Limit to 5 images

    def _clean_text(self, text: str) -> str:
        """Clean extracted text"""
        text = html.unescape(text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    async def crawl_listing_page(self, page: Page, page_num: int) -> List[str]:
        """Crawl a single listing page and return lot URLs"""
        url = f"{self.base_url}/auctions?page={page_num}"
        print(f"\n{'='*60}")
        print(f"LISTING PAGE {page_num}: {url}")
        print(f"{'='*60}")

        content = await self._get_page(page, url)
        if not content:
            return []

        lot_urls = self._extract_lot_urls_from_listing(content)
        print(f"→ Found {len(lot_urls)} lot URLs")

        return lot_urls

    async def crawl_lot(self, page: Page, url: str) -> Optional[Dict]:
        """Crawl an individual lot page"""
        if url in self.visited_lots:
            print(f"  → Skipping (already visited): {url}")
            return None

        lot_id = self._extract_lot_id(url)
        print(f"\n[LOT {lot_id}]")

        content = await self._get_page(page, url)
        if not content:
            return None

        lot_data = self._parse_lot_page(content, url)
        self.visited_lots.add(url)

        print(f"  → Title: {lot_data.get('title', 'N/A')[:60]}...")
        print(f"  → Bid: {lot_data.get('current_bid', 'N/A')}")
        print(f"  → Location: {lot_data.get('location', 'N/A')}")

        return lot_data

    async def crawl_auctions(self, max_pages: int = MAX_PAGES) -> List[Dict]:
        """Main crawl: collect lot URLs from listing pages, then scrape each lot page"""
        async with async_playwright() as p:
            print("Launching browser...")
            browser = await p.chromium.launch(
                headless=True,
                args=[
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                    '--disable-blink-features=AutomationControlled'
                ]
            )

            page = await browser.new_page(
                viewport={'width': 1920, 'height': 1080},
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'
            )

            # Set extra headers
            await page.set_extra_http_headers({
                'Accept-Language': 'en-US,en;q=0.9',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
            })

            all_lot_urls = []

            # First pass: collect all lot URLs from listing pages
            print("\n" + "=" * 60)
            print("PHASE 1: COLLECTING LOT URLs FROM LISTING PAGES")
            print("=" * 60)

            for page_num in range(1, max_pages + 1):
                lot_urls = await self.crawl_listing_page(page, page_num)
                if not lot_urls:
                    print(f"No lots found on page {page_num}, stopping")
                    break
                all_lot_urls.extend(lot_urls)
                print(f"  → Total lots collected so far: {len(all_lot_urls)}")

            # Remove duplicates
            all_lot_urls = list(set(all_lot_urls))
            print(f"\n{'='*60}")
            print(f"PHASE 1 COMPLETE: {len(all_lot_urls)} UNIQUE LOTS TO SCRAPE")
            print(f"{'='*60}")

            # Second pass: scrape each lot page
            print("\n" + "=" * 60)
            print("PHASE 2: SCRAPING INDIVIDUAL LOT PAGES")
            print("=" * 60)

            results = []
            for i, lot_url in enumerate(all_lot_urls):
                print(f"\n[{i+1:>3}/{len(all_lot_urls)}] ", end="")
                lot_data = await self.crawl_lot(page, lot_url)
                if lot_data:
                    results.append(lot_data)
                # Save progress periodically
                if (i + 1) % 10 == 0:  # Save every 10 lots
                    self._save_intermediate(results)

            await browser.close()
            return results

    def _save_intermediate(self, data: List[Dict]):
        """Save intermediate results"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{OUTPUT_DIR}/troostwijk_lots_partial_{timestamp}.json"

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump({
                'count': len(data),
                'lots': data
            }, f, indent=2, ensure_ascii=False)

        print(f"\n  → PROGRESS SAVED: {filename}")

    def save_final_results(self, data: List[Dict]):
        """Save final results in multiple formats"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Save JSON
        json_file = f"{OUTPUT_DIR}/troostwijk_lots_final_{timestamp}.json"
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump({
                'count': len(data),
                'scraped_at': datetime.now().isoformat(),
                'rate_limit_seconds': RATE_LIMIT_SECONDS,
                'lots': data
            }, f, indent=2, ensure_ascii=False)

        # Save CSV
        csv_file = f"{OUTPUT_DIR}/troostwijk_lots_final_{timestamp}.csv"
        if data:
            flat_data = []
            for item in data:
                flat_item = item.copy()
                flat_item['images'] = ', '.join(flat_item.get('images', []))
                flat_data.append(flat_item)

            with open(csv_file, 'w', newline='', encoding='utf-8') as f:
                fieldnames = ['url', 'lot_id', 'title', 'current_bid', 'bid_count',
                              'end_date', 'location', 'description', 'category', 'images', 'scraped_at']
                writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
                writer.writeheader()
                writer.writerows(flat_data)

        return json_file, csv_file


def test_extraction(test_url: str = "https://www.troostwijkauctions.com/a/machines-en-toebehoren-%28hout-en-kunststofverwerking-handlingapparatuur-bouwmachines-landbouwindustrie%29-oost-europa-december-A7-35847"):
    """Test extraction on a specific cached URL to debug patterns"""
    scraper = TroostwijkScraper()

    # Try to get the page from the cache
    cached = scraper.cache.get(test_url)
    if not cached:
        print(f"ERROR: URL not found in cache: {test_url}")
        print("\nAvailable cached URLs:")
        with sqlite3.connect(CACHE_DB) as conn:
            cursor = conn.execute("SELECT url FROM cache ORDER BY timestamp DESC LIMIT 10")
            for row in cursor.fetchall():
                print(f"  - {row[0]}")
        return

    content = cached['content']
    print(f"\n{'='*60}")
    print(f"TESTING EXTRACTION FROM: {test_url}")
    print(f"{'='*60}")
    print(f"Content length: {len(content)} chars")
    print(f"Cache age: {(time.time() - cached['timestamp']) / 3600:.1f} hours")

    # Test each extraction method
    lot_data = scraper._parse_lot_page(content, test_url)

    print(f"\n{'='*60}")
    print("EXTRACTED DATA:")
    print(f"{'='*60}")
    for key, value in lot_data.items():
        if key == 'images':
            print(f"{key:.<20}: {len(value)} images")
            for img in value[:3]:
                print(f"{'':.<20} - {img}")
        else:
            display_value = str(value)[:100] if value else "(empty)"
            # Handle Unicode characters that the Windows console can't display
            try:
                print(f"{key:.<20}: {display_value}")
            except UnicodeEncodeError:
                safe_value = display_value.encode('ascii', 'replace').decode('ascii')
                print(f"{key:.<20}: {safe_value}")

    # Validation checks
    print(f"\n{'='*60}")
    print("VALIDATION CHECKS:")
    print(f"{'='*60}")

    issues = []
    if lot_data['current_bid'] in ['Huidig bod', 'Current bid', '€0', '']:
        issues.append("[!] Current bid not extracted correctly")
    else:
        print("[OK] Current bid looks valid:", lot_data['current_bid'])

    if lot_data['location'] in ['Locatie', 'Location', '']:
        issues.append("[!] Location not extracted correctly")
    else:
        print("[OK] Location looks valid:", lot_data['location'])

    if lot_data['title'] in ['', '...']:
        issues.append("[!] Title not extracted correctly")
    else:
        print("[OK] Title looks valid:", lot_data['title'][:50])

    if issues:
        print("\n[ISSUES FOUND]")
        for issue in issues:
            print(f"  {issue}")
    else:
        print("\n[ALL FIELDS EXTRACTED SUCCESSFULLY!]")

    # Debug: show raw HTML snippets for problematic fields
    print(f"\n{'='*60}")
    print("DEBUG: RAW HTML SNIPPETS")
    print(f"{'='*60}")

    # Look for bid-related content
    print("\n1. Bid patterns in content:")
    bid_matches = re.findall(r'.{0,50}(€[\d,.\s]+).{0,50}', content[:10000])
    for i, match in enumerate(bid_matches[:5], 1):
        print(f"  {i}. {match}")

    # Look for location content
    print("\n2. Location patterns in content:")
    loc_matches = re.findall(r'.{0,30}(Locatie|Location).{0,100}', content, re.IGNORECASE)
    for i, match in enumerate(loc_matches[:5], 1):
        print(f"  {i}. ...{match}...")

    # Look for JSON data
    print("\n3. JSON/Script data containing auction info:")
    json_patterns = [
        r'"currentBid"[^,}]+',
        r'"location"[^,}]+',
        r'"price"[^,}]+',
        r'"addressLocality"[^,}]+'
    ]
    for pattern in json_patterns:
        matches = re.findall(pattern, content[:50000], re.IGNORECASE)
        if matches:
            print(f"  {pattern}: {matches[:3]}")

    # Look for script tags with structured data
    script_matches = re.findall(r'<script[^>]*type=["\']application/ld\+json["\'][^>]*>(.*?)</script>', content, re.DOTALL)
    if script_matches:
        print("\n4. Structured data (JSON-LD) found:")
        for i, script in enumerate(script_matches[:2], 1):
            try:
                data = json.loads(script)
                print(f"  Script {i}: {json.dumps(data, indent=6)[:500]}...")
            except json.JSONDecodeError:
                print(f"  Script {i}: {script[:300]}...")


def main():
    """Main execution"""
    # Check for test mode
    if len(sys.argv) > 1 and sys.argv[1] == "--test":
        test_url = sys.argv[2] if len(sys.argv) > 2 else None
        if test_url:
            test_extraction(test_url)
        else:
            test_extraction()
        return

    print("Troostwijk Auctions Scraper")
    print("=" * 60)
    print(f"Rate limit: {RATE_LIMIT_SECONDS} seconds between requests")
    print(f"Cache database: {CACHE_DB}")
    print(f"Output directory: {OUTPUT_DIR}")
    print(f"Max listing pages: {MAX_PAGES}")
    print("=" * 60)

    scraper = TroostwijkScraper()

    try:
        # Clear old cache entries (older than 7 days) to keep the database small
        scraper.cache.clear_old(max_age_hours=168)

        # Run the crawler
        results = asyncio.run(scraper.crawl_auctions(max_pages=MAX_PAGES))

        # Save final results
        if results:
            json_file, csv_file = scraper.save_final_results(results)

            print("\n" + "=" * 60)
            print("CRAWLING COMPLETED SUCCESSFULLY")
            print("=" * 60)
            print(f"Total lots scraped: {len(results)}")
            print(f"JSON file: {json_file}")
            print(f"CSV file: {csv_file}")

            # Show sample
            print(f"\n{'='*60}")
            print("SAMPLE DATA:")
            print(f"{'='*60}")
            sample = results[0]
            for key, value in sample.items():
                if key != 'images':
                    print(f"{key:.<20}: {str(value)[:80]}")
        else:
            print("\nNo results collected. Check cache and logs.")

    except KeyboardInterrupt:
        print("\nScraping interrupted by user - any partial results saved so far are in the output directory")
    except Exception as e:
        print(f"\nERROR during scraping: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
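
# How to run (sketch; "scraper.py" stands in for whatever this file is saved as):
#   python scraper.py                  # full crawl: listing pages first, then each lot page
#   python scraper.py --test           # re-parse the default cached URL to debug extraction patterns
#   python scraper.py --test <url>     # re-parse a specific URL that is already in the cache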