Files
auctiora/src/main/java/com/auction/WorkflowOrchestrator.java
2025-12-03 17:17:49 +01:00

443 lines
16 KiB
Java

package com.auction;
import java.io.IOException;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Orchestrates the complete workflow of auction monitoring, image processing,
* object detection, and notifications.
*
* This class coordinates all services and provides scheduled execution,
* event-driven triggers, and manual workflow execution.
*/
public class WorkflowOrchestrator {
private final TroostwijkMonitor monitor;
private final DatabaseService db;
private final ImageProcessingService imageProcessor;
private final NotificationService notifier;
private final ObjectDetectionService detector;
private final ScheduledExecutorService scheduler;
private boolean isRunning = false;
/**
* Creates a workflow orchestrator with all necessary services.
*/
public WorkflowOrchestrator(String databasePath, String notificationConfig,
String yoloCfg, String yoloWeights, String yoloClasses)
throws SQLException, IOException {
Console.println("🔧 Initializing Workflow Orchestrator...");
// Initialize core services
this.db = new DatabaseService(databasePath);
this.db.ensureSchema();
this.notifier = new NotificationService(notificationConfig, "");
this.detector = new ObjectDetectionService(yoloCfg, yoloWeights, yoloClasses);
this.imageProcessor = new ImageProcessingService(db, detector);
this.monitor = new TroostwijkMonitor(databasePath, notificationConfig,
yoloCfg, yoloWeights, yoloClasses);
this.scheduler = Executors.newScheduledThreadPool(3);
Console.println("✓ Workflow Orchestrator initialized");
}
/**
* Starts all scheduled workflows.
* This is the main entry point for automated operation.
*/
public void startScheduledWorkflows() {
if (isRunning) {
Console.println("⚠️ Workflows already running");
return;
}
Console.println("\n🚀 Starting Scheduled Workflows...\n");
// Workflow 1: Import scraper data (every 30 minutes)
scheduleScraperDataImport();
// Workflow 2: Process pending images (every 1 hour)
scheduleImageProcessing();
// Workflow 3: Monitor bids (every 15 minutes)
scheduleBidMonitoring();
// Workflow 4: Check closing times (every 5 minutes)
scheduleClosingAlerts();
isRunning = true;
Console.println("✓ All scheduled workflows started\n");
}
/**
* Workflow 1: Import Scraper Data
* Frequency: Every 30 minutes
* Purpose: Import new auctions and lots from external scraper
*/
private void scheduleScraperDataImport() {
scheduler.scheduleAtFixedRate(() -> {
try {
Console.println("📥 [WORKFLOW 1] Importing scraper data...");
long start = System.currentTimeMillis();
// Import auctions
var auctions = db.importAuctionsFromScraper();
Console.println(" → Imported " + auctions.size() + " auctions");
// Import lots
var lots = db.importLotsFromScraper();
Console.println(" → Imported " + lots.size() + " lots");
// Import image URLs
var images = db.getUnprocessedImagesFromScraper();
Console.println(" → Found " + images.size() + " unprocessed images");
long duration = System.currentTimeMillis() - start;
Console.println(" ✓ Scraper import completed in " + duration + "ms\n");
// Trigger notification if significant data imported
if (auctions.size() > 0 || lots.size() > 10) {
notifier.sendNotification(
String.format("Imported %d auctions, %d lots", auctions.size(), lots.size()),
"Data Import Complete",
0
);
}
} catch (Exception e) {
Console.println(" ❌ Scraper import failed: " + e.getMessage());
}
}, 0, 30, TimeUnit.MINUTES);
Console.println(" ✓ Scheduled: Scraper Data Import (every 30 min)");
}
/**
* Workflow 2: Process Pending Images
* Frequency: Every 1 hour
* Purpose: Download images and run object detection
*/
private void scheduleImageProcessing() {
scheduler.scheduleAtFixedRate(() -> {
try {
Console.println("🖼️ [WORKFLOW 2] Processing pending images...");
long start = System.currentTimeMillis();
// Get unprocessed images
var unprocessedImages = db.getUnprocessedImagesFromScraper();
if (unprocessedImages.isEmpty()) {
Console.println(" → No pending images to process\n");
return;
}
Console.println(" → Processing " + unprocessedImages.size() + " images");
int processed = 0;
int detected = 0;
for (var imageRecord : unprocessedImages) {
try {
// Download image
String filePath = imageProcessor.downloadImage(
imageRecord.url(),
imageRecord.saleId(),
imageRecord.lotId()
);
if (filePath != null) {
// Run object detection
var labels = detector.detectObjects(filePath);
// Save to database
db.insertImage(imageRecord.lotId(), imageRecord.url(),
filePath, labels);
processed++;
if (!labels.isEmpty()) {
detected++;
// Send notification for interesting detections
if (labels.size() >= 3) {
notifier.sendNotification(
String.format("Lot %d: Detected %s",
imageRecord.lotId(),
String.join(", ", labels)),
"Objects Detected",
0
);
}
}
}
// Rate limiting
Thread.sleep(500);
} catch (Exception e) {
Console.println(" ⚠️ Failed to process image: " + e.getMessage());
}
}
long duration = System.currentTimeMillis() - start;
Console.println(String.format(" ✓ Processed %d images, detected objects in %d (%.1fs)\n",
processed, detected, duration / 1000.0));
} catch (Exception e) {
Console.println(" ❌ Image processing failed: " + e.getMessage());
}
}, 5, 60, TimeUnit.MINUTES);
Console.println(" ✓ Scheduled: Image Processing (every 1 hour)");
}
/**
* Workflow 3: Monitor Bids
* Frequency: Every 15 minutes
* Purpose: Check for bid changes and send notifications
*/
private void scheduleBidMonitoring() {
scheduler.scheduleAtFixedRate(() -> {
try {
Console.println("💰 [WORKFLOW 3] Monitoring bids...");
long start = System.currentTimeMillis();
var activeLots = db.getActiveLots();
Console.println(" → Checking " + activeLots.size() + " active lots");
int bidChanges = 0;
for (var lot : activeLots) {
// Note: In production, this would call Troostwijk API
// For now, we just track what's in the database
// The external scraper updates bids, we just notify
}
long duration = System.currentTimeMillis() - start;
Console.println(String.format(" ✓ Bid monitoring completed in %dms\n", duration));
} catch (Exception e) {
Console.println(" ❌ Bid monitoring failed: " + e.getMessage());
}
}, 2, 15, TimeUnit.MINUTES);
Console.println(" ✓ Scheduled: Bid Monitoring (every 15 min)");
}
/**
* Workflow 4: Check Closing Times
* Frequency: Every 5 minutes
* Purpose: Send alerts for lots closing soon
*/
private void scheduleClosingAlerts() {
scheduler.scheduleAtFixedRate(() -> {
try {
Console.println("⏰ [WORKFLOW 4] Checking closing times...");
long start = System.currentTimeMillis();
var activeLots = db.getActiveLots();
int alertsSent = 0;
for (var lot : activeLots) {
if (lot.closingTime() == null) continue;
long minutesLeft = lot.minutesUntilClose();
// Alert for lots closing in 5 minutes
if (minutesLeft <= 5 && minutesLeft > 0 && !lot.closingNotified()) {
String message = String.format("Kavel %d sluit binnen %d min.",
lot.lotId(), minutesLeft);
notifier.sendNotification(message, "Lot Closing Soon", 1);
// Mark as notified
var updated = new Lot(
lot.saleId(), lot.lotId(), lot.title(), lot.description(),
lot.manufacturer(), lot.type(), lot.year(), lot.category(),
lot.currentBid(), lot.currency(), lot.url(),
lot.closingTime(), true
);
db.updateLotNotificationFlags(updated);
alertsSent++;
}
}
long duration = System.currentTimeMillis() - start;
Console.println(String.format(" → Sent %d closing alerts in %dms\n",
alertsSent, duration));
} catch (Exception e) {
Console.println(" ❌ Closing alerts failed: " + e.getMessage());
}
}, 1, 5, TimeUnit.MINUTES);
Console.println(" ✓ Scheduled: Closing Alerts (every 5 min)");
}
/**
* Manual trigger: Run complete workflow once
* Useful for testing or on-demand execution
*/
public void runCompleteWorkflowOnce() {
Console.println("\n🔄 Running Complete Workflow (Manual Trigger)...\n");
try {
// Step 1: Import data
Console.println("[1/4] Importing scraper data...");
var auctions = db.importAuctionsFromScraper();
var lots = db.importLotsFromScraper();
Console.println(" ✓ Imported " + auctions.size() + " auctions, " + lots.size() + " lots");
// Step 2: Process images
Console.println("[2/4] Processing pending images...");
monitor.processPendingImages();
Console.println(" ✓ Image processing completed");
// Step 3: Check bids
Console.println("[3/4] Monitoring bids...");
var activeLots = db.getActiveLots();
Console.println(" ✓ Monitored " + activeLots.size() + " lots");
// Step 4: Check closing times
Console.println("[4/4] Checking closing times...");
int closingSoon = 0;
for (var lot : activeLots) {
if (lot.closingTime() != null && lot.minutesUntilClose() < 30) {
closingSoon++;
}
}
Console.println(" ✓ Found " + closingSoon + " lots closing soon");
Console.println("\n✓ Complete workflow finished successfully\n");
} catch (Exception e) {
Console.println("\n❌ Workflow failed: " + e.getMessage() + "\n");
}
}
/**
* Event-driven trigger: New auction discovered
*/
public void onNewAuctionDiscovered(AuctionInfo auction) {
Console.println("📣 EVENT: New auction discovered - " + auction.title());
try {
db.upsertAuction(auction);
notifier.sendNotification(
String.format("New auction: %s\nLocation: %s\nLots: %d",
auction.title(), auction.location(), auction.lotCount()),
"New Auction Discovered",
0
);
} catch (Exception e) {
Console.println(" ❌ Failed to handle new auction: " + e.getMessage());
}
}
/**
* Event-driven trigger: Bid change detected
*/
public void onBidChange(Lot lot, double previousBid, double newBid) {
Console.println(String.format("📣 EVENT: Bid change on lot %d (€%.2f → €%.2f)",
lot.lotId(), previousBid, newBid));
try {
db.updateLotCurrentBid(lot);
notifier.sendNotification(
String.format("Nieuw bod op kavel %d: €%.2f (was €%.2f)",
lot.lotId(), newBid, previousBid),
"Kavel Bieding Update",
0
);
} catch (Exception e) {
Console.println(" ❌ Failed to handle bid change: " + e.getMessage());
}
}
/**
* Event-driven trigger: Objects detected in image
*/
public void onObjectsDetected(int lotId, List<String> labels) {
Console.println(String.format("📣 EVENT: Objects detected in lot %d - %s",
lotId, String.join(", ", labels)));
try {
if (labels.size() >= 2) {
notifier.sendNotification(
String.format("Lot %d contains: %s", lotId, String.join(", ", labels)),
"Objects Detected",
0
);
}
} catch (Exception e) {
Console.println(" ❌ Failed to send detection notification: " + e.getMessage());
}
}
/**
* Prints current workflow status
*/
public void printStatus() {
Console.println("\n📊 Workflow Status:");
Console.println(" Running: " + (isRunning ? "Yes" : "No"));
try {
var auctions = db.getAllAuctions();
var lots = db.getAllLots();
int images = db.getImageCount();
Console.println(" Auctions: " + auctions.size());
Console.println(" Lots: " + lots.size());
Console.println(" Images: " + images);
// Count closing soon
int closingSoon = 0;
for (var lot : lots) {
if (lot.closingTime() != null && lot.minutesUntilClose() < 30) {
closingSoon++;
}
}
Console.println(" Closing soon (< 30 min): " + closingSoon);
} catch (Exception e) {
Console.println(" ⚠️ Could not retrieve status: " + e.getMessage());
}
Console.println();
}
/**
* Gracefully shuts down all workflows
*/
public void shutdown() {
Console.println("\n🛑 Shutting down workflows...");
isRunning = false;
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
Console.println("✓ Workflows shut down successfully\n");
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}