This commit is contained in:
Tour
2025-12-04 14:49:58 +01:00
commit 79e14be37a
22 changed files with 2765 additions and 0 deletions

303
src/parse.py Normal file
View File

@@ -0,0 +1,303 @@
#!/usr/bin/env python3
"""
Parser module for extracting data from HTML/JSON content
"""
import json
import re
import html
from datetime import datetime
from urllib.parse import urljoin, urlparse
from typing import Dict, List, Optional
from config import BASE_URL
class DataParser:
    """Handles all data extraction from HTML/JSON content.

    ``parse_page`` is the entry point.  It first looks for the JSON payload
    embedded in a ``__NEXT_DATA__`` script tag and, when that is missing or
    unusable, falls back to regular-expression scraping of the raw HTML.
    """

    @staticmethod
    def extract_lot_id(url: str) -> str:
        """Extract the lot ID from ``url``.

        Tried in order: a numeric ``/lots/<digits>`` segment, a display id
        such as ``A1-12345`` appearing after ``/a/``, and finally the last
        path segment.  Returns ``""`` when the URL has no usable path.
        """
        path = urlparse(url).path
        match = re.search(r'/lots/(\d+)', path)
        if match:
            return match.group(1)
        match = re.search(r'/a/.*?([A-Z]\d+-\d+)', path)
        if match:
            return match.group(1)
        # Drop trailing slashes so ".../some-lot/" yields "some-lot", not "".
        return path.rstrip('/').split('/')[-1]

    @staticmethod
    def clean_text(text: str) -> str:
        """Unescape HTML entities and collapse whitespace runs to one space.

        ``None`` and the empty string both yield ``""``.
        """
        if not text:
            return ""
        return re.sub(r'\s+', ' ', html.unescape(text)).strip()

    @staticmethod
    def format_timestamp(timestamp) -> str:
        """Convert a Unix timestamp to ``YYYY-MM-DD HH:MM:SS`` (local time).

        Positive numbers are converted; anything else is returned as
        ``str(timestamp)``, or ``""`` when falsy.  Numbers the platform
        cannot represent as a date are also returned as plain strings.
        """
        if isinstance(timestamp, (int, float)) and timestamp > 0:
            try:
                return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
            except (OverflowError, OSError, ValueError):
                # Out-of-range value: fall through to the plain string form.
                pass
        return str(timestamp) if timestamp else ''

    @staticmethod
    def format_currency(amount) -> str:
        """Format an amount as a euro string.

        Positive numbers become ``€1,234.50``; zero, negative and falsy
        values become ``€0``; any other value is returned as ``str(amount)``.
        """
        if isinstance(amount, (int, float)):
            # Prefix with the euro sign so this agrees with the "€0" default.
            return f"€{amount:,.2f}" if amount > 0 else "€0"
        return str(amount) if amount else "€0"

    @staticmethod
    def _as_dict(value) -> Dict:
        """Return ``value`` if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    def parse_page(self, content: str, url: str) -> Optional[Dict]:
        """Parse a page into an auction or lot record.

        The embedded Next.js JSON is preferred.  When it yields nothing the
        page is treated as a lot and scraped from the HTML itself.
        """
        next_data = self._extract_nextjs_data(content, url)
        if next_data:
            return next_data
        # Collapse whitespace once so the HTML patterns can assume single spaces.
        content = re.sub(r'\s+', ' ', content)
        return {
            'type': 'lot',
            'url': url,
            'lot_id': self.extract_lot_id(url),
            'title': self._extract_meta_content(content, 'og:title'),
            'current_bid': self._extract_current_bid(content),
            'bid_count': self._extract_bid_count(content),
            'closing_time': self._extract_end_date(content),
            'location': self._extract_location(content),
            'description': self._extract_description(content),
            'category': self._extract_category(content),
            'images': self._extract_images(content),
            'scraped_at': datetime.now().isoformat()
        }

    def _extract_nextjs_data(self, content: str, url: str) -> Optional[Dict]:
        """Extract a record from the Next.js ``__NEXT_DATA__`` JSON.

        Returns ``None`` when the script tag is absent, the JSON is invalid,
        or ``pageProps`` holds neither a usable ``lot`` nor ``auction``.
        """
        try:
            match = re.search(r'<script[^>]*id="__NEXT_DATA__"[^>]*>(.+?)</script>', content, re.DOTALL)
            if not match:
                return None
            data = json.loads(match.group(1))
            page_props = self._as_dict(self._as_dict(self._as_dict(data).get('props')).get('pageProps'))
            # A null/empty "lot" must not stop the "auction" payload from being tried.
            lot = page_props.get('lot')
            if isinstance(lot, dict) and lot:
                return self._parse_lot_json(lot, url)
            auction = page_props.get('auction')
            if isinstance(auction, dict) and auction:
                return self._parse_auction_json(auction, url)
            return None
        except Exception as e:
            # Best-effort boundary: report and let the caller fall back to HTML.
            print(f" → Error parsing __NEXT_DATA__: {e}")
            return None

    def _parse_lot_json(self, lot_data: Dict, url: str) -> Dict:
        """Build a lot record from the lot JSON object.

        Missing or ``null`` sub-objects are treated as empty, so a sparse
        payload still produces a record instead of raising.
        """
        location_data = self._as_dict(lot_data.get('location'))
        city = location_data.get('city') or ''
        country = (location_data.get('countryCode') or '').upper()
        location = f"{city}, {country}" if city and country else (city or country)

        bidding = self._as_dict(lot_data.get('bidding'))
        current_bid = lot_data.get('currentBid') or lot_data.get('highestBid') or lot_data.get('startingBid')
        if not current_bid:
            current_bid = bidding.get('currentBid') or bidding.get('amount')
        if isinstance(current_bid, (int, float)) and not isinstance(current_bid, bool):
            current_bid_str = self.format_currency(current_bid) if current_bid > 0 else "No bids"
        elif isinstance(current_bid, str) and current_bid.strip():
            # Already formatted upstream; keep it rather than comparing str > 0.
            current_bid_str = current_bid.strip()
        else:
            current_bid_str = "No bids"

        bid_count = lot_data.get('bidCount') or bidding.get('bidCount') or 0

        description = lot_data.get('description') or ''
        if isinstance(description, dict):
            description = description.get('description') or ''
        description = str(description)

        category = lot_data.get('category')
        category_name = (category.get('name') or '') if isinstance(category, dict) else ''

        return {
            'type': 'lot',
            'lot_id': lot_data.get('displayId', ''),
            'auction_id': lot_data.get('auctionId', ''),
            'url': url,
            'title': lot_data.get('title', ''),
            'current_bid': current_bid_str,
            'bid_count': bid_count,
            'closing_time': self.format_timestamp(lot_data.get('endDate', '')),
            'viewing_time': self._extract_viewing_time(lot_data),
            'pickup_date': self._extract_pickup_date(lot_data),
            'location': location,
            'description': description,
            'category': category_name,
            'images': self._extract_images_from_json(lot_data),
            'scraped_at': datetime.now().isoformat()
        }

    def _parse_auction_json(self, auction_data: Dict, url: str) -> Optional[Dict]:
        """Build a record from the ``auction`` JSON object.

        An object with a ``lots`` list becomes an auction record.  An object
        that instead carries ``lotNumber`` or ``currentBid`` is parsed as a
        lot.  Anything else yields ``None``.
        """
        lots = auction_data.get('lots')
        if isinstance(lots, list):
            first_lot_closing = ''
            if lots and isinstance(lots[0], dict):
                first_lot_closing = self.format_timestamp(lots[0].get('endDate', ''))
            return {
                'type': 'auction',
                'auction_id': auction_data.get('displayId', ''),
                'url': url,
                'title': auction_data.get('name', ''),
                'location': self._extract_location_from_json(auction_data),
                'lots_count': len(lots),
                'first_lot_closing_time': first_lot_closing or self.format_timestamp(auction_data.get('minEndDate', '')),
                'scraped_at': datetime.now().isoformat(),
                'lots': lots
            }
        if 'lotNumber' in auction_data or 'currentBid' in auction_data:
            return self._parse_lot_json(auction_data, url)
        return None

    def _format_day_range(self, days) -> str:
        """Format the first entry of a day list as ``"start - end"``.

        Returns whichever of the two dates is present when only one is, and
        ``""`` when ``days`` is empty or not a list of dicts.
        """
        if not isinstance(days, list) or not days or not isinstance(days[0], dict):
            return ''
        first = days[0]
        start = self.format_timestamp(first.get('startDate', ''))
        end = self.format_timestamp(first.get('endDate', ''))
        if start and end:
            return f"{start} - {end}"
        return start or end

    def _extract_viewing_time(self, auction_data: Dict) -> str:
        """Extract the viewing time from the first ``viewingDays`` entry."""
        return self._format_day_range(auction_data.get('viewingDays'))

    def _extract_pickup_date(self, auction_data: Dict) -> str:
        """Extract the pickup date from the first ``collectionDays`` entry."""
        return self._format_day_range(auction_data.get('collectionDays'))

    def _extract_images_from_json(self, auction_data: Dict) -> List[str]:
        """Collect image URLs from the ``image`` and ``images`` fields.

        Each may hold a plain URL string or a dict with a ``url`` key; empty
        and unrecognised entries are skipped.
        """
        candidates = [auction_data.get('image')]
        extra = auction_data.get('images')
        if isinstance(extra, list):
            candidates.extend(extra)

        images = []
        for img in candidates:
            if isinstance(img, dict) and img.get('url'):
                images.append(img['url'])
            elif isinstance(img, str) and img:
                images.append(img)
        return images

    def _extract_location_from_json(self, auction_data: Dict) -> str:
        """Extract ``"City, CC"`` from the first viewing or collection day.

        ``viewingDays`` is consulted before ``collectionDays``; the first
        entry that has a city wins.  Returns ``""`` when neither has one.
        """
        for key in ('viewingDays', 'collectionDays'):
            days = auction_data.get(key)
            if not isinstance(days, list) or not days or not isinstance(days[0], dict):
                continue
            city = days[0].get('city') or ''
            country = (days[0].get('countryCode') or '').upper()
            if city:
                return f"{city}, {country}" if country else city
        return ''

    def _extract_meta_content(self, content: str, property_name: str, attribute: str = 'property') -> str:
        """Extract the ``content`` value of a ``<meta>`` tag.

        Args:
            content: HTML to search.
            property_name: Value the identifying attribute must have.
            attribute: Identifying attribute name (``property`` or ``name``).

        Returns:
            The cleaned content value, or ``""`` when no tag matches.
        """
        name = re.escape(property_name)
        attr = re.escape(attribute)
        # Match up to the *same* quote that opened the value, so an apostrophe
        # inside a double-quoted value no longer truncates it.
        value = r'(["\'])((?:(?!\1).)*)\1'
        key = rf'(?<![\w-]){attr}=["\']{name}["\']'
        patterns = (
            rf'<meta\b[^>]*?{key}[^>]*?(?<![\w-])content={value}',
            rf'<meta\b[^>]*?(?<![\w-])content={value}[^>]*?{key}',
        )
        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE | re.DOTALL)
            if match:
                return self.clean_text(match.group(2))
        return ""

    def _extract_current_bid(self, content: str) -> str:
        """Extract the current bid as a euro-prefixed string.

        JSON-style ``"currentBid"`` keys are tried before visible
        "Current bid" / "Huidig bod" labels.  Returns ``"€0"`` on no match.
        """
        patterns = [
            r'"currentBid"\s*:\s*"([^"]+)"',
            r'"currentBid"\s*:\s*(\d+(?:\.\d+)?)',
            r'(?:Current bid|Huidig bod)[:\s]*</?\w*>\s*(€[\d,.\s]+)',
            r'(?:Current bid|Huidig bod)[:\s]+(€[\d,.\s]+)',
        ]
        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if not match:
                continue
            bid = match.group(1).strip()
            # Ignore captures that are really just the label text.
            if bid and bid.lower() not in ['huidig bod', 'current bid']:
                if not bid.startswith('€'):
                    bid = f"€{bid}"
                return bid
        return "€0"

    def _extract_bid_count(self, content: str) -> int:
        """Extract the number of bids from text such as ``"12 bids"``.

        Returns ``0`` when no count is found.
        """
        # The trailing \b keeps "3 bidders" from being read as three bids.
        match = re.search(r'(\d+)\s*bids?\b', content, re.IGNORECASE)
        return int(match.group(1)) if match else 0

    def _extract_end_date(self, content: str) -> str:
        """Extract the auction end date as raw text.

        A structured ``endTime`` key is preferred over the free-text
        ``End:`` / ``Ends:`` label.  Returns ``""`` when neither is found.
        """
        patterns = [
            r'endTime["\']:\s*["\']([^"\']+)["\']',
            # \b stops the label from matching inside words like "Friends".
            r'\bEnds?[:\s]+([A-Za-z0-9,:\s]+)',
        ]
        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                return match.group(1).strip()
        return ""

    def _extract_location(self, content: str) -> str:
        """Extract the location following a "Location" / "Locatie" label.

        Returns ``""`` when nothing longer than two characters is found.
        """
        patterns = [
            r'(?:Location|Locatie)[:\s]*</?\w*>\s*([A-Za-zÀ-ÿ0-9\s,.-]+?)(?:<|$)',
            r'(?:Location|Locatie)[:\s]+([A-Za-zÀ-ÿ0-9\s,.-]+?)(?:<br|</|$)',
        ]
        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if not match:
                continue
            location = self.clean_text(match.group(1))
            # Skip captures that are just another label.
            if location.lower() in ['locatie', 'location', 'huidig bod', 'current bid']:
                continue
            location = re.sub(r'[,.\s]+$', '', location)
            if len(location) > 2:
                return location
        return ""

    def _extract_description(self, content: str) -> str:
        """Extract the meta description, truncated to 500 characters."""
        return self._extract_meta_content(content, 'description', attribute='name')[:500]

    def _extract_category(self, content: str) -> str:
        """Extract the category from a breadcrumb link or a ``category`` meta tag."""
        pattern = r'class="breadcrumb[^"]*".*?>([A-Za-z\s]+)</a>'
        match = re.search(pattern, content, re.IGNORECASE)
        if match:
            return self.clean_text(match.group(1))
        return self._extract_meta_content(content, 'category')

    def _extract_images(self, content: str, limit: int = 5) -> List[str]:
        """Extract absolute JPEG/PNG image URLs from ``<img>`` tags.

        Args:
            content: HTML to search.
            limit: Maximum number of URLs to return.

        URLs containing ``logo``, ``icon``, ``placeholder`` or ``banner`` are
        skipped.  Relative URLs are resolved against ``BASE_URL``.
        """
        # An optional query string is allowed after the file extension.
        pattern = r'<img[^>]*src=["\']([^"\']+\.(?:jpe?g|png)(?:\?[^"\']*)?)["\']'
        skip_words = ('logo', 'icon', 'placeholder', 'banner')
        images = []
        for src in re.findall(pattern, content, re.IGNORECASE):
            if len(images) >= limit:
                break
            lowered = src.lower()
            if any(word in lowered for word in skip_words):
                continue
            images.append(urljoin(BASE_URL, src))
        return images