Files
scaev/src/parse.py
2025-12-07 02:20:14 +01:00

359 lines
15 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Parser module for extracting data from HTML/JSON content
"""
import json
import re
import html
from datetime import datetime
from urllib.parse import urljoin, urlparse
from typing import Dict, List, Optional
from config import BASE_URL
class DataParser:
"""Handles all data extraction from HTML/JSON content"""
@staticmethod
def extract_lot_id(url: str) -> str:
"""Extract lot ID from URL"""
path = urlparse(url).path
match = re.search(r'/lots/(\d+)', path)
if match:
return match.group(1)
match = re.search(r'/a/.*?([A-Z]\d+-\d+)', path)
if match:
return match.group(1)
return path.split('/')[-1] if path else ""
@staticmethod
def clean_text(text: str) -> str:
"""Clean extracted text"""
text = html.unescape(text)
text = re.sub(r'\s+', ' ', text)
return text.strip()
@staticmethod
def format_timestamp(timestamp) -> str:
    """Convert a Unix timestamp to a readable 'YYYY-MM-DD HH:MM:SS' date.

    Accepts int/float (seconds or milliseconds) or a string.  Numeric
    strings are converted like numbers; non-numeric strings are returned
    unchanged (assumed to already be a formatted date) except for a small
    list of known placeholder values which map to ''.  Anything that
    cannot be interpreted yields ''.
    """
    try:
        # Handle numeric timestamps
        if isinstance(timestamp, (int, float)) and timestamp > 0:
            # Unix timestamps are typically 10 digits (seconds) or 13 digits (milliseconds)
            if timestamp > 1e12:  # Milliseconds
                timestamp = timestamp / 1000
            # NOTE(review): fromtimestamp() uses the LOCAL timezone, not UTC —
            # confirm that is intended for scraped auction dates.
            return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
        # Handle string timestamps that might be numeric
        if isinstance(timestamp, str):
            # Try to parse as number
            try:
                ts_num = float(timestamp)
                if ts_num > 1e12:
                    ts_num = ts_num / 1000
                if ts_num > 0:
                    return datetime.fromtimestamp(ts_num).strftime('%Y-%m-%d %H:%M:%S')
            except ValueError:
                # Not a numeric string - check if it's an invalid value
                invalid_values = ['gap', 'materieel wegens vereffening', 'tbd', 'n/a', 'unknown']
                if timestamp.lower().strip() in invalid_values:
                    return ''
                # Return as-is if it looks like a formatted date
                return timestamp if len(timestamp) > 0 else ''
        # Fall-through: zero/negative numbers, numeric strings <= 0, and
        # any other type end up stringified (or '' when falsy).
        return str(timestamp) if timestamp else ''
    except Exception as e:
        # Log parsing errors for debugging
        if timestamp and str(timestamp).strip():
            print(f" ⚠️ Could not parse timestamp: {timestamp}")
        return ''
@staticmethod
def format_currency(amount) -> str:
"""Format currency amount"""
if isinstance(amount, (int, float)):
return f"{amount:,.2f}" if amount > 0 else "€0"
return str(amount) if amount else "€0"
def parse_page(self, content: str, url: str) -> Optional[Dict]:
"""Parse page and determine if it's an auction or lot"""
next_data = self._extract_nextjs_data(content, url)
if next_data:
return next_data
content = re.sub(r'\s+', ' ', content)
return {
'type': 'lot',
'url': url,
'lot_id': self.extract_lot_id(url),
'title': self._extract_meta_content(content, 'og:title'),
'current_bid': self._extract_current_bid(content),
'bid_count': self._extract_bid_count(content),
'closing_time': self._extract_end_date(content),
'location': self._extract_location(content),
'description': self._extract_description(content),
'category': self._extract_category(content),
'images': self._extract_images(content),
'scraped_at': datetime.now().isoformat()
}
def _extract_nextjs_data(self, content: str, url: str) -> Optional[Dict]:
"""Extract data from Next.js __NEXT_DATA__ JSON"""
try:
match = re.search(r'<script[^>]*id="__NEXT_DATA__"[^>]*>(.+?)</script>', content, re.DOTALL)
if not match:
return None
data = json.loads(match.group(1))
page_props = data.get('props', {}).get('pageProps', {})
if 'lot' in page_props:
# Pass both lot and auction data (auction is included in lot pages)
return self._parse_lot_json(page_props.get('lot', {}), url, page_props.get('auction'))
if 'auction' in page_props:
return self._parse_auction_json(page_props.get('auction', {}), url)
return None
except Exception as e:
print(f" → Error parsing __NEXT_DATA__: {e}")
return None
def _parse_lot_json(self, lot_data: Dict, url: str, auction_data: Optional[Dict] = None) -> Dict:
    """Parse lot data from JSON.

    Args:
        lot_data: Lot object from __NEXT_DATA__
        url: Page URL
        auction_data: Optional auction object (included in lot pages)

    Returns:
        Flat dict describing the lot (ids, bid info, dates, location,
        description, category, images, scrape timestamp).
    """
    # "City, CC" when both parts are known, otherwise whichever half exists.
    location_data = lot_data.get('location', {})
    city = location_data.get('city', '')
    country = location_data.get('countryCode', '').upper()
    location = f"{city}, {country}" if city and country else (city or country)
    # Bid amount: try the lot-level fields first, then the nested 'bidding'
    # object (schema varies between page versions).
    current_bid = lot_data.get('currentBid') or lot_data.get('highestBid') or lot_data.get('startingBid')
    if current_bid is None or current_bid == 0:
        bidding = lot_data.get('bidding', {})
        current_bid = bidding.get('currentBid') or bidding.get('amount')
    current_bid_str = self.format_currency(current_bid) if current_bid and current_bid > 0 else "No bids"
    # Bid count mirrors the same lot-level / nested fallback.
    bid_count = lot_data.get('bidCount', 0)
    if bid_count == 0:
        bid_count = lot_data.get('bidding', {}).get('bidCount', 0)
    # 'description' may be a nested {'description': ...} object or a scalar.
    description = lot_data.get('description', {})
    if isinstance(description, dict):
        description = description.get('description', '')
    else:
        description = str(description)
    category = lot_data.get('category', {})
    category_name = category.get('name', '') if isinstance(category, dict) else ''
    # Get auction displayId from auction data if available (lot pages include auction)
    # Otherwise fall back to the UUID auctionId
    auction_id = lot_data.get('auctionId', '')
    if auction_data and auction_data.get('displayId'):
        auction_id = auction_data.get('displayId')
    return {
        'type': 'lot',
        'lot_id': lot_data.get('displayId', ''),
        'auction_id': auction_id,
        'url': url,
        'title': lot_data.get('title', ''),
        'current_bid': current_bid_str,
        'bid_count': bid_count,
        'closing_time': self.format_timestamp(lot_data.get('endDate', '')),
        'viewing_time': self._extract_viewing_time(lot_data),
        'pickup_date': self._extract_pickup_date(lot_data),
        'location': location,
        'description': description,
        'category': category_name,
        'images': self._extract_images_from_json(lot_data),
        'scraped_at': datetime.now().isoformat()
    }
def _parse_auction_json(self, auction_data: Dict, url: str) -> Dict:
"""Parse auction data from JSON"""
is_auction = 'lots' in auction_data and isinstance(auction_data['lots'], list)
is_lot = 'lotNumber' in auction_data or 'currentBid' in auction_data
if is_auction:
lots = auction_data.get('lots', [])
first_lot_closing = None
if lots:
first_lot_closing = self.format_timestamp(lots[0].get('endDate', ''))
return {
'type': 'auction',
'auction_id': auction_data.get('displayId', ''),
'url': url,
'title': auction_data.get('name', ''),
'location': self._extract_location_from_json(auction_data),
'lots_count': len(lots),
'first_lot_closing_time': first_lot_closing or self.format_timestamp(auction_data.get('minEndDate', '')),
'scraped_at': datetime.now().isoformat(),
'lots': lots
}
elif is_lot:
return self._parse_lot_json(auction_data, url)
return None
def _extract_viewing_time(self, auction_data: Dict) -> str:
"""Extract viewing time from auction data"""
viewing_days = auction_data.get('viewingDays', [])
if viewing_days:
first = viewing_days[0]
start = self.format_timestamp(first.get('startDate', ''))
end = self.format_timestamp(first.get('endDate', ''))
if start and end:
return f"{start} - {end}"
return start or end
return ''
def _extract_pickup_date(self, auction_data: Dict) -> str:
"""Extract pickup date from auction data"""
collection_days = auction_data.get('collectionDays', [])
if collection_days:
first = collection_days[0]
start = self.format_timestamp(first.get('startDate', ''))
end = self.format_timestamp(first.get('endDate', ''))
if start and end:
return f"{start} - {end}"
return start or end
return ''
def _extract_images_from_json(self, auction_data: Dict) -> List[str]:
"""Extract all image URLs from auction data"""
images = []
if auction_data.get('image', {}).get('url'):
images.append(auction_data['image']['url'])
if isinstance(auction_data.get('images'), list):
for img in auction_data['images']:
if isinstance(img, dict) and img.get('url'):
images.append(img['url'])
elif isinstance(img, str):
images.append(img)
return images
def _extract_location_from_json(self, auction_data: Dict) -> str:
"""Extract location from auction JSON data"""
for days in [auction_data.get('viewingDays', []), auction_data.get('collectionDays', [])]:
if days:
first_location = days[0]
city = first_location.get('city', '')
country = first_location.get('countryCode', '').upper()
if city:
return f"{city}, {country}" if country else city
return ''
def _extract_meta_content(self, content: str, property_name: str) -> str:
"""Extract content from meta tags"""
pattern = rf'<meta[^>]*property=["\']{property_name}["\'][^>]*content=["\']([^"\']+)["\']'
match = re.search(pattern, content, re.IGNORECASE)
return self.clean_text(match.group(1)) if match else ""
def _extract_current_bid(self, content: str) -> str:
"""Extract current bid amount"""
patterns = [
r'"currentBid"\s*:\s*"([^"]+)"',
r'"currentBid"\s*:\s*(\d+(?:\.\d+)?)',
r'(?:Current bid|Huidig bod)[:\s]*</?\w*>\s*(€[\d,.\s]+)',
r'(?:Current bid|Huidig bod)[:\s]+(€[\d,.\s]+)',
]
# Invalid bid texts that should be treated as "no bids"
invalid_bid_texts = [
'huidig bod',
'current bid',
'€huidig bod',
'€huidig bod', # With zero-width spaces
'huidig bod',
]
for pattern in patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
bid = match.group(1).strip()
# Remove zero-width spaces and other unicode whitespace
bid = re.sub(r'[\u200b\u200c\u200d\u00a0]+', ' ', bid).strip()
# Check if it's a valid bid
if bid:
# Reject invalid bid texts
bid_lower = bid.lower().replace(' ', '').replace('', '')
if bid_lower not in [t.lower().replace(' ', '').replace('', '') for t in invalid_bid_texts]:
if not bid.startswith(''):
bid = f"{bid}"
return bid
return "No bids"
def _extract_bid_count(self, content: str) -> int:
"""Extract number of bids"""
match = re.search(r'(\d+)\s*bids?', content, re.IGNORECASE)
if match:
try:
return int(match.group(1))
except:
pass
return 0
def _extract_end_date(self, content: str) -> str:
"""Extract auction end date"""
patterns = [
r'Ends?[:\s]+([A-Za-z0-9,:\s]+)',
r'endTime["\']:\s*["\']([^"\']+)["\']',
]
for pattern in patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
return match.group(1).strip()
return ""
def _extract_location(self, content: str) -> str:
"""Extract location"""
patterns = [
r'(?:Location|Locatie)[:\s]*</?\w*>\s*([A-Za-zÀ-ÿ0-9\s,.-]+?)(?:<|$)',
r'(?:Location|Locatie)[:\s]+([A-Za-zÀ-ÿ0-9\s,.-]+?)(?:<br|</|$)',
]
for pattern in patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
location = self.clean_text(match.group(1))
if location.lower() not in ['locatie', 'location', 'huidig bod', 'current bid']:
location = re.sub(r'[,.\s]+$', '', location)
if len(location) > 2:
return location
return ""
def _extract_description(self, content: str) -> str:
"""Extract description"""
pattern = r'<meta[^>]*name=["\']description["\'][^>]*content=["\']([^"\']+)["\']'
match = re.search(pattern, content, re.IGNORECASE | re.DOTALL)
return self.clean_text(match.group(1))[:500] if match else ""
def _extract_category(self, content: str) -> str:
"""Extract category from breadcrumb or meta tags"""
pattern = r'class="breadcrumb[^"]*".*?>([A-Za-z\s]+)</a>'
match = re.search(pattern, content, re.IGNORECASE)
if match:
return self.clean_text(match.group(1))
return self._extract_meta_content(content, 'category')
def _extract_images(self, content: str) -> List[str]:
"""Extract image URLs"""
pattern = r'<img[^>]*src=["\']([^"\']+\.jpe?g|[^"\']+\.png)["\'][^>]*>'
matches = re.findall(pattern, content, re.IGNORECASE)
images = []
for match in matches:
if any(skip in match.lower() for skip in ['logo', 'icon', 'placeholder', 'banner']):
continue
full_url = urljoin(BASE_URL, match)
images.append(full_url)
return images[:5] # Limit to 5 images