#!/usr/bin/env python3
"""
Troostwijk Auctions Scraper

Extracts auction lots with page caching and rate limiting between requests.
"""

import asyncio
import json
import csv
import re
import sqlite3
import time
from datetime import datetime
from urllib.parse import urljoin, urlparse
from pathlib import Path
from typing import Any, List, Dict, Optional, Set
import random

# Playwright is required: the site is JavaScript-rendered and sits behind Cloudflare
from playwright.async_api import async_playwright, Browser, Page

# ==================== CONFIGURATION ====================
BASE_URL = "https://www.troostwijkauctions.com"
CACHE_DB = "/mnt/okcomputer/output/cache.db"
OUTPUT_DIR = "/mnt/okcomputer/output"
RATE_LIMIT_SECONDS = 0.5  # Minimum delay of 0.5 s between outgoing requests
MAX_PAGES = 50            # Number of listing pages to crawl (adjust as needed)

# Setup directories
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)


class CacheManager:
    """Manages page caching using SQLite - every fetched page is cached."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Initialize the cache database."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS cache (
                    url TEXT PRIMARY KEY,
                    content TEXT,
                    timestamp REAL,
                    status_code INTEGER
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_timestamp ON cache(timestamp)
            """)
            conn.commit()

    def get(self, url: str, max_age_hours: int = 24) -> Optional[Dict]:
        """Return the cached page if it exists and is not older than max_age_hours."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT content, timestamp, status_code FROM cache WHERE url = ?",
                (url,)
            )
            row = cursor.fetchone()
            if row:
                content, timestamp, status_code = row
                age_hours = (time.time() - timestamp) / 3600
                if age_hours <= max_age_hours:
                    return {
                        'content': content,
                        'timestamp': timestamp,
                        'status_code': status_code,
                        'cached': True
                    }
        return None

    def set(self, url: str, content: str, status_code: int = 200):
        """Cache a page - every successful request is cached."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT OR REPLACE INTO cache (url, content, timestamp, status_code) VALUES (?, ?, ?, ?)",
                (url, content, time.time(), status_code)
            )
            conn.commit()
        print(f"  → Cached: {url}")

    def clear_old(self, max_age_hours: int = 168):  # Default: 1 week
        """Clear old cache entries to prevent database bloat."""
        cutoff_time = time.time() - (max_age_hours * 3600)
        with sqlite3.connect(self.db_path) as conn:
            deleted = conn.execute(
                "DELETE FROM cache WHERE timestamp < ?", (cutoff_time,)
            ).rowcount
            conn.commit()
            if deleted > 0:
                print(f"  → Cleared {deleted} old cache entries")
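
# Illustrative sketch (not executed) of how CacheManager can be used on its own.
# The URL below is a placeholder, not a real cached entry.
#
#   cache = CacheManager("/tmp/example_cache.db")
#   cache.set("https://www.troostwijkauctions.com/a/example-A1-1", "<html>...</html>")
#   hit = cache.get("https://www.troostwijkauctions.com/a/example-A1-1", max_age_hours=24)
#   if hit:
#       print(hit['status_code'], len(hit['content']))
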

class TroostwijkScraper:
    """Main scraper class for Troostwijk Auctions."""

    def __init__(self):
        self.base_url = BASE_URL
        self.cache = CacheManager(CACHE_DB)
        self.visited_lots: Set[str] = set()
        self.output_data: List[Dict] = []
        self.last_request_time = 0

    async def _rate_limit(self):
        """Ensure at least RATE_LIMIT_SECONDS elapse between requests."""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        if time_since_last < RATE_LIMIT_SECONDS:
            delay = RATE_LIMIT_SECONDS - time_since_last
            await asyncio.sleep(delay)
        self.last_request_time = time.time()

    async def _get_page(self, page: Page, url: str, use_cache: bool = True) -> Optional[str]:
        """Get page content with caching and strict rate limiting."""
        # Check the cache first to avoid unnecessary requests
        if use_cache:
            cached = self.cache.get(url)
            if cached:
                print(f"  CACHE HIT: {url}")
                return cached['content']

        # Rate limit before making the request
        await self._rate_limit()

        try:
            print(f"  FETCHING: {url}")
            await page.goto(url, wait_until='networkidle', timeout=30000)
            # Small additional wait for dynamic content
            await asyncio.sleep(random.uniform(0.3, 0.7))
            content = await page.content()
            # Cache the successful result
            self.cache.set(url, content, 200)
            return content
        except Exception as e:
            print(f"  ERROR: {e}")
            # Cache the error to avoid retrying too soon
            self.cache.set(url, "", 500)
            return None

    def _extract_lot_urls_from_listing(self, content: str) -> List[str]:
        """Extract lot URLs from an auction listing page."""
        # Current URL format uses /a/<slug>; capture every href pointing there
        pattern = r'href=["\'](/a/[^"\']+)["\']'
        matches = re.findall(pattern, content, re.IGNORECASE)
        lot_urls = []
        for match in matches:
            full_url = urljoin(self.base_url, match)
            lot_urls.append(full_url)
        # Remove duplicates
        return list(set(lot_urls))

    def _extract_lot_id(self, url: str) -> str:
        """Extract the lot ID from a URL."""
        path = urlparse(url).path
        # Try /lots/ pattern first (legacy)
        match = re.search(r'/lots/(\d+)', path)
        if match:
            return match.group(1)
        # Try /a/ pattern (current format: /a/title-A7-12345)
        match = re.search(r'/a/.*?([A-Z]\d+-\d+)', path)
        if match:
            return match.group(1)
        # Fallback: return the last part of the path
        return path.split('/')[-1] if path else ""
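
    # Illustrative examples (hypothetical URL paths) of what _extract_lot_id returns:
    #   /lots/123456                 -> "123456"                  (legacy format)
    #   /a/some-lot-title-A7-35847   -> "A7-35847"                (current format)
    #   /a/slug-without-display-id   -> "slug-without-display-id" (fallback)
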

    def _parse_lot_page(self, content: str, url: str) -> Dict:
        """Parse an individual lot page and extract its data."""
        # First try to extract from the __NEXT_DATA__ JSON (Next.js sites)
        next_data = self._extract_nextjs_data(content)
        if next_data:
            return next_data

        # Fallback to HTML parsing
        content = re.sub(r'\s+', ' ', content)
        data = {
            'url': url,
            'lot_id': self._extract_lot_id(url),
            'title': self._extract_meta_content(content, 'og:title'),
            'current_bid': self._extract_current_bid(content),
            'bid_count': self._extract_bid_count(content),
            'end_date': self._extract_end_date(content),
            'location': self._extract_location(content),
            'description': self._extract_description(content),
            'category': self._extract_category(content),
            'images': self._extract_images(content),
            'scraped_at': datetime.now().isoformat()
        }
        return data

    def _extract_nextjs_data(self, content: str) -> Optional[Dict]:
        """Extract data from the Next.js __NEXT_DATA__ JSON blob."""
        try:
            # Find the __NEXT_DATA__ script tag
            match = re.search(
                r'<script[^>]*id="__NEXT_DATA__"[^>]*>(.+?)</script>',
                content, re.DOTALL
            )
            if not match:
                return None

            data = json.loads(match.group(1))
            # Navigate to pageProps
            page_props = data.get('props', {}).get('pageProps', {})

            # Check if this is an auction page (contains lot data)
            if 'auction' in page_props:
                # This is a single lot/auction page
                auction = page_props.get('auction', {})
                # Extract the main data
                result = {
                    'url': self.base_url + '/a/' + auction.get('urlSlug', ''),
                    'lot_id': auction.get('displayId', ''),
                    'title': auction.get('name', ''),
                    'current_bid': '',  # TODO: check whether this payload includes bid info
                    'bid_count': 0,
                    'end_date': self._format_timestamp(auction.get('minEndDate', '')),
                    'location': self._extract_location_from_json(auction),
                    'description': auction.get('description', ''),
                    'category': (auction.get('category', {}).get('name', '')
                                 if isinstance(auction.get('category'), dict) else ''),
                    'images': ([auction['image']['url']]
                               if auction.get('image') and auction['image'].get('url') else []),
                    'scraped_at': datetime.now().isoformat()
                }
                return result
            return None
        except Exception as e:
            print(f"  → Error parsing __NEXT_DATA__: {e}")
            return None

    def _format_timestamp(self, timestamp: Any) -> str:
        """Convert a Unix timestamp to a readable date."""
        try:
            if isinstance(timestamp, (int, float)) and timestamp > 0:
                return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
            return str(timestamp) if timestamp else ''
        except Exception:
            return str(timestamp) if timestamp else ''

    def _extract_location_from_json(self, auction_data: Dict) -> str:
        """Extract the location from auction JSON data."""
        # Try viewingDays first
        viewing_days = auction_data.get('viewingDays', [])
        if viewing_days:
            first_location = viewing_days[0]
            city = first_location.get('city', '')
            country = first_location.get('countryCode', '').upper()
            if city:
                return f"{city}, {country}" if country else city

        # Fall back to collectionDays
        collection_days = auction_data.get('collectionDays', [])
        if collection_days:
            first_location = collection_days[0]
            city = first_location.get('city', '')
            country = first_location.get('countryCode', '').upper()
            if city:
                return f"{city}, {country}" if country else city
        return ''

    def _extract_meta_content(self, content: str, property_name: str) -> str:
        """Extract content from a meta tag by property name."""
        pattern = rf'<meta[^>]*property=["\']{property_name}["\'][^>]*content=["\']([^"\']+)["\']'
        match = re.search(pattern, content, re.IGNORECASE)
        if match:
            return self._clean_text(match.group(1))
        return ""

    def _extract_current_bid(self, content: str) -> str:
        """Extract the current bid amount."""
        patterns = [
            # JSON data patterns (most reliable)
            r'"currentBid"\s*:\s*"([^"]+)"',
            r'"currentBid"\s*:\s*(\d+(?:\.\d+)?)',
            r'currentBid["\']?\s*:\s*["\']?([€\d,.\s]+)["\']?',
            # HTML patterns - look for the bid amount AFTER the label
            r'(?:Current bid|Huidig bod)[:\s]*\s*(€[\d,.\s]+)',
            r'(?:Current bid|Huidig bod)[:\s]+(€[\d,.\s]+)',
            r'<[^>]*bid-amount[^>]*>[\s]*(€[\d,.\s]+)',
            # Meta tags
            r'<meta[^>]*property=["\']auction:currentBid["\'][^>]*content=["\']([^"\']+)["\']',
            # Structured data
            r'"price"\s*:\s*"([€\d,.\s]+)"',
        ]
        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE | re.DOTALL)
            if match:
                bid = match.group(1).strip()
                # Validate that we did not capture the label itself
                if bid and bid.lower() not in ['huidig bod', 'current bid', 'locatie', 'location']:
                    # Clean up the bid value if it lacks the currency symbol
                    if not bid.startswith('€'):
                        bid = f"€{bid}"
                    return bid
        return "€0"
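
    # Illustrative behaviour of _extract_current_bid on made-up inputs:
    #   '{"currentBid": "1.250,00"}'          -> "€1.250,00" (JSON pattern, € prefixed)
    #   '<span>Current bid: € 300</span>'     -> "€ 300"      (label pattern)
    #   '<html>no bid info</html>'            -> "€0"         (fallback)
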

    def _extract_bid_count(self, content: str) -> int:
        """Extract the number of bids."""
        patterns = [
            r'(\d+)\s*bids?',
            r'bidCount["\']:\s*["\']?(\d+)["\']?'
        ]
        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                try:
                    return int(match.group(1))
                except (ValueError, TypeError):
                    return 0
        return 0

    def _extract_end_date(self, content: str) -> str:
        """Extract the auction end date."""
        patterns = [
            r'Ends?[:\s]+([A-Za-z0-9,:\s]+)',
            r'endTime["\']:\s*["\']([^"\']+)["\']',
            r'class="[^"]*end[^"]*".*?>([A-Za-z0-9,:\s]+)<'
        ]
        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                return match.group(1).strip()
        return ""

    def _extract_location(self, content: str) -> str:
        """Extract the lot location."""
        patterns = [
            # JSON data patterns (most reliable)
            r'"location"\s*:\s*"([^"]+)"',
            r'"address"\s*:\s*"([^"]+)"',
            r'"addressLocality"\s*:\s*"([^"]+)"',
            # HTML patterns - look for the location AFTER the label
            r'(?:Location|Locatie)[:\s]*\s*([A-Za-zÀ-ÿ0-9\s,.-]+?)(?:<|$)',
            r'<[^>]*location[^>]*>[\s]*([A-Za-zÀ-ÿ0-9\s,.-]+?)<[^>]*>',
            # Icon or label based
            r'<[^>]*location[^>]*>\s*([A-Za-zÀ-ÿ0-9\s,.-]+)',
            # Meta tags
            r'<meta[^>]*property=["\']auction:location["\'][^>]*content=["\']([^"\']+)["\']',
        ]
        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE | re.DOTALL)
            if match:
                location = self._clean_text(match.group(1))
                # Validate that we did not capture the label itself
                if location and location.lower() not in ['locatie', 'location', 'huidig bod', 'current bid']:
                    # Remove trailing punctuation and whitespace
                    location = re.sub(r'[,.\s]+$', '', location)
                    if len(location) > 2:  # Must be more than 2 chars
                        return location
        return ""

    def _extract_description(self, content: str) -> str:
        """Extract the description."""
        patterns = [
            r'<meta[^>]*name=["\']description["\'][^>]*content=["\']([^"\']+)["\']',
            r'class="[^"]*description[^"]*".*?>([^<]+)<'
        ]
        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE | re.DOTALL)
            if match:
                return self._clean_text(match.group(1))[:500]
        return ""

    def _extract_category(self, content: str) -> str:
        """Extract the category from the breadcrumb or meta tags."""
        # Try the breadcrumb first
        pattern = r'class="breadcrumb[^"]*".*?>([A-Za-z\s]+)'
        match = re.search(pattern, content, re.IGNORECASE)
        if match:
            return self._clean_text(match.group(1))
        # Fall back to meta tags
        return self._extract_meta_content(content, 'category')

    def _extract_images(self, content: str) -> List[str]:
        """Extract image URLs."""
        pattern = r'<img[^>]*src=["\']([^"\']+\.jpe?g|[^"\']+\.png)["\'][^>]*>'
        matches = re.findall(pattern, content, re.IGNORECASE)
        images = []
        for match in matches:
            if any(skip in match.lower() for skip in ['logo', 'icon', 'placeholder', 'banner']):
                continue
            full_url = urljoin(self.base_url, match)
            images.append(full_url)
        return images[:5]  # Limit to 5 images

    def _clean_text(self, text: str) -> str:
        """Clean extracted text."""
        import html
        text = html.unescape(text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    async def crawl_listing_page(self, page: Page, page_num: int) -> List[str]:
        """Crawl a single listing page and return the lot URLs found on it."""
        url = f"{self.base_url}/auctions?page={page_num}"
        print(f"\n{'='*60}")
        print(f"LISTING PAGE {page_num}: {url}")
        print(f"{'='*60}")
        content = await self._get_page(page, url)
        if not content:
            return []
        lot_urls = self._extract_lot_urls_from_listing(content)
        print(f"→ Found {len(lot_urls)} lot URLs")
        return lot_urls
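
    # Note: the listing URL format above (BASE_URL + "/auctions?page=N") is an
    # assumption about the site's pagination scheme; if the routing changes,
    # crawl_listing_page is the single place to adjust. For example, page 3 is
    # requested as:
    #   https://www.troostwijkauctions.com/auctions?page=3
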

    async def crawl_lot(self, page: Page, url: str) -> Optional[Dict]:
        """Crawl an individual lot page."""
        if url in self.visited_lots:
            print(f"  → Skipping (already visited): {url}")
            return None

        lot_id = self._extract_lot_id(url)
        print(f"\n[LOT {lot_id}]")

        content = await self._get_page(page, url)
        if not content:
            return None

        lot_data = self._parse_lot_page(content, url)
        self.visited_lots.add(url)

        print(f"  → Title: {lot_data.get('title', 'N/A')[:60]}...")
        print(f"  → Bid: {lot_data.get('current_bid', 'N/A')}")
        print(f"  → Location: {lot_data.get('location', 'N/A')}")
        return lot_data

    async def crawl_auctions(self, max_pages: int = MAX_PAGES) -> List[Dict]:
        """Main crawl function."""
        async with async_playwright() as p:
            print("Launching browser...")
            browser = await p.chromium.launch(
                headless=True,
                args=[
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                    '--disable-blink-features=AutomationControlled'
                ]
            )
            page = await browser.new_page(
                viewport={'width': 1920, 'height': 1080},
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                           '(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'
            )
            # Set extra headers
            await page.set_extra_http_headers({
                'Accept-Language': 'en-US,en;q=0.9',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
            })

            all_lot_urls = []

            # First pass: collect all lot URLs from listing pages
            print("\n" + "="*60)
            print("PHASE 1: COLLECTING LOT URLs FROM LISTING PAGES")
            print("="*60)
            for page_num in range(1, max_pages + 1):
                lot_urls = await self.crawl_listing_page(page, page_num)
                if not lot_urls:
                    print(f"No lots found on page {page_num}, stopping")
                    break
                all_lot_urls.extend(lot_urls)
                print(f"  → Total lots collected so far: {len(all_lot_urls)}")

            # Remove duplicates
            all_lot_urls = list(set(all_lot_urls))
            print(f"\n{'='*60}")
            print(f"PHASE 1 COMPLETE: {len(all_lot_urls)} UNIQUE LOTS TO SCRAPE")
            print(f"{'='*60}")

            # Second pass: scrape each lot page
            print("\n" + "="*60)
            print("PHASE 2: SCRAPING INDIVIDUAL LOT PAGES")
            print("="*60)
            results = []
            for i, lot_url in enumerate(all_lot_urls):
                print(f"\n[{i+1:>3}/{len(all_lot_urls)}] ", end="")
                lot_data = await self.crawl_lot(page, lot_url)
                if lot_data:
                    results.append(lot_data)
                # Save progress every 10 lots
                if (i + 1) % 10 == 0:
                    self._save_intermediate(results)

            await browser.close()
            return results

    def _save_intermediate(self, data: List[Dict]):
        """Save intermediate results."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{OUTPUT_DIR}/troostwijk_lots_partial_{timestamp}.json"
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump({
                'count': len(data),
                'lots': data
            }, f, indent=2, ensure_ascii=False)
        print(f"\n  → PROGRESS SAVED: {filename}")

    def save_final_results(self, data: List[Dict]):
        """Save final results as JSON and CSV."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Save JSON
        json_file = f"{OUTPUT_DIR}/troostwijk_lots_final_{timestamp}.json"
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump({
                'count': len(data),
                'scraped_at': datetime.now().isoformat(),
                'rate_limit_seconds': RATE_LIMIT_SECONDS,
                'lots': data
            }, f, indent=2, ensure_ascii=False)

        # Save CSV
        csv_file = f"{OUTPUT_DIR}/troostwijk_lots_final_{timestamp}.csv"
        if data:
            flat_data = []
            for item in data:
                flat_item = item.copy()
                flat_item['images'] = ', '.join(flat_item.get('images', []))
                flat_data.append(flat_item)
            with open(csv_file, 'w', newline='', encoding='utf-8') as f:
                fieldnames = ['url', 'lot_id', 'title', 'current_bid', 'bid_count',
                              'end_date', 'location', 'description', 'category',
                              'images', 'scraped_at']
                writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
                writer.writeheader()
                writer.writerows(flat_data)

        return json_file, csv_file
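
# Minimal programmatic usage sketch (illustrative; main() below does the same
# with progress output and error handling):
#
#   scraper = TroostwijkScraper()
#   lots = asyncio.run(scraper.crawl_auctions(max_pages=2))
#   scraper.save_final_results(lots)
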
"https://www.troostwijkauctions.com/a/machines-en-toebehoren-%28hout-en-kunststofverwerking-handlingapparatuur-bouwmachines-landbouwindustrie%29-oost-europa-december-A7-35847"): """Test extraction on a specific cached URL to debug patterns""" scraper = TroostwijkScraper() # Try to get from cache cached = scraper.cache.get(test_url) if not cached: print(f"ERROR: URL not found in cache: {test_url}") print(f"\nAvailable cached URLs:") with sqlite3.connect(CACHE_DB) as conn: cursor = conn.execute("SELECT url FROM cache ORDER BY timestamp DESC LIMIT 10") for row in cursor.fetchall(): print(f" - {row[0]}") return content = cached['content'] print(f"\n{'='*60}") print(f"TESTING EXTRACTION FROM: {test_url}") print(f"{'='*60}") print(f"Content length: {len(content)} chars") print(f"Cache age: {(time.time() - cached['timestamp']) / 3600:.1f} hours") # Test each extraction method lot_data = scraper._parse_lot_page(content, test_url) print(f"\n{'='*60}") print("EXTRACTED DATA:") print(f"{'='*60}") for key, value in lot_data.items(): if key == 'images': print(f"{key:.<20}: {len(value)} images") for img in value[:3]: print(f"{'':.<20} - {img}") else: display_value = str(value)[:100] if value else "(empty)" # Handle Unicode characters that Windows console can't display try: print(f"{key:.<20}: {display_value}") except UnicodeEncodeError: safe_value = display_value.encode('ascii', 'replace').decode('ascii') print(f"{key:.<20}: {safe_value}") # Validation checks print(f"\n{'='*60}") print("VALIDATION CHECKS:") print(f"{'='*60}") issues = [] if lot_data['current_bid'] in ['Huidig bod', 'Current bid', '€0', '']: issues.append("[!] Current bid not extracted correctly") else: print("[OK] Current bid looks valid:", lot_data['current_bid']) if lot_data['location'] in ['Locatie', 'Location', '']: issues.append("[!] Location not extracted correctly") else: print("[OK] Location looks valid:", lot_data['location']) if lot_data['title'] in ['', '...']: issues.append("[!] Title not extracted correctly") else: print("[OK] Title looks valid:", lot_data['title'][:50]) if issues: print(f"\n[ISSUES FOUND]") for issue in issues: print(f" {issue}") else: print(f"\n[ALL FIELDS EXTRACTED SUCCESSFULLY!]") # Debug: Show raw HTML snippets for problematic fields print(f"\n{'='*60}") print("DEBUG: RAW HTML SNIPPETS") print(f"{'='*60}") # Look for bid-related content print(f"\n1. Bid patterns in content:") bid_matches = re.findall(r'.{0,50}(€[\d,.\s]+).{0,50}', content[:10000]) for i, match in enumerate(bid_matches[:5], 1): print(f" {i}. {match}") # Look for location content print(f"\n2. Location patterns in content:") loc_matches = re.findall(r'.{0,30}(Locatie|Location).{0,100}', content, re.IGNORECASE) for i, match in enumerate(loc_matches[:5], 1): print(f" {i}. ...{match}...") # Look for JSON data print(f"\n3. JSON/Script data containing auction info:") json_patterns = [ r'"currentBid"[^,}]+', r'"location"[^,}]+', r'"price"[^,}]+', r'"addressLocality"[^,}]+' ] for pattern in json_patterns: matches = re.findall(pattern, content[:50000], re.IGNORECASE) if matches: print(f" {pattern}: {matches[:3]}") # Look for script tags with structured data script_matches = re.findall(r']*type=["\']application/ld\+json["\'][^>]*>(.*?)', content, re.DOTALL) if script_matches: print(f"\n4. 
    # Look for script tags with structured data
    script_matches = re.findall(
        r'<script[^>]*type=["\']application/ld\+json["\'][^>]*>(.*?)</script>',
        content, re.DOTALL
    )
    if script_matches:
        print("\n4. Structured data (JSON-LD) found:")
        for i, script in enumerate(script_matches[:2], 1):
            try:
                data = json.loads(script)
                print(f"  Script {i}: {json.dumps(data, indent=6)[:500]}...")
            except Exception:
                print(f"  Script {i}: {script[:300]}...")


def main():
    """Main execution."""
    import sys

    # Check for test mode
    if len(sys.argv) > 1 and sys.argv[1] == "--test":
        test_url = sys.argv[2] if len(sys.argv) > 2 else None
        if test_url:
            test_extraction(test_url)
        else:
            test_extraction()
        return

    print("Troostwijk Auctions Scraper")
    print("=" * 60)
    print(f"Rate limit: {RATE_LIMIT_SECONDS} seconds between requests")
    print(f"Cache database: {CACHE_DB}")
    print(f"Output directory: {OUTPUT_DIR}")
    print(f"Max listing pages: {MAX_PAGES}")
    print("=" * 60)

    scraper = TroostwijkScraper()
    try:
        # Clear cache entries older than 7 days to keep the database small
        scraper.cache.clear_old(max_age_hours=168)

        # Run the crawler
        results = asyncio.run(scraper.crawl_auctions(max_pages=MAX_PAGES))

        # Save final results
        if results:
            json_file, csv_file = scraper.save_final_results(results)
            print("\n" + "="*60)
            print("CRAWLING COMPLETED SUCCESSFULLY")
            print("="*60)
            print(f"Total lots scraped: {len(results)}")
            print(f"JSON file: {json_file}")
            print(f"CSV file: {csv_file}")

            # Show a sample record
            print(f"\n{'='*60}")
            print("SAMPLE DATA:")
            print(f"{'='*60}")
            sample = results[0]
            for key, value in sample.items():
                if key != 'images':
                    print(f"{key:.<20}: {str(value)[:80]}...")
        else:
            print("\nNo results collected. Check cache and logs.")
    except KeyboardInterrupt:
        print("\nScraping interrupted by user - partial results saved in the output directory")
    except Exception as e:
        print(f"\nERROR during scraping: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
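
# Example invocations (illustrative; the script filename and URL are placeholders):
#
#   Full crawl (listing pages + lot pages, cached and rate-limited):
#     python3 scraper.py
#
#   Re-run the extraction logic against an already-cached lot page:
#     python3 scraper.py --test "https://www.troostwijkauctions.com/a/<lot-slug>"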