diff --git a/.idea/compiler.xml b/.idea/compiler.xml index e012797..6914cc8 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -8,11 +8,43 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 1f134f4..f5fc504 100644 --- a/pom.xml +++ b/pom.xml @@ -215,6 +215,59 @@ io.quarkus quarkus-scheduler + + + io.netty + * + + + + + + + io.netty + netty-common + 4.1.124.Final + + + io.netty + netty-handler + 4.1.124.Final + + + io.netty + netty-buffer + 4.1.124.Final + + + io.netty + netty-transport + 4.1.124.Final + + + io.netty + netty-codec + 4.1.124.Final + + + io.netty + netty-codec-http + 4.1.124.Final + + + io.netty + netty-codec-http2 + 4.1.124.Final + + + io.netty + netty-resolver + 4.1.124.Final + + + io.netty + netty-resolver-dns + 4.1.124.Final io.quarkus @@ -279,6 +332,7 @@ ${maven.build.timestamp} + --enable-native-access=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true @@ -316,7 +370,7 @@ -Xdiags:verbose -Xlint:all - -proc:none + -parameters true diff --git a/src/main/java/auctiora/Main.java b/src/main/java/auctiora/Main.java index ad57608..a8bfdce 100644 --- a/src/main/java/auctiora/Main.java +++ b/src/main/java/auctiora/Main.java @@ -33,16 +33,16 @@ public class Main { log.info("=== Troostwijk Auction Monitor ===\n"); // Parse command line arguments - String mode = args.length > 0 ? args[0] : "workflow"; + var mode = args.length > 0 ? args[0] : "workflow"; // Configuration - Windows paths - String databaseFile = System.getenv().getOrDefault("DATABASE_FILE", "C:\\mnt\\okcomputer\\output\\cache.db"); - String notificationConfig = System.getenv().getOrDefault("NOTIFICATION_CONFIG", "desktop"); + var databaseFile = System.getenv().getOrDefault("DATABASE_FILE", "C:\\mnt\\okcomputer\\output\\cache.db"); + var notificationConfig = System.getenv().getOrDefault("NOTIFICATION_CONFIG", "desktop"); // YOLO model paths (optional - monitor works without object detection) - String yoloCfg = "models/yolov4.cfg"; - String yoloWeights = "models/yolov4.weights"; - String yoloClasses = "models/coco.names"; + var yoloCfg = "models/yolov4.cfg"; + var yoloWeights = "models/yolov4.weights"; + var yoloClasses = "models/coco.names"; // Load native OpenCV library (only if models exist) try { @@ -84,8 +84,8 @@ public class Main { throws Exception { log.info("šŸš€ Starting in WORKFLOW MODE (Orchestrated Scheduling)\n"); - - WorkflowOrchestrator orchestrator = new WorkflowOrchestrator( + + var orchestrator = new WorkflowOrchestrator( dbPath, notifConfig, yoloCfg, yoloWeights, yoloClasses ); @@ -126,8 +126,8 @@ public class Main { throws Exception { log.info("šŸ”„ Starting in ONCE MODE (Single Execution)\n"); - - WorkflowOrchestrator orchestrator = new WorkflowOrchestrator( + + var orchestrator = new WorkflowOrchestrator( dbPath, notifConfig, yoloCfg, yoloWeights, yoloClasses ); @@ -178,8 +178,8 @@ public class Main { throws Exception { log.info("šŸ“Š Checking Status...\n"); - - WorkflowOrchestrator orchestrator = new WorkflowOrchestrator( + + var orchestrator = new WorkflowOrchestrator( dbPath, notifConfig, yoloCfg, yoloWeights, yoloClasses ); diff --git a/src/main/java/auctiora/ObjectDetectionService.java b/src/main/java/auctiora/ObjectDetectionService.java index a9319f5..746973d 100644 --- a/src/main/java/auctiora/ObjectDetectionService.java +++ b/src/main/java/auctiora/ObjectDetectionService.java @@ -36,16 +36,16 @@ public class ObjectDetectionService { ObjectDetectionService(String cfgPath, String weightsPath, String classNamesPath) throws IOException { // Check if model files exist - var cfgFile = Paths.get(cfgPath); - var weightsFile = Paths.get(weightsPath); + var cfgFile = Paths.get(cfgPath); + var weightsFile = Paths.get(weightsPath); var classNamesFile = Paths.get(classNamesPath); if (!Files.exists(cfgFile) || !Files.exists(weightsFile) || !Files.exists(classNamesFile)) { log.info("āš ļø Object detection disabled: YOLO model files not found"); log.info(" Expected files:"); - log.info(" - " + cfgPath); - log.info(" - " + weightsPath); - log.info(" - " + classNamesPath); + log.info(" - {}", cfgPath); + log.info(" - {}", weightsPath); + log.info(" - {}", classNamesPath); log.info(" Scraper will continue without image analysis."); this.enabled = false; this.net = null; diff --git a/src/main/java/auctiora/QuarkusWorkflowScheduler.java b/src/main/java/auctiora/QuarkusWorkflowScheduler.java index 75b9eb5..f306ee0 100644 --- a/src/main/java/auctiora/QuarkusWorkflowScheduler.java +++ b/src/main/java/auctiora/QuarkusWorkflowScheduler.java @@ -17,272 +17,272 @@ import java.util.List; */ @ApplicationScoped public class QuarkusWorkflowScheduler { - - private static final Logger LOG = Logger.getLogger(QuarkusWorkflowScheduler.class); - - @Inject - DatabaseService db; - - @Inject - NotificationService notifier; - - @Inject - ObjectDetectionService detector; - - @Inject - ImageProcessingService imageProcessor; - - @ConfigProperty(name = "auction.database.path") - String databasePath; - - /** - * Workflow 1: Import Scraper Data - * Cron: Every 30 minutes (0 -/30 - - - ?) - * Purpose: Import new auctions and lots from external scraper - */ - @Scheduled(cron = "{auction.workflow.scraper-import.cron}", identity = "scraper-import") - void importScraperData() { - try { - LOG.info("šŸ“„ [WORKFLOW 1] Importing scraper data..."); - long start = System.currentTimeMillis(); - - // Import auctions - var auctions = db.importAuctionsFromScraper(); - LOG.infof(" → Imported %d auctions", auctions.size()); - - // Import lots - var lots = db.importLotsFromScraper(); - LOG.infof(" → Imported %d lots", lots.size()); - - // Import image URLs - var images = db.getUnprocessedImagesFromScraper(); - LOG.infof(" → Found %d unprocessed images", images.size()); - - long duration = System.currentTimeMillis() - start; - LOG.infof(" āœ“ Scraper import completed in %dms", duration); - - // Trigger notification if significant data imported - if (auctions.size() > 0 || lots.size() > 10) { - notifier.sendNotification( + + private static final Logger LOG = Logger.getLogger(QuarkusWorkflowScheduler.class); + + @Inject + DatabaseService db; + + @Inject + NotificationService notifier; + + @Inject + ObjectDetectionService detector; + + @Inject + ImageProcessingService imageProcessor; + + @ConfigProperty(name = "auction.database.path") + String databasePath; + + /** + * Workflow 1: Import Scraper Data + * Cron: Every 30 minutes (0 -/30 - - - ?) + * Purpose: Import new auctions and lots from external scraper + */ + @Scheduled(cron = "{auction.workflow.scraper-import.cron}", identity = "scraper-import") + void importScraperData() { + try { + LOG.info("šŸ“„ [WORKFLOW 1] Importing scraper data..."); + var start = System.currentTimeMillis(); + + // Import auctions + var auctions = db.importAuctionsFromScraper(); + LOG.infof(" → Imported %d auctions", auctions.size()); + + // Import lots + var lots = db.importLotsFromScraper(); + LOG.infof(" → Imported %d lots", lots.size()); + + // Import image URLs + var images = db.getUnprocessedImagesFromScraper(); + LOG.infof(" → Found %d unprocessed images", images.size()); + + var duration = System.currentTimeMillis() - start; + LOG.infof(" āœ“ Scraper import completed in %dms", duration); + + // Trigger notification if significant data imported + if (auctions.size() > 0 || lots.size() > 10) { + notifier.sendNotification( String.format("Imported %d auctions, %d lots", auctions.size(), lots.size()), "Data Import Complete", 0 - ); + ); + } + + } catch (Exception e) { + LOG.errorf(e, " āŒ Scraper import failed: %s", e.getMessage()); + } + } + + /** + * Workflow 2: Process Pending Images + * Cron: Every 1 hour (0 0 * * * ?) + * Purpose: Download images and run object detection + */ + @Scheduled(cron = "{auction.workflow.image-processing.cron}", identity = "image-processing") + void processImages() { + try { + LOG.info("šŸ–¼ļø [WORKFLOW 2] Processing pending images..."); + var start = System.currentTimeMillis(); + + // Get unprocessed images + var unprocessedImages = db.getUnprocessedImagesFromScraper(); + + if (unprocessedImages.isEmpty()) { + LOG.info(" → No pending images to process"); + return; + } + + LOG.infof(" → Processing %d images", unprocessedImages.size()); + + var processed = 0; + var detected = 0; + + for (var imageRecord : unprocessedImages) { + try { + // Download image + var filePath = imageProcessor.downloadImage( + imageRecord.url(), + imageRecord.saleId(), + imageRecord.lotId() + ); + + if (filePath != null) { + // Run object detection + var labels = detector.detectObjects(filePath); + + // Save to database + db.insertImage(imageRecord.lotId(), imageRecord.url(), + filePath, labels); + + processed++; + if (!labels.isEmpty()) { + detected++; + + // Send notification for interesting detections + if (labels.size() >= 3) { + notifier.sendNotification( + String.format("Lot %d: Detected %s", + imageRecord.lotId(), + String.join(", ", labels)), + "Objects Detected", + 0 + ); + } + } + } + + // Rate limiting + Thread.sleep(500); + + } catch (Exception e) { + LOG.warnf(" āš ļø Failed to process image: %s", e.getMessage()); } - - } catch (Exception e) { - LOG.errorf(e, " āŒ Scraper import failed: %s", e.getMessage()); - } - } - - /** - * Workflow 2: Process Pending Images - * Cron: Every 1 hour (0 0 * * * ?) - * Purpose: Download images and run object detection - */ - @Scheduled(cron = "{auction.workflow.image-processing.cron}", identity = "image-processing") - void processImages() { - try { - LOG.info("šŸ–¼ļø [WORKFLOW 2] Processing pending images..."); - long start = System.currentTimeMillis(); - - // Get unprocessed images - var unprocessedImages = db.getUnprocessedImagesFromScraper(); - - if (unprocessedImages.isEmpty()) { - LOG.info(" → No pending images to process"); - return; + } + + var duration = System.currentTimeMillis() - start; + LOG.infof(" āœ“ Processed %d images, detected objects in %d (%.1fs)", + processed, detected, duration / 1000.0); + + } catch (Exception e) { + LOG.errorf(e, " āŒ Image processing failed: %s", e.getMessage()); + } + } + + /** + * Workflow 3: Monitor Bids + * Cron: Every 15 minutes (0 -/15 * * * ?) + * Purpose: Check for bid changes and send notifications + */ + @Scheduled(cron = "{auction.workflow.bid-monitoring.cron}", identity = "bid-monitoring") + void monitorBids() { + try { + LOG.info("šŸ’° [WORKFLOW 3] Monitoring bids..."); + var start = System.currentTimeMillis(); + + var activeLots = db.getActiveLots(); + LOG.infof(" → Checking %d active lots", activeLots.size()); + + // Note: In production, this would call Troostwijk API + // For now, we just track what's in the database + // The external scraper updates bids, we just notify + + var duration = System.currentTimeMillis() - start; + LOG.infof(" āœ“ Bid monitoring completed in %dms", duration); + + } catch (Exception e) { + LOG.errorf(e, " āŒ Bid monitoring failed: %s", e.getMessage()); + } + } + + /** + * Workflow 4: Check Closing Times + * Cron: Every 5 minutes (0 -/5 * * * ?) + * Purpose: Send alerts for lots closing soon + */ + @Scheduled(cron = "{auction.workflow.closing-alerts.cron}", identity = "closing-alerts") + void checkClosingTimes() { + try { + LOG.info("ā° [WORKFLOW 4] Checking closing times..."); + var start = System.currentTimeMillis(); + + var activeLots = db.getActiveLots(); + var alertsSent = 0; + + for (var lot : activeLots) { + if (lot.closingTime() == null) continue; + + var minutesLeft = lot.minutesUntilClose(); + + // Alert for lots closing in 5 minutes + if (minutesLeft <= 5 && minutesLeft > 0 && !lot.closingNotified()) { + var message = String.format("Kavel %d sluit binnen %d min.", + lot.lotId(), minutesLeft); + + notifier.sendNotification(message, "Lot Closing Soon", 1); + + // Mark as notified + var updated = new Lot( + lot.saleId(), lot.lotId(), lot.title(), lot.description(), + lot.manufacturer(), lot.type(), lot.year(), lot.category(), + lot.currentBid(), lot.currency(), lot.url(), + lot.closingTime(), true + ); + db.updateLotNotificationFlags(updated); + + alertsSent++; } - - LOG.infof(" → Processing %d images", unprocessedImages.size()); - - int processed = 0; - int detected = 0; - - for (var imageRecord : unprocessedImages) { - try { - // Download image - String filePath = imageProcessor.downloadImage( - imageRecord.url(), - imageRecord.saleId(), - imageRecord.lotId() - ); - - if (filePath != null) { - // Run object detection - var labels = detector.detectObjects(filePath); - - // Save to database - db.insertImage(imageRecord.lotId(), imageRecord.url(), - filePath, labels); - - processed++; - if (!labels.isEmpty()) { - detected++; - - // Send notification for interesting detections - if (labels.size() >= 3) { - notifier.sendNotification( - String.format("Lot %d: Detected %s", - imageRecord.lotId(), - String.join(", ", labels)), - "Objects Detected", - 0 - ); - } - } - } - - // Rate limiting - Thread.sleep(500); - - } catch (Exception e) { - LOG.warnf(" āš ļø Failed to process image: %s", e.getMessage()); - } - } - - long duration = System.currentTimeMillis() - start; - LOG.infof(" āœ“ Processed %d images, detected objects in %d (%.1fs)", - processed, detected, duration / 1000.0); - - } catch (Exception e) { - LOG.errorf(e, " āŒ Image processing failed: %s", e.getMessage()); - } - } - - /** - * Workflow 3: Monitor Bids - * Cron: Every 15 minutes (0 -/15 * * * ?) - * Purpose: Check for bid changes and send notifications - */ - @Scheduled(cron = "{auction.workflow.bid-monitoring.cron}", identity = "bid-monitoring") - void monitorBids() { - try { - LOG.info("šŸ’° [WORKFLOW 3] Monitoring bids..."); - long start = System.currentTimeMillis(); - - var activeLots = db.getActiveLots(); - LOG.infof(" → Checking %d active lots", activeLots.size()); - - // Note: In production, this would call Troostwijk API - // For now, we just track what's in the database - // The external scraper updates bids, we just notify - - long duration = System.currentTimeMillis() - start; - LOG.infof(" āœ“ Bid monitoring completed in %dms", duration); - - } catch (Exception e) { - LOG.errorf(e, " āŒ Bid monitoring failed: %s", e.getMessage()); - } - } - - /** - * Workflow 4: Check Closing Times - * Cron: Every 5 minutes (0 -/5 * * * ?) - * Purpose: Send alerts for lots closing soon - */ - @Scheduled(cron = "{auction.workflow.closing-alerts.cron}", identity = "closing-alerts") - void checkClosingTimes() { - try { - LOG.info("ā° [WORKFLOW 4] Checking closing times..."); - long start = System.currentTimeMillis(); - - var activeLots = db.getActiveLots(); - int alertsSent = 0; - - for (var lot : activeLots) { - if (lot.closingTime() == null) continue; - - long minutesLeft = lot.minutesUntilClose(); - - // Alert for lots closing in 5 minutes - if (minutesLeft <= 5 && minutesLeft > 0 && !lot.closingNotified()) { - String message = String.format("Kavel %d sluit binnen %d min.", - lot.lotId(), minutesLeft); - - notifier.sendNotification(message, "Lot Closing Soon", 1); - - // Mark as notified - var updated = new Lot( - lot.saleId(), lot.lotId(), lot.title(), lot.description(), - lot.manufacturer(), lot.type(), lot.year(), lot.category(), - lot.currentBid(), lot.currency(), lot.url(), - lot.closingTime(), true - ); - db.updateLotNotificationFlags(updated); - - alertsSent++; - } - } - - long duration = System.currentTimeMillis() - start; - LOG.infof(" → Sent %d closing alerts in %dms", alertsSent, duration); - - } catch (Exception e) { - LOG.errorf(e, " āŒ Closing alerts failed: %s", e.getMessage()); - } - } - - /** - * Event-driven trigger: New auction discovered - */ - public void onNewAuctionDiscovered(AuctionInfo auction) { - LOG.infof("šŸ“£ EVENT: New auction discovered - %s", auction.title()); - - try { - db.upsertAuction(auction); - + } + + var duration = System.currentTimeMillis() - start; + LOG.infof(" → Sent %d closing alerts in %dms", alertsSent, duration); + + } catch (Exception e) { + LOG.errorf(e, " āŒ Closing alerts failed: %s", e.getMessage()); + } + } + + /** + * Event-driven trigger: New auction discovered + */ + public void onNewAuctionDiscovered(AuctionInfo auction) { + LOG.infof("šŸ“£ EVENT: New auction discovered - %s", auction.title()); + + try { + db.upsertAuction(auction); + + notifier.sendNotification( + String.format("New auction: %s\nLocation: %s\nLots: %d", + auction.title(), auction.location(), auction.lotCount()), + "New Auction Discovered", + 0 + ); + + } catch (Exception e) { + LOG.errorf(e, " āŒ Failed to handle new auction: %s", e.getMessage()); + } + } + + /** + * Event-driven trigger: Bid change detected + */ + public void onBidChange(Lot lot, double previousBid, double newBid) { + LOG.infof("šŸ“£ EVENT: Bid change on lot %d (€%.2f → €%.2f)", + lot.lotId(), previousBid, newBid); + + try { + db.updateLotCurrentBid(lot); + + notifier.sendNotification( + String.format("Nieuw bod op kavel %d: €%.2f (was €%.2f)", + lot.lotId(), newBid, previousBid), + "Kavel Bieding Update", + 0 + ); + + } catch (Exception e) { + LOG.errorf(e, " āŒ Failed to handle bid change: %s", e.getMessage()); + } + } + + /** + * Event-driven trigger: Objects detected in image + */ + public void onObjectsDetected(int lotId, List labels) { + LOG.infof("šŸ“£ EVENT: Objects detected in lot %d - %s", + lotId, String.join(", ", labels)); + + try { + if (labels.size() >= 2) { notifier.sendNotification( - String.format("New auction: %s\nLocation: %s\nLots: %d", - auction.title(), auction.location(), auction.lotCount()), - "New Auction Discovered", - 0 - ); - - } catch (Exception e) { - LOG.errorf(e, " āŒ Failed to handle new auction: %s", e.getMessage()); - } - } - - /** - * Event-driven trigger: Bid change detected - */ - public void onBidChange(Lot lot, double previousBid, double newBid) { - LOG.infof("šŸ“£ EVENT: Bid change on lot %d (€%.2f → €%.2f)", - lot.lotId(), previousBid, newBid); - - try { - db.updateLotCurrentBid(lot); - - notifier.sendNotification( - String.format("Nieuw bod op kavel %d: €%.2f (was €%.2f)", - lot.lotId(), newBid, previousBid), - "Kavel Bieding Update", - 0 - ); - - } catch (Exception e) { - LOG.errorf(e, " āŒ Failed to handle bid change: %s", e.getMessage()); - } - } - - /** - * Event-driven trigger: Objects detected in image - */ - public void onObjectsDetected(int lotId, List labels) { - LOG.infof("šŸ“£ EVENT: Objects detected in lot %d - %s", - lotId, String.join(", ", labels)); - - try { - if (labels.size() >= 2) { - notifier.sendNotification( String.format("Lot %d contains: %s", lotId, String.join(", ", labels)), "Objects Detected", 0 - ); - } - } catch (Exception e) { - LOG.errorf(e, " āŒ Failed to send detection notification: %s", e.getMessage()); - } - } + ); + } + } catch (Exception e) { + LOG.errorf(e, " āŒ Failed to send detection notification: %s", e.getMessage()); + } + } } diff --git a/src/main/java/auctiora/RateLimitedHttpClient.java b/src/main/java/auctiora/RateLimitedHttpClient.java index 8aebcee..3f4d914 100644 --- a/src/main/java/auctiora/RateLimitedHttpClient.java +++ b/src/main/java/auctiora/RateLimitedHttpClient.java @@ -18,253 +18,255 @@ import java.util.concurrent.atomic.AtomicLong; /** * Rate-limited HTTP client that enforces per-host request limits. - * + * * Features: * - Per-host rate limiting (configurable max requests per second) * - Request counting and monitoring * - Thread-safe using semaphores * - Automatic host extraction from URLs - * + * * This prevents overloading external services like Troostwijk and getting blocked. */ @ApplicationScoped public class RateLimitedHttpClient { - - private static final Logger LOG = Logger.getLogger(RateLimitedHttpClient.class); - - private final HttpClient httpClient; - private final Map rateLimiters; - private final Map requestStats; - - @ConfigProperty(name = "auction.http.rate-limit.default-max-rps", defaultValue = "2") - int defaultMaxRequestsPerSecond; - - @ConfigProperty(name = "auction.http.rate-limit.troostwijk-max-rps", defaultValue = "1") - int troostwijkMaxRequestsPerSecond; - - @ConfigProperty(name = "auction.http.timeout-seconds", defaultValue = "30") - int timeoutSeconds; - - public RateLimitedHttpClient() { - this.httpClient = HttpClient.newBuilder() - .connectTimeout(Duration.ofSeconds(30)) - .build(); - this.rateLimiters = new ConcurrentHashMap<>(); - this.requestStats = new ConcurrentHashMap<>(); - } - - /** - * Sends a GET request with automatic rate limiting based on host. - */ - public HttpResponse sendGet(String url) throws IOException, InterruptedException { - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(url)) - .timeout(Duration.ofSeconds(timeoutSeconds)) - .GET() - .build(); - - return send(request, HttpResponse.BodyHandlers.ofString()); - } - - /** - * Sends a request for binary data (like images) with rate limiting. - */ - public HttpResponse sendGetBytes(String url) throws IOException, InterruptedException { - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(url)) - .timeout(Duration.ofSeconds(timeoutSeconds)) - .GET() - .build(); - - return send(request, HttpResponse.BodyHandlers.ofByteArray()); - } - - /** - * Sends any HTTP request with automatic rate limiting. - */ - public HttpResponse send(HttpRequest request, HttpResponse.BodyHandler bodyHandler) - throws IOException, InterruptedException { - - String host = extractHost(request.uri()); - RateLimiter limiter = getRateLimiter(host); - RequestStats stats = getRequestStats(host); - - // Enforce rate limit (blocks if necessary) - limiter.acquire(); - - // Track request - stats.incrementTotal(); - long startTime = System.currentTimeMillis(); - - try { - HttpResponse response = httpClient.send(request, bodyHandler); - - long duration = System.currentTimeMillis() - startTime; - stats.recordSuccess(duration); - - LOG.debugf("HTTP %d %s %s (%dms)", - response.statusCode(), request.method(), host, duration); - - // Track rate limit violations (429 = Too Many Requests) - if (response.statusCode() == 429) { - stats.incrementRateLimited(); - LOG.warnf("āš ļø Rate limited by %s (HTTP 429)", host); + + private static final Logger LOG = Logger.getLogger(RateLimitedHttpClient.class); + + private final HttpClient httpClient; + private final Map rateLimiters; + private final Map requestStats; + + @ConfigProperty(name = "auction.http.rate-limit.default-max-rps", defaultValue = "2") + int defaultMaxRequestsPerSecond; + + @ConfigProperty(name = "auction.http.rate-limit.troostwijk-max-rps", defaultValue = "1") + int troostwijkMaxRequestsPerSecond; + + @ConfigProperty(name = "auction.http.timeout-seconds", defaultValue = "30") + int timeoutSeconds; + + public RateLimitedHttpClient() { + this.httpClient = HttpClient.newBuilder() + .connectTimeout(Duration.ofSeconds(30)) + .build(); + this.rateLimiters = new ConcurrentHashMap<>(); + this.requestStats = new ConcurrentHashMap<>(); + } + + /** + * Sends a GET request with automatic rate limiting based on host. + */ + public HttpResponse sendGet(String url) throws IOException, InterruptedException { + var request = HttpRequest.newBuilder() + .uri(URI.create(url)) + .timeout(Duration.ofSeconds(timeoutSeconds)) + .GET() + .build(); + + return send(request, HttpResponse.BodyHandlers.ofString()); + } + + /** + * Sends a request for binary data (like images) with rate limiting. + */ + public HttpResponse sendGetBytes(String url) throws IOException, InterruptedException { + var request = HttpRequest.newBuilder() + .uri(URI.create(url)) + .timeout(Duration.ofSeconds(timeoutSeconds)) + .GET() + .build(); + + return send(request, HttpResponse.BodyHandlers.ofByteArray()); + } + + /** + * Sends any HTTP request with automatic rate limiting. + */ + public HttpResponse send(HttpRequest request, HttpResponse.BodyHandler bodyHandler) + throws IOException, InterruptedException { + + var host = extractHost(request.uri()); + var limiter = getRateLimiter(host); + var stats = getRequestStats(host); + + // Enforce rate limit (blocks if necessary) + limiter.acquire(); + + // Track request + stats.incrementTotal(); + var startTime = System.currentTimeMillis(); + + try { + var response = httpClient.send(request, bodyHandler); + + var duration = System.currentTimeMillis() - startTime; + stats.recordSuccess(duration); + + LOG.debugf("HTTP %d %s %s (%dms)", + response.statusCode(), request.method(), host, duration); + + // Track rate limit violations (429 = Too Many Requests) + if (response.statusCode() == 429) { + stats.incrementRateLimited(); + LOG.warnf("āš ļø Rate limited by %s (HTTP 429)", host); + } + + return response; + + } catch (IOException | InterruptedException e) { + stats.incrementFailed(); + LOG.warnf("āŒ HTTP request failed for %s: %s", host, e.getMessage()); + throw e; + } + } + + /** + * Gets or creates a rate limiter for a specific host. + */ + private RateLimiter getRateLimiter(String host) { + return rateLimiters.computeIfAbsent(host, h -> { + var maxRps = getMaxRequestsPerSecond(h); + LOG.infof("Initializing rate limiter for %s: %d req/s", h, maxRps); + return new RateLimiter(maxRps); + }); + } + + /** + * Gets or creates request stats for a specific host. + */ + private RequestStats getRequestStats(String host) { + return requestStats.computeIfAbsent(host, h -> new RequestStats(h)); + } + + /** + * Determines max requests per second for a given host. + */ + private int getMaxRequestsPerSecond(String host) { + if (host.contains("troostwijk")) { + return troostwijkMaxRequestsPerSecond; + } + return defaultMaxRequestsPerSecond; + } + + /** + * Extracts host from URI (e.g., "api.troostwijkauctions.com"). + */ + private String extractHost(URI uri) { + return uri.getHost() != null ? uri.getHost() : uri.toString(); + } + + /** + * Gets statistics for all hosts. + */ + public Map getAllStats() { + return Map.copyOf(requestStats); + } + + /** + * Gets statistics for a specific host. + */ + public RequestStats getStats(String host) { + return requestStats.get(host); + } + + /** + * Rate limiter implementation using token bucket algorithm. + * Allows burst traffic up to maxRequestsPerSecond, then enforces steady rate. + */ + private static class RateLimiter { + + private final Semaphore semaphore; + private final int maxRequestsPerSecond; + private final long intervalNanos; + + RateLimiter(int maxRequestsPerSecond) { + this.maxRequestsPerSecond = maxRequestsPerSecond; + this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / maxRequestsPerSecond; + this.semaphore = new Semaphore(maxRequestsPerSecond); + + // Refill tokens periodically + startRefillThread(); + } + + void acquire() throws InterruptedException { + semaphore.acquire(); + + // Enforce minimum delay between requests + var delayMillis = intervalNanos / 1_000_000; + if (delayMillis > 0) { + Thread.sleep(delayMillis); + } + } + + private void startRefillThread() { + var refillThread = new Thread(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(1000); // Refill every second + var toRelease = maxRequestsPerSecond - semaphore.availablePermits(); + if (toRelease > 0) { + semaphore.release(toRelease); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } } - - return response; - - } catch (IOException | InterruptedException e) { - stats.incrementFailed(); - LOG.warnf("āŒ HTTP request failed for %s: %s", host, e.getMessage()); - throw e; - } - } - - /** - * Gets or creates a rate limiter for a specific host. - */ - private RateLimiter getRateLimiter(String host) { - return rateLimiters.computeIfAbsent(host, h -> { - int maxRps = getMaxRequestsPerSecond(h); - LOG.infof("Initializing rate limiter for %s: %d req/s", h, maxRps); - return new RateLimiter(maxRps); - }); - } - - /** - * Gets or creates request stats for a specific host. - */ - private RequestStats getRequestStats(String host) { - return requestStats.computeIfAbsent(host, h -> new RequestStats(h)); - } - - /** - * Determines max requests per second for a given host. - */ - private int getMaxRequestsPerSecond(String host) { - if (host.contains("troostwijk")) { - return troostwijkMaxRequestsPerSecond; - } - return defaultMaxRequestsPerSecond; - } - - /** - * Extracts host from URI (e.g., "api.troostwijkauctions.com"). - */ - private String extractHost(URI uri) { - return uri.getHost() != null ? uri.getHost() : uri.toString(); - } - - /** - * Gets statistics for all hosts. - */ - public Map getAllStats() { - return Map.copyOf(requestStats); - } - - /** - * Gets statistics for a specific host. - */ - public RequestStats getStats(String host) { - return requestStats.get(host); - } - - /** - * Rate limiter implementation using token bucket algorithm. - * Allows burst traffic up to maxRequestsPerSecond, then enforces steady rate. - */ - private static class RateLimiter { - private final Semaphore semaphore; - private final int maxRequestsPerSecond; - private final long intervalNanos; - - RateLimiter(int maxRequestsPerSecond) { - this.maxRequestsPerSecond = maxRequestsPerSecond; - this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / maxRequestsPerSecond; - this.semaphore = new Semaphore(maxRequestsPerSecond); - - // Refill tokens periodically - startRefillThread(); - } - - void acquire() throws InterruptedException { - semaphore.acquire(); - - // Enforce minimum delay between requests - long delayMillis = intervalNanos / 1_000_000; - if (delayMillis > 0) { - Thread.sleep(delayMillis); - } - } - - private void startRefillThread() { - Thread refillThread = new Thread(() -> { - while (!Thread.currentThread().isInterrupted()) { - try { - Thread.sleep(1000); // Refill every second - int toRelease = maxRequestsPerSecond - semaphore.availablePermits(); - if (toRelease > 0) { - semaphore.release(toRelease); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - } - }, "RateLimiter-Refill"); - refillThread.setDaemon(true); - refillThread.start(); - } - } - - /** - * Statistics tracker for HTTP requests per host. - */ - public static class RequestStats { - private final String host; - private final AtomicLong totalRequests = new AtomicLong(0); - private final AtomicLong successfulRequests = new AtomicLong(0); - private final AtomicLong failedRequests = new AtomicLong(0); - private final AtomicLong rateLimitedRequests = new AtomicLong(0); - private final AtomicLong totalDurationMs = new AtomicLong(0); - - RequestStats(String host) { - this.host = host; - } - - void incrementTotal() { - totalRequests.incrementAndGet(); - } - - void recordSuccess(long durationMs) { - successfulRequests.incrementAndGet(); - totalDurationMs.addAndGet(durationMs); - } - - void incrementFailed() { - failedRequests.incrementAndGet(); - } - - void incrementRateLimited() { - rateLimitedRequests.incrementAndGet(); - } - - // Getters - public String getHost() { return host; } - public long getTotalRequests() { return totalRequests.get(); } - public long getSuccessfulRequests() { return successfulRequests.get(); } - public long getFailedRequests() { return failedRequests.get(); } - public long getRateLimitedRequests() { return rateLimitedRequests.get(); } - public long getAverageDurationMs() { - long successful = successfulRequests.get(); - return successful > 0 ? totalDurationMs.get() / successful : 0; - } - - @Override - public String toString() { - return String.format("%s: %d total, %d success, %d failed, %d rate-limited, avg %dms", - host, getTotalRequests(), getSuccessfulRequests(), - getFailedRequests(), getRateLimitedRequests(), getAverageDurationMs()); - } - } + }, "RateLimiter-Refill"); + refillThread.setDaemon(true); + refillThread.start(); + } + } + + /** + * Statistics tracker for HTTP requests per host. + */ + public static class RequestStats { + + private final String host; + private final AtomicLong totalRequests = new AtomicLong(0); + private final AtomicLong successfulRequests = new AtomicLong(0); + private final AtomicLong failedRequests = new AtomicLong(0); + private final AtomicLong rateLimitedRequests = new AtomicLong(0); + private final AtomicLong totalDurationMs = new AtomicLong(0); + + RequestStats(String host) { + this.host = host; + } + + void incrementTotal() { + totalRequests.incrementAndGet(); + } + + void recordSuccess(long durationMs) { + successfulRequests.incrementAndGet(); + totalDurationMs.addAndGet(durationMs); + } + + void incrementFailed() { + failedRequests.incrementAndGet(); + } + + void incrementRateLimited() { + rateLimitedRequests.incrementAndGet(); + } + + // Getters + public String getHost() { return host; } + public long getTotalRequests() { return totalRequests.get(); } + public long getSuccessfulRequests() { return successfulRequests.get(); } + public long getFailedRequests() { return failedRequests.get(); } + public long getRateLimitedRequests() { return rateLimitedRequests.get(); } + public long getAverageDurationMs() { + var successful = successfulRequests.get(); + return successful > 0 ? totalDurationMs.get() / successful : 0; + } + + @Override + public String toString() { + return String.format("%s: %d total, %d success, %d failed, %d rate-limited, avg %dms", + host, getTotalRequests(), getSuccessfulRequests(), + getFailedRequests(), getRateLimitedRequests(), getAverageDurationMs()); + } + } } diff --git a/src/main/java/auctiora/ScraperDataAdapter.java b/src/main/java/auctiora/ScraperDataAdapter.java index 19ee4d3..39cd094 100644 --- a/src/main/java/auctiora/ScraperDataAdapter.java +++ b/src/main/java/auctiora/ScraperDataAdapter.java @@ -19,19 +19,19 @@ public class ScraperDataAdapter { static AuctionInfo fromScraperAuction(ResultSet rs) throws SQLException { // Parse "A7-39813" → auctionId=39813, type="A7" - String auctionIdStr = rs.getString("auction_id"); - int auctionId = extractNumericId(auctionIdStr); - String type = extractTypePrefix(auctionIdStr); + var auctionIdStr = rs.getString("auction_id"); + var auctionId = extractNumericId(auctionIdStr); + var type = extractTypePrefix(auctionIdStr); // Split "Cluj-Napoca, RO" → city="Cluj-Napoca", country="RO" - String location = rs.getString("location"); - String[] locationParts = parseLocation(location); - String city = locationParts[0]; - String country = locationParts[1]; + var location = rs.getString("location"); + var locationParts = parseLocation(location); + var city = locationParts[0]; + var country = locationParts[1]; // Map field names - int lotCount = getIntOrDefault(rs, "lots_count", 0); - LocalDateTime closingTime = parseTimestamp(getStringOrNull(rs, "first_lot_closing_time")); + var lotCount = getIntOrDefault(rs, "lots_count", 0); + var closingTime = parseTimestamp(getStringOrNull(rs, "first_lot_closing_time")); return new AuctionInfo( auctionId, diff --git a/src/main/java/auctiora/WorkflowOrchestrator.java b/src/main/java/auctiora/WorkflowOrchestrator.java index f72735e..ebd266f 100644 --- a/src/main/java/auctiora/WorkflowOrchestrator.java +++ b/src/main/java/auctiora/WorkflowOrchestrator.java @@ -94,18 +94,18 @@ public class WorkflowOrchestrator { // Import auctions var auctions = db.importAuctionsFromScraper(); - log.info(" → Imported " + auctions.size() + " auctions"); + log.info(" → Imported {} auctions", auctions.size()); // Import lots var lots = db.importLotsFromScraper(); - log.info(" → Imported " + lots.size() + " lots"); + log.info(" → Imported {} lots", lots.size()); // Import image URLs var images = db.getUnprocessedImagesFromScraper(); - log.info(" → Found " + images.size() + " unprocessed images"); + log.info(" → Found {} unprocessed images", images.size()); long duration = System.currentTimeMillis() - start; - log.info(" āœ“ Scraper import completed in " + duration + "ms\n"); + log.info(" āœ“ Scraper import completed in {}ms\n", duration); // Trigger notification if significant data imported if (auctions.size() > 0 || lots.size() > 10) { @@ -117,7 +117,7 @@ public class WorkflowOrchestrator { } } catch (Exception e) { - log.info(" āŒ Scraper import failed: " + e.getMessage()); + log.info(" āŒ Scraper import failed: {}", e.getMessage()); } }, 0, 30, TimeUnit.MINUTES); @@ -143,7 +143,7 @@ public class WorkflowOrchestrator { return; } - log.info(" → Processing " + unprocessedImages.size() + " images"); + log.info(" → Processing {} images", unprocessedImages.size()); int processed = 0; int detected = 0; @@ -186,7 +186,7 @@ public class WorkflowOrchestrator { Thread.sleep(500); } catch (Exception e) { - log.info(" āš ļø Failed to process image: " + e.getMessage()); + log.info(" ⚠\uFE0F Failed to process image: {}", e.getMessage()); } } @@ -195,7 +195,7 @@ public class WorkflowOrchestrator { processed, detected, duration / 1000.0)); } catch (Exception e) { - log.info(" āŒ Image processing failed: " + e.getMessage()); + log.info(" āŒ Image processing failed: {}", e.getMessage()); } }, 5, 60, TimeUnit.MINUTES); @@ -214,7 +214,7 @@ public class WorkflowOrchestrator { long start = System.currentTimeMillis(); var activeLots = db.getActiveLots(); - log.info(" → Checking " + activeLots.size() + " active lots"); + log.info(" → Checking {} active lots", activeLots.size()); int bidChanges = 0; @@ -228,7 +228,7 @@ public class WorkflowOrchestrator { log.info(String.format(" āœ“ Bid monitoring completed in %dms\n", duration)); } catch (Exception e) { - log.info(" āŒ Bid monitoring failed: " + e.getMessage()); + log.info(" āŒ Bid monitoring failed: {}", e.getMessage()); } }, 2, 15, TimeUnit.MINUTES); @@ -279,7 +279,7 @@ public class WorkflowOrchestrator { alertsSent, duration)); } catch (Exception e) { - log.info(" āŒ Closing alerts failed: " + e.getMessage()); + log.info(" āŒ Closing alerts failed: {}", e.getMessage()); } }, 1, 5, TimeUnit.MINUTES); @@ -298,7 +298,7 @@ public class WorkflowOrchestrator { log.info("[1/4] Importing scraper data..."); var auctions = db.importAuctionsFromScraper(); var lots = db.importLotsFromScraper(); - log.info(" āœ“ Imported " + auctions.size() + " auctions, " + lots.size() + " lots"); + log.info(" āœ“ Imported {} auctions, {} lots", auctions.size(), lots.size()); // Step 2: Process images log.info("[2/4] Processing pending images..."); @@ -308,7 +308,7 @@ public class WorkflowOrchestrator { // Step 3: Check bids log.info("[3/4] Monitoring bids..."); var activeLots = db.getActiveLots(); - log.info(" āœ“ Monitored " + activeLots.size() + " lots"); + log.info(" āœ“ Monitored {} lots", activeLots.size()); // Step 4: Check closing times log.info("[4/4] Checking closing times..."); @@ -318,12 +318,12 @@ public class WorkflowOrchestrator { closingSoon++; } } - log.info(" āœ“ Found " + closingSoon + " lots closing soon"); + log.info(" āœ“ Found {} lots closing soon", closingSoon); log.info("\nāœ“ Complete workflow finished successfully\n"); } catch (Exception e) { - log.info("\nāŒ Workflow failed: " + e.getMessage() + "\n"); + log.info("\nāŒ Workflow failed: {}\n", e.getMessage()); } } @@ -331,7 +331,7 @@ public class WorkflowOrchestrator { * Event-driven trigger: New auction discovered */ public void onNewAuctionDiscovered(AuctionInfo auction) { - log.info("šŸ“£ EVENT: New auction discovered - " + auction.title()); + log.info("\uD83D\uDCE3 EVENT: New auction discovered - {}", auction.title()); try { db.upsertAuction(auction); @@ -344,7 +344,7 @@ public class WorkflowOrchestrator { ); } catch (Exception e) { - log.info(" āŒ Failed to handle new auction: " + e.getMessage()); + log.info(" āŒ Failed to handle new auction: {}", e.getMessage()); } } @@ -366,7 +366,7 @@ public class WorkflowOrchestrator { ); } catch (Exception e) { - log.info(" āŒ Failed to handle bid change: " + e.getMessage()); + log.info(" āŒ Failed to handle bid change: {}", e.getMessage()); } } @@ -386,7 +386,7 @@ public class WorkflowOrchestrator { ); } } catch (Exception e) { - log.info(" āŒ Failed to send detection notification: " + e.getMessage()); + log.info(" āŒ Failed to send detection notification: {}", e.getMessage()); } } @@ -395,16 +395,16 @@ public class WorkflowOrchestrator { */ public void printStatus() { log.info("\nšŸ“Š Workflow Status:"); - log.info(" Running: " + (isRunning ? "Yes" : "No")); + log.info(" Running: {}", isRunning ? "Yes" : "No"); try { var auctions = db.getAllAuctions(); var lots = db.getAllLots(); int images = db.getImageCount(); - log.info(" Auctions: " + auctions.size()); - log.info(" Lots: " + lots.size()); - log.info(" Images: " + images); + log.info(" Auctions: {}", auctions.size()); + log.info(" Lots: {}", lots.size()); + log.info(" Images: {}", images); // Count closing soon int closingSoon = 0; @@ -413,10 +413,10 @@ public class WorkflowOrchestrator { closingSoon++; } } - log.info(" Closing soon (< 30 min): " + closingSoon); + log.info(" Closing soon (< 30 min): {}", closingSoon); } catch (Exception e) { - log.info(" āš ļø Could not retrieve status: " + e.getMessage()); + log.info(" ⚠\uFE0F Could not retrieve status: {}", e.getMessage()); } IO.println(); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 4a5d553..19ca05c 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -33,7 +33,6 @@ quarkus.log.console.level=INFO # JVM Arguments for native access (Jansi, OpenCV, etc.) quarkus.native.additional-build-args=--enable-native-access=ALL-UNNAMED -quarkus.jvm.args=--enable-native-access=ALL-UNNAMED # Production optimizations %prod.quarkus.package.type=fast-jar