#!/usr/bin/env python3
"""Parser module for extracting data from HTML/JSON content."""

import json
import re
import html
from datetime import datetime
from urllib.parse import urljoin, urlparse
from typing import Dict, List, Optional

from config import BASE_URL


class DataParser:
    """Handles all data extraction from HTML/JSON content.

    Pages are parsed in two tiers: the embedded Next.js ``__NEXT_DATA__``
    JSON blob is preferred; if absent, regex-based HTML scraping is used
    as a fallback.
    """

    @staticmethod
    def extract_lot_id(url: str) -> str:
        """Extract a lot identifier from a URL.

        Tries numeric ``/lots/<id>`` paths first, then display-style
        ``/a/...A123-456`` ids, and finally falls back to the last
        path segment (or '' for an empty path).
        """
        path = urlparse(url).path
        match = re.search(r'/lots/(\d+)', path)
        if match:
            return match.group(1)
        match = re.search(r'/a/.*?([A-Z]\d+-\d+)', path)
        if match:
            return match.group(1)
        return path.split('/')[-1] if path else ""

    @staticmethod
    def clean_text(text: str) -> str:
        """Unescape HTML entities and collapse runs of whitespace."""
        text = html.unescape(text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    @staticmethod
    def format_timestamp(timestamp) -> str:
        """Convert a Unix timestamp to a 'YYYY-MM-DD HH:MM:SS' string.

        Accepts numeric timestamps (seconds or milliseconds), numeric
        strings, pre-formatted date strings (returned unchanged), and a
        handful of known placeholder strings (mapped to '').  Returns ''
        when the value cannot be interpreted.
        """
        try:
            if isinstance(timestamp, (int, float)) and timestamp > 0:
                # Unix timestamps are typically 10 digits (seconds) or
                # 13 digits (milliseconds); normalize to seconds.
                if timestamp > 1e12:
                    timestamp = timestamp / 1000
                return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')

            if isinstance(timestamp, str):
                try:
                    ts_num = float(timestamp)
                    if ts_num > 1e12:
                        ts_num = ts_num / 1000
                    if ts_num > 0:
                        return datetime.fromtimestamp(ts_num).strftime('%Y-%m-%d %H:%M:%S')
                except ValueError:
                    # Not a numeric string — check for known placeholder
                    # values that mean "no date available".
                    invalid_values = ['gap', 'materieel wegens vereffening', 'tbd', 'n/a', 'unknown']
                    if timestamp.lower().strip() in invalid_values:
                        return ''
                    # Otherwise assume it is already a formatted date.
                    return timestamp if len(timestamp) > 0 else ''

            return str(timestamp) if timestamp else ''
        except Exception:
            # Best-effort: log unparseable values instead of crashing.
            if timestamp and str(timestamp).strip():
                print(f" ⚠️ Could not parse timestamp: {timestamp}")
            return ''

    @staticmethod
    def format_currency(amount) -> str:
        """Format a numeric amount as a euro string (e.g. '€1,234.50').

        Non-numeric truthy values are stringified as-is; falsy or
        non-positive values yield '€0'.
        """
        if isinstance(amount, (int, float)):
            return f"€{amount:,.2f}" if amount > 0 else "€0"
        return str(amount) if amount else "€0"

    def parse_page(self, content: str, url: str) -> Optional[Dict]:
        """Parse a page, preferring __NEXT_DATA__ JSON over HTML scraping.

        Returns a lot/auction dict, or a best-effort lot dict scraped
        from the raw HTML when no JSON payload is present.
        """
        next_data = self._extract_nextjs_data(content, url)
        if next_data:
            return next_data
        # Fallback: regex scraping over whitespace-normalized markup.
        content = re.sub(r'\s+', ' ', content)
        return {
            'type': 'lot',
            'url': url,
            'lot_id': self.extract_lot_id(url),
            'title': self._extract_meta_content(content, 'og:title'),
            'current_bid': self._extract_current_bid(content),
            'bid_count': self._extract_bid_count(content),
            'closing_time': self._extract_end_date(content),
            'location': self._extract_location(content),
            'description': self._extract_description(content),
            'category': self._extract_category(content),
            'images': self._extract_images(content),
            'scraped_at': datetime.now().isoformat()
        }

    def _extract_nextjs_data(self, content: str, url: str) -> Optional[Dict]:
        """Extract data from the Next.js __NEXT_DATA__ JSON script tag."""
        try:
            # BUGFIX: the pattern previously lacked the <script>...</script>
            # delimiters (stripped from the source) and could not match the
            # embedded JSON payload reliably.
            match = re.search(
                r'<script[^>]*id="__NEXT_DATA__"[^>]*>(.+?)</script>',
                content, re.DOTALL)
            if not match:
                return None
            data = json.loads(match.group(1))
            page_props = data.get('props', {}).get('pageProps', {})
            if 'lot' in page_props:
                # Pass both lot and auction data (auction is included in lot pages)
                return self._parse_lot_json(page_props.get('lot', {}), url,
                                            page_props.get('auction'))
            if 'auction' in page_props:
                return self._parse_auction_json(page_props.get('auction', {}), url)
            return None
        except Exception as e:
            print(f" → Error parsing __NEXT_DATA__: {e}")
            return None

    def _parse_lot_json(self, lot_data: Dict, url: str,
                        auction_data: Optional[Dict] = None) -> Dict:
        """Parse lot data from JSON.

        Args:
            lot_data: Lot object from __NEXT_DATA__.
            url: Page URL.
            auction_data: Optional auction object (included in lot pages).

        Returns:
            A normalized lot dict (``type`` == 'lot').
        """
        location_data = lot_data.get('location', {})
        city = location_data.get('city', '')
        country = location_data.get('countryCode', '').upper()
        location = f"{city}, {country}" if city and country else (city or country)

        # The bid amount may live at the top level or inside a nested
        # 'bidding' object depending on the page variant.
        current_bid = (lot_data.get('currentBid')
                       or lot_data.get('highestBid')
                       or lot_data.get('startingBid'))
        if current_bid is None or current_bid == 0:
            bidding = lot_data.get('bidding', {})
            current_bid = bidding.get('currentBid') or bidding.get('amount')
        current_bid_str = (self.format_currency(current_bid)
                           if current_bid and current_bid > 0 else "No bids")

        bid_count = lot_data.get('bidCount', 0)
        if bid_count == 0:
            bid_count = lot_data.get('bidding', {}).get('bidCount', 0)

        # Description may be a plain string or a {'description': ...} dict.
        description = lot_data.get('description', {})
        if isinstance(description, dict):
            description = description.get('description', '')
        else:
            description = str(description)

        category = lot_data.get('category', {})
        category_name = category.get('name', '') if isinstance(category, dict) else ''

        # Get auction displayId from auction data if available (lot pages
        # include auction); otherwise fall back to the UUID auctionId.
        auction_id = lot_data.get('auctionId', '')
        if auction_data and auction_data.get('displayId'):
            auction_id = auction_data.get('displayId')

        return {
            'type': 'lot',
            'lot_id': lot_data.get('displayId', ''),
            'auction_id': auction_id,
            'url': url,
            'title': lot_data.get('title', ''),
            'current_bid': current_bid_str,
            'bid_count': bid_count,
            'closing_time': self.format_timestamp(lot_data.get('endDate', '')),
            'viewing_time': self._extract_viewing_time(lot_data),
            'pickup_date': self._extract_pickup_date(lot_data),
            'location': location,
            'description': description,
            'category': category_name,
            'images': self._extract_images_from_json(lot_data),
            'scraped_at': datetime.now().isoformat()
        }

    def _parse_auction_json(self, auction_data: Dict, url: str) -> Optional[Dict]:
        """Parse auction data from JSON.

        Returns an auction dict, a lot dict if the payload is actually a
        lot, or None if the shape is unrecognized.
        """
        is_auction = 'lots' in auction_data and isinstance(auction_data['lots'], list)
        is_lot = 'lotNumber' in auction_data or 'currentBid' in auction_data
        if is_auction:
            lots = auction_data.get('lots', [])
            first_lot_closing = None
            if lots:
                first_lot_closing = self.format_timestamp(lots[0].get('endDate', ''))
            return {
                'type': 'auction',
                'auction_id': auction_data.get('displayId', ''),
                'url': url,
                'title': auction_data.get('name', ''),
                'location': self._extract_location_from_json(auction_data),
                'lots_count': len(lots),
                'first_lot_closing_time': (first_lot_closing
                                           or self.format_timestamp(
                                               auction_data.get('minEndDate', ''))),
                'scraped_at': datetime.now().isoformat(),
                'lots': lots
            }
        elif is_lot:
            return self._parse_lot_json(auction_data, url)
        return None

    @staticmethod
    def _format_day_range(days: List[Dict]) -> str:
        """Format the first entry of a viewing/collection day list.

        Returns 'start - end', a single endpoint when only one side is
        parseable, or '' for an empty list.
        """
        if not days:
            return ''
        first = days[0]
        start = DataParser.format_timestamp(first.get('startDate', ''))
        end = DataParser.format_timestamp(first.get('endDate', ''))
        if start and end:
            return f"{start} - {end}"
        return start or end

    def _extract_viewing_time(self, auction_data: Dict) -> str:
        """Extract viewing time from auction data."""
        return self._format_day_range(auction_data.get('viewingDays', []))

    def _extract_pickup_date(self, auction_data: Dict) -> str:
        """Extract pickup date from auction data."""
        return self._format_day_range(auction_data.get('collectionDays', []))

    def _extract_images_from_json(self, auction_data: Dict) -> List[str]:
        """Extract all image URLs from auction data.

        Collects the single 'image' entry plus any entries in 'images'
        (dicts with a 'url' key, or bare URL strings).
        """
        images = []
        if auction_data.get('image', {}).get('url'):
            images.append(auction_data['image']['url'])
        if isinstance(auction_data.get('images'), list):
            for img in auction_data['images']:
                if isinstance(img, dict) and img.get('url'):
                    images.append(img['url'])
                elif isinstance(img, str):
                    images.append(img)
        return images

    def _extract_location_from_json(self, auction_data: Dict) -> str:
        """Extract location from auction JSON data.

        Viewing days are checked before collection days; the first entry
        with a city wins.
        """
        for days in [auction_data.get('viewingDays', []),
                     auction_data.get('collectionDays', [])]:
            if days:
                first_location = days[0]
                city = first_location.get('city', '')
                country = first_location.get('countryCode', '').upper()
                if city:
                    return f"{city}, {country}" if country else city
        return ''

    def _extract_meta_content(self, content: str, property_name: str) -> str:
        """Extract content from <meta property="..."> tags."""
        # BUGFIX: restored the '<meta' tag opener that was stripped from
        # the original pattern.
        pattern = rf'<meta[^>]*property=["\']{property_name}["\'][^>]*content=["\']([^"\']+)["\']'
        match = re.search(pattern, content, re.IGNORECASE)
        return self.clean_text(match.group(1)) if match else ""

    def _extract_current_bid(self, content: str) -> str:
        """Extract the current bid amount from HTML, or 'No bids'."""
        patterns = [
            r'"currentBid"\s*:\s*"([^"]+)"',
            r'"currentBid"\s*:\s*(\d+(?:\.\d+)?)',
            r'(?:Current bid|Huidig bod)[:\s]*\s*(€[\d,.\s]+)',
            r'(?:Current bid|Huidig bod)[:\s]+(€[\d,.\s]+)',
        ]
        # Invalid bid texts that should be treated as "no bids"
        invalid_bid_texts = [
            'huidig bod',
            'current bid',
            '€huidig bod',
            '€huidig ​​bod',  # With zero-width spaces
            'huidig ​​bod',
        ]
        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                bid = match.group(1).strip()
                # Remove zero-width spaces and other unicode whitespace
                bid = re.sub(r'[\u200b\u200c\u200d\u00a0]+', ' ', bid).strip()
                if bid:
                    # Reject placeholder label text masquerading as a bid
                    bid_lower = bid.lower().replace(' ', '').replace('€', '')
                    if bid_lower not in [t.lower().replace(' ', '').replace('€', '')
                                         for t in invalid_bid_texts]:
                        if not bid.startswith('€'):
                            bid = f"€{bid}"
                        return bid
        return "No bids"

    def _extract_bid_count(self, content: str) -> int:
        """Extract the number of bids from HTML (0 when absent)."""
        match = re.search(r'(\d+)\s*bids?', content, re.IGNORECASE)
        if match:
            try:
                return int(match.group(1))
            except ValueError:  # narrowed from bare except
                pass
        return 0

    def _extract_end_date(self, content: str) -> str:
        """Extract the auction end date from HTML ('' when absent)."""
        patterns = [
            r'Ends?[:\s]+([A-Za-z0-9,:\s]+)',
            r'endTime["\']:\s*["\']([^"\']+)["\']',
        ]
        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                return match.group(1).strip()
        return ""

    def _extract_location(self, content: str) -> str:
        """Extract a location string from HTML content.

        NOTE(review): the original pattern list was corrupted in the
        source file; it has been reconstructed from the surviving
        fragments ('Location|Locatie' label patterns, clean_text, and a
        len > 2 sanity guard) — confirm against live pages.
        """
        patterns = [
            r'(?:Location|Locatie)[:\s]*<[^>]*>\s*([A-Za-zÀ-ÿ0-9\s,.-]+?)(?:<|$)',
            r'(?:Location|Locatie)[:\s]+([A-Za-zÀ-ÿ0-9\s,.-]+?)(?:<|$)',
        ]
        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                location = self.clean_text(match.group(1))
                # Skip degenerate matches (stray letters/punctuation)
                if len(location) > 2:
                    return location
        return ""

    def _extract_description(self, content: str) -> str:
        """Extract the meta description, truncated to 500 characters."""
        # BUGFIX: restored the '<meta' tag opener that was stripped from
        # the original pattern.
        pattern = r'<meta[^>]*name=["\']description["\'][^>]*content=["\']([^"\']+)["\']'
        match = re.search(pattern, content, re.IGNORECASE | re.DOTALL)
        return self.clean_text(match.group(1))[:500] if match else ""

    def _extract_category(self, content: str) -> str:
        """Extract category from breadcrumb markup or meta tags."""
        pattern = r'class="breadcrumb[^"]*".*?>([A-Za-z\s]+)'
        match = re.search(pattern, content, re.IGNORECASE)
        if match:
            return self.clean_text(match.group(1))
        return self._extract_meta_content(content, 'category')

    def _extract_images(self, content: str) -> List[str]:
        """Extract up to 5 absolute image URLs from <img> tags.

        Skips obvious chrome images (logos, icons, placeholders, banners).
        """
        # BUGFIX: restored the '<img' tag opener that was stripped from
        # the original pattern.
        pattern = r'<img[^>]*src=["\']([^"\']+\.jpe?g|[^"\']+\.png)["\'][^>]*>'
        matches = re.findall(pattern, content, re.IGNORECASE)
        images = []
        for match in matches:
            if any(skip in match.lower() for skip in
                   ['logo', 'icon', 'placeholder', 'banner']):
                continue
            full_url = urljoin(BASE_URL, match)
            images.append(full_url)
        return images[:5]  # Limit to 5 images