Former-commit-id: 6091b7180f
This commit is contained in:
Tour
2025-12-06 21:27:19 +01:00
parent 49cb4f94aa
commit 9baaca9013
9 changed files with 551 additions and 204 deletions

View File

@@ -68,10 +68,9 @@ public class AuctionMonitorProducer {
@Singleton
public ImageProcessingService produceImageProcessingService(
DatabaseService db,
ObjectDetectionService detector,
RateLimitedHttpClient httpClient) {
ObjectDetectionService detector) {
LOG.infof("Initializing ImageProcessingService");
return new ImageProcessingService(db, detector, httpClient);
return new ImageProcessingService(db, detector);
}
}

View File

@@ -73,7 +73,8 @@ public class DatabaseService {
FOREIGN KEY (sale_id) REFERENCES auctions(auction_id)
)""");
// Images table (populated by this process)
// Images table (populated by external scraper with URLs and local_path)
// This process only adds labels via object detection
stmt.execute("""
CREATE TABLE IF NOT EXISTS images (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -82,6 +83,7 @@ public class DatabaseService {
local_path TEXT,
labels TEXT,
processed_at INTEGER,
downloaded INTEGER DEFAULT 0,
FOREIGN KEY (lot_id) REFERENCES lots(lot_id)
)""");
@@ -258,6 +260,7 @@ public class DatabaseService {
var hasLabels = false;
var hasLocalPath = false;
var hasProcessedAt = false;
var hasDownloaded = false;
while (rs.next()) {
var colName = rs.getString("name");
@@ -265,6 +268,7 @@ public class DatabaseService {
case "labels" -> hasLabels = true;
case "local_path" -> hasLocalPath = true;
case "processed_at" -> hasProcessedAt = true;
case "downloaded" -> hasDownloaded = true;
}
}
@@ -280,6 +284,10 @@ public class DatabaseService {
log.info("Migrating schema: Adding 'processed_at' column to images table");
stmt.execute("ALTER TABLE images ADD COLUMN processed_at INTEGER");
}
if (!hasDownloaded) {
log.info("Migrating schema: Adding 'downloaded' column to images table");
stmt.execute("ALTER TABLE images ADD COLUMN downloaded INTEGER DEFAULT 0");
}
} catch (SQLException e) {
// Table might not exist yet, which is fine
log.debug("Could not check images table schema: " + e.getMessage());
@@ -462,19 +470,35 @@ public class DatabaseService {
}
/**
* Inserts a new image record with object detection labels
* Updates the labels field for an image after object detection
*/
synchronized void insertImage(long lotId, String url, String filePath, List<String> labels) throws SQLException {
var sql = "INSERT INTO images (lot_id, url, local_path, labels, processed_at) VALUES (?, ?, ?, ?, ?)";
synchronized void updateImageLabels(int imageId, List<String> labels) throws SQLException {
var sql = "UPDATE images SET labels = ?, processed_at = ? WHERE id = ?";
try (var conn = DriverManager.getConnection(this.url); var ps = conn.prepareStatement(sql)) {
ps.setLong(1, lotId);
ps.setString(2, url);
ps.setString(3, filePath);
ps.setString(4, String.join(",", labels));
ps.setLong(5, Instant.now().getEpochSecond());
ps.setString(1, String.join(",", labels));
ps.setLong(2, Instant.now().getEpochSecond());
ps.setInt(3, imageId);
ps.executeUpdate();
}
}
/**
 * Gets the labels stored for a specific image.
 *
 * The {@code labels} column is read as a single string and split on commas,
 * mirroring the comma-joined form written by {@code updateImageLabels}.
 *
 * @param imageId value of the {@code id} column of the row in the images table
 * @return the stored labels as an unmodifiable list; an empty list when no row
 *         matches or when the labels column is NULL or empty
 * @throws SQLException if the connection cannot be opened or the query fails
 */
synchronized List<String> getImageLabels(int imageId) throws SQLException {
    var sql = "SELECT labels FROM images WHERE id = ?";
    try (var conn = DriverManager.getConnection(this.url); var ps = conn.prepareStatement(sql)) {
        ps.setInt(1, imageId);
        // Scope the ResultSet in its own try-with-resources so it is closed
        // explicitly on every path, including the early return below, instead of
        // relying on the enclosing statement's close to release it.
        try (var rs = ps.executeQuery()) {
            if (rs.next()) {
                var labelsStr = rs.getString("labels");
                if (labelsStr != null && !labelsStr.isEmpty()) {
                    return List.of(labelsStr.split(","));
                }
            }
        }
    }
    // No matching row, or the row has no labels recorded.
    return List.of();
}
/**
* Retrieves images for a specific lot
@@ -671,46 +695,34 @@ public class DatabaseService {
}
/**
* Imports image URLs from scraper's schema.
* The scraper populates the images table with URLs but doesn't download them.
* This method retrieves undownloaded images for processing.
* Gets images that have been downloaded by the scraper but need object detection.
* Only returns images that have local_path set but no labels yet.
*
* @return List of image URLs that need to be downloaded
* @return List of images needing object detection
*/
synchronized List<ImageImportRecord> getUnprocessedImagesFromScraper() throws SQLException {
List<ImageImportRecord> images = new ArrayList<>();
synchronized List<ImageDetectionRecord> getImagesNeedingDetection() throws SQLException {
List<ImageDetectionRecord> images = new ArrayList<>();
var sql = """
SELECT i.lot_id, i.url, l.auction_id
SELECT i.id, i.lot_id, i.local_path
FROM images i
LEFT JOIN lots l ON i.lot_id = l.lot_id
WHERE i.downloaded = 0 OR i.local_path IS NULL
WHERE i.local_path IS NOT NULL
AND i.local_path != ''
AND (i.labels IS NULL OR i.labels = '')
""";
try (var conn = DriverManager.getConnection(url); var stmt = conn.createStatement()) {
var rs = stmt.executeQuery(sql);
while (rs.next()) {
var lotIdStr = rs.getString("lot_id");
var auctionIdStr = rs.getString("auction_id");
var lotId = ScraperDataAdapter.extractNumericId(lotIdStr);
var saleId = ScraperDataAdapter.extractNumericId(auctionIdStr);
// Skip images with invalid IDs (0 indicates parsing failed)
if (lotId == 0L || saleId == 0L) {
log.debug("Skipping image with invalid ID: lot_id={}, sale_id={}", lotId, saleId);
continue;
}
images.add(new ImageImportRecord(
lotId,
saleId,
rs.getString("url")
images.add(new ImageDetectionRecord(
rs.getInt("id"),
rs.getLong("lot_id"),
rs.getString("local_path")
));
}
} catch (SQLException e) {
log.info(" No unprocessed images found in scraper format");
log.info(" No images needing detection found");
}
return images;
}
@@ -718,9 +730,9 @@ public class DatabaseService {
* Simple record for image data from database
*/
record ImageRecord(int id, long lotId, String url, String filePath, String labels) { }
/**
* Record for importing images from scraper format
* Record for images that need object detection processing
*/
record ImageImportRecord(long lotId, long saleId, String url) { }
record ImageDetectionRecord(int id, long lotId, String filePath) { }
}

View File

@@ -12,120 +12,78 @@ import java.util.List;
/**
* Service responsible for processing images from the IMAGES table.
* Downloads images, performs object detection, and updates the database.
* Performs object detection on already-downloaded images and updates the database.
*
* This separates image processing concerns from scraping, allowing this project
* to focus on enriching data scraped by the external process.
* NOTE: Image downloading is handled by the external scraper process.
* This service only performs object detection on images that already have local_path set.
*/
@Slf4j
class ImageProcessingService {
private final RateLimitedHttpClient httpClient;
private final DatabaseService db;
private final ObjectDetectionService detector;
ImageProcessingService(DatabaseService db, ObjectDetectionService detector, RateLimitedHttpClient httpClient) {
this.httpClient = httpClient;
ImageProcessingService(DatabaseService db, ObjectDetectionService detector) {
this.db = db;
this.detector = detector;
}
/**
* Downloads an image from the given URL to local storage.
* Images are organized by saleId/lotId for easy management.
*
* @param imageUrl remote image URL
* @param saleId sale identifier
* @param lotId lot identifier
* @return absolute path to saved file or null on failure
*/
String downloadImage(String imageUrl, long saleId, long lotId) {
try {
var response = httpClient.sendGetBytes(imageUrl);
if (response != null && response.statusCode() == 200) {
// Use environment variable for cross-platform compatibility
var imagesPath = System.getenv().getOrDefault("AUCTION_IMAGES_PATH", "/mnt/okcomputer/output/images");
var baseDir = Paths.get(imagesPath);
var dir = baseDir.resolve(String.valueOf(saleId)).resolve(String.valueOf(lotId));
Files.createDirectories(dir);
// Extract filename from URL
var fileName = imageUrl.substring(imageUrl.lastIndexOf('/') + 1);
// Remove query parameters if present
int queryIndex = fileName.indexOf('?');
if (queryIndex > 0) {
fileName = fileName.substring(0, queryIndex);
}
var dest = dir.resolve(fileName);
Files.write(dest, response.body());
return dest.toAbsolutePath().toString();
}
} catch (IOException | InterruptedException e) {
System.err.println("Failed to download image " + imageUrl + ": " + e.getMessage());
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return null;
}
/**
* Processes images for a specific lot: downloads and runs object detection.
* Processes a single image: runs object detection and updates labels in database.
*
* @param lotId lot identifier
* @param saleId sale identifier
* @param imageUrls list of image URLs to process
* @param imageId database ID of the image record
* @param localPath local file path to the downloaded image
* @param lotId lot identifier (for logging)
* @return true if processing succeeded
*/
void processImagesForLot(long lotId, long saleId, List<String> imageUrls) {
log.info(" Processing {} images for lot {}", imageUrls.size(), lotId);
for (var imgUrl : imageUrls) {
var fileName = downloadImage(imgUrl, saleId, lotId);
if (fileName != null) {
// Run object detection
var labels = detector.detectObjects(fileName);
// Save to database
try {
db.insertImage(lotId, imgUrl, fileName, labels);
if (!labels.isEmpty()) {
log.info(" Detected: {}", String.join(", ", labels));
}
} catch (SQLException e) {
System.err.println(" Failed to save image to database: " + e.getMessage());
}
/**
 * Runs object detection on one local image file and stores the resulting labels.
 *
 * Failures are reported through the return value rather than thrown, so a caller
 * iterating over many images can continue with the next one.
 *
 * @param imageId   database ID of the image record whose labels are updated
 * @param localPath path of the image file handed to the detector
 * @param lotId     lot identifier, used only in the log message
 * @return {@code true} if detection and the database update both completed,
 *         {@code false} if either of them threw
 */
boolean processImage(int imageId, String localPath, long lotId) {
    try {
        // Run object detection on the local file
        var labels = detector.detectObjects(localPath);

        // The update is issued even when nothing was detected (labels is empty).
        db.updateImageLabels(imageId, labels);

        if (!labels.isEmpty()) {
            log.info(" Lot {}: Detected {}", lotId, String.join(", ", labels));
        }
        return true;
    } catch (Exception e) {
        // Boundary catch: any failure is converted into a false result. The
        // throwable is passed as the trailing argument so the logger records the
        // full stack trace and cause chain, not just the message text.
        log.error(" Failed to process image {}: {}", imageId, e.getMessage(), e);
        return false;
    }
}
/**
* Batch processes all pending images in the database.
* Useful for processing images after the external scraper has populated lot data.
* Only processes images that have been downloaded by the scraper but haven't had object detection run yet.
*/
void processPendingImages() {
log.info("Processing pending images...");
try {
var lots = db.getAllLots();
log.info("Found {} lots to check for images", lots.size());
for (var lot : lots) {
// Check if images already processed for this lot
var existingImages = db.getImagesForLot(lot.lotId());
if (existingImages.isEmpty()) {
log.info(" Lot {} has no images yet - needs external scraper data", lot.lotId());
var pendingImages = db.getImagesNeedingDetection();
log.info("Found {} images needing object detection", pendingImages.size());
var processed = 0;
var detected = 0;
for (var image : pendingImages) {
if (processImage(image.id(), image.filePath(), image.lotId())) {
processed++;
// Re-fetch to check if labels were found
var labels = db.getImageLabels(image.id());
if (labels != null && !labels.isEmpty()) {
detected++;
}
}
}
log.info("Processed {} images, detected objects in {}", processed, detected);
} catch (SQLException e) {
System.err.println("Error processing pending images: " + e.getMessage());
log.error("Error processing pending images: {}", e.getMessage());
}
}
}

View File

@@ -54,9 +54,9 @@ public class QuarkusWorkflowScheduler {
var lots = db.importLotsFromScraper();
LOG.infof(" → Imported %d lots", lots.size());
// Import image URLs
var images = db.getUnprocessedImagesFromScraper();
LOG.infof(" → Found %d unprocessed images", images.size());
// Check for images needing detection
var images = db.getImagesNeedingDetection();
LOG.infof(" → Found %d images needing detection", images.size());
var duration = System.currentTimeMillis() - start;
LOG.infof(" ✓ Scraper import completed in %dms", duration);
@@ -78,63 +78,53 @@ public class QuarkusWorkflowScheduler {
/**
* Workflow 2: Process Pending Images
* Cron: Every 1 hour (0 0 * * * ?)
* Purpose: Download images and run object detection
* Purpose: Run object detection on images already downloaded by scraper
*/
@Scheduled(cron = "{auction.workflow.image-processing.cron}", identity = "image-processing")
void processImages() {
try {
LOG.info("🖼️ [WORKFLOW 2] Processing pending images...");
var start = System.currentTimeMillis();
// Get unprocessed images
var unprocessedImages = db.getUnprocessedImagesFromScraper();
if (unprocessedImages.isEmpty()) {
// Get images that have been downloaded but need object detection
var pendingImages = db.getImagesNeedingDetection();
if (pendingImages.isEmpty()) {
LOG.info(" → No pending images to process");
return;
}
LOG.infof(" → Processing %d images", unprocessedImages.size());
LOG.infof(" → Processing %d images", pendingImages.size());
var processed = 0;
var detected = 0;
for (var imageRecord : unprocessedImages) {
for (var image : pendingImages) {
try {
// Download image
var filePath = imageProcessor.downloadImage(
imageRecord.url(),
imageRecord.saleId(),
imageRecord.lotId()
);
if (filePath != null) {
// Run object detection
var labels = detector.detectObjects(filePath);
// Save to database
db.insertImage(imageRecord.lotId(), imageRecord.url(),
filePath, labels);
// Run object detection on already-downloaded image
if (imageProcessor.processImage(image.id(), image.filePath(), image.lotId())) {
processed++;
if (!labels.isEmpty()) {
// Check if objects were detected
var labels = db.getImageLabels(image.id());
if (labels != null && !labels.isEmpty()) {
detected++;
// Send notification for interesting detections
if (labels.size() >= 3) {
notifier.sendNotification(
String.format("Lot %d: Detected %s",
imageRecord.lotId(),
image.lotId(),
String.join(", ", labels)),
"Objects Detected",
0
);
);
}
}
}
// Rate limiting
Thread.sleep(500);
// Rate limiting (lighter since no network I/O)
Thread.sleep(100);
} catch (Exception e) {
LOG.warnf(" ⚠️ Failed to process image: %s", e.getMessage());

View File

@@ -43,7 +43,7 @@ public class TroostwijkMonitor {
db = new DatabaseService(databasePath);
notifier = new NotificationService(notificationConfig);
detector = new ObjectDetectionService(yoloCfgPath, yoloWeightsPath, classNamesPath);
imageProcessor = new ImageProcessingService(db, detector, httpClient);
imageProcessor = new ImageProcessingService(db, detector);
db.ensureSchema();
}

View File

@@ -42,8 +42,7 @@ public class WorkflowOrchestrator {
this.notifier = new NotificationService(notificationConfig);
this.detector = new ObjectDetectionService(yoloCfg, yoloWeights, yoloClasses);
var httpClient = new RateLimitedHttpClient();
this.imageProcessor = new ImageProcessingService(db, detector, httpClient);
this.imageProcessor = new ImageProcessingService(db, detector);
this.monitor = new TroostwijkMonitor(databasePath, notificationConfig,
yoloCfg, yoloWeights, yoloClasses);
@@ -100,9 +99,9 @@ public class WorkflowOrchestrator {
var lots = db.importLotsFromScraper();
log.info(" → Imported {} lots", lots.size());
// Import image URLs
var images = db.getUnprocessedImagesFromScraper();
log.info(" → Found {} unprocessed images", images.size());
// Check for images needing detection
var images = db.getImagesNeedingDetection();
log.info(" → Found {} images needing detection", images.size());
var duration = System.currentTimeMillis() - start;
log.info(" ✓ Scraper import completed in {}ms\n", duration);
@@ -127,7 +126,7 @@ public class WorkflowOrchestrator {
/**
* Workflow 2: Process Pending Images
* Frequency: Every 1 hour
* Purpose: Download images and run object detection
* Purpose: Run object detection on images already downloaded by scraper
*/
private void scheduleImageProcessing() {
scheduler.scheduleAtFixedRate(() -> {
@@ -135,55 +134,45 @@ public class WorkflowOrchestrator {
log.info("🖼️ [WORKFLOW 2] Processing pending images...");
var start = System.currentTimeMillis();
// Get unprocessed images
var unprocessedImages = db.getUnprocessedImagesFromScraper();
if (unprocessedImages.isEmpty()) {
// Get images that have been downloaded but need object detection
var pendingImages = db.getImagesNeedingDetection();
if (pendingImages.isEmpty()) {
log.info(" → No pending images to process\n");
return;
}
log.info(" → Processing {} images", unprocessedImages.size());
log.info(" → Processing {} images", pendingImages.size());
var processed = 0;
var detected = 0;
for (var imageRecord : unprocessedImages) {
for (var image : pendingImages) {
try {
// Download image
var filePath = imageProcessor.downloadImage(
imageRecord.url(),
imageRecord.saleId(),
imageRecord.lotId()
);
if (filePath != null) {
// Run object detection
var labels = detector.detectObjects(filePath);
// Save to database
db.insertImage(imageRecord.lotId(), imageRecord.url(),
filePath, labels);
// Run object detection on already-downloaded image
if (imageProcessor.processImage(image.id(), image.filePath(), image.lotId())) {
processed++;
if (!labels.isEmpty()) {
// Check if objects were detected
var labels = db.getImageLabels(image.id());
if (labels != null && !labels.isEmpty()) {
detected++;
// Send notification for interesting detections
if (labels.size() >= 3) {
notifier.sendNotification(
String.format("Lot %d: Detected %s",
imageRecord.lotId(),
image.lotId(),
String.join(", ", labels)),
"Objects Detected",
0
);
);
}
}
}
// Rate limiting
Thread.sleep(500);
// Rate limiting (lighter since no network I/O)
Thread.sleep(100);
} catch (Exception e) {
log.info("\uFE0F Failed to process image: {}", e.getMessage());