go
This commit is contained in:
@@ -68,10 +68,9 @@ public class AuctionMonitorProducer {
|
||||
@Singleton
|
||||
public ImageProcessingService produceImageProcessingService(
|
||||
DatabaseService db,
|
||||
ObjectDetectionService detector,
|
||||
RateLimitedHttpClient httpClient) {
|
||||
ObjectDetectionService detector) {
|
||||
|
||||
LOG.infof("Initializing ImageProcessingService");
|
||||
return new ImageProcessingService(db, detector, httpClient);
|
||||
return new ImageProcessingService(db, detector);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,7 +73,8 @@ public class DatabaseService {
|
||||
FOREIGN KEY (sale_id) REFERENCES auctions(auction_id)
|
||||
)""");
|
||||
|
||||
// Images table (populated by this process)
|
||||
// Images table (populated by external scraper with URLs and local_path)
|
||||
// This process only adds labels via object detection
|
||||
stmt.execute("""
|
||||
CREATE TABLE IF NOT EXISTS images (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
@@ -82,6 +83,7 @@ public class DatabaseService {
|
||||
local_path TEXT,
|
||||
labels TEXT,
|
||||
processed_at INTEGER,
|
||||
downloaded INTEGER DEFAULT 0,
|
||||
FOREIGN KEY (lot_id) REFERENCES lots(lot_id)
|
||||
)""");
|
||||
|
||||
@@ -258,6 +260,7 @@ public class DatabaseService {
|
||||
var hasLabels = false;
|
||||
var hasLocalPath = false;
|
||||
var hasProcessedAt = false;
|
||||
var hasDownloaded = false;
|
||||
|
||||
while (rs.next()) {
|
||||
var colName = rs.getString("name");
|
||||
@@ -265,6 +268,7 @@ public class DatabaseService {
|
||||
case "labels" -> hasLabels = true;
|
||||
case "local_path" -> hasLocalPath = true;
|
||||
case "processed_at" -> hasProcessedAt = true;
|
||||
case "downloaded" -> hasDownloaded = true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -280,6 +284,10 @@ public class DatabaseService {
|
||||
log.info("Migrating schema: Adding 'processed_at' column to images table");
|
||||
stmt.execute("ALTER TABLE images ADD COLUMN processed_at INTEGER");
|
||||
}
|
||||
if (!hasDownloaded) {
|
||||
log.info("Migrating schema: Adding 'downloaded' column to images table");
|
||||
stmt.execute("ALTER TABLE images ADD COLUMN downloaded INTEGER DEFAULT 0");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
// Table might not exist yet, which is fine
|
||||
log.debug("Could not check images table schema: " + e.getMessage());
|
||||
@@ -462,19 +470,35 @@ public class DatabaseService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts a new image record with object detection labels
|
||||
* Updates the labels field for an image after object detection
|
||||
*/
|
||||
synchronized void insertImage(long lotId, String url, String filePath, List<String> labels) throws SQLException {
|
||||
var sql = "INSERT INTO images (lot_id, url, local_path, labels, processed_at) VALUES (?, ?, ?, ?, ?)";
|
||||
synchronized void updateImageLabels(int imageId, List<String> labels) throws SQLException {
|
||||
var sql = "UPDATE images SET labels = ?, processed_at = ? WHERE id = ?";
|
||||
try (var conn = DriverManager.getConnection(this.url); var ps = conn.prepareStatement(sql)) {
|
||||
ps.setLong(1, lotId);
|
||||
ps.setString(2, url);
|
||||
ps.setString(3, filePath);
|
||||
ps.setString(4, String.join(",", labels));
|
||||
ps.setLong(5, Instant.now().getEpochSecond());
|
||||
ps.setString(1, String.join(",", labels));
|
||||
ps.setLong(2, Instant.now().getEpochSecond());
|
||||
ps.setInt(3, imageId);
|
||||
ps.executeUpdate();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the labels for a specific image
|
||||
*/
|
||||
synchronized List<String> getImageLabels(int imageId) throws SQLException {
|
||||
var sql = "SELECT labels FROM images WHERE id = ?";
|
||||
try (var conn = DriverManager.getConnection(this.url); var ps = conn.prepareStatement(sql)) {
|
||||
ps.setInt(1, imageId);
|
||||
var rs = ps.executeQuery();
|
||||
if (rs.next()) {
|
||||
var labelsStr = rs.getString("labels");
|
||||
if (labelsStr != null && !labelsStr.isEmpty()) {
|
||||
return List.of(labelsStr.split(","));
|
||||
}
|
||||
}
|
||||
}
|
||||
return List.of();
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves images for a specific lot
|
||||
@@ -671,46 +695,34 @@ public class DatabaseService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Imports image URLs from scraper's schema.
|
||||
* The scraper populates the images table with URLs but doesn't download them.
|
||||
* This method retrieves undownloaded images for processing.
|
||||
* Gets images that have been downloaded by the scraper but need object detection.
|
||||
* Only returns images that have local_path set but no labels yet.
|
||||
*
|
||||
* @return List of image URLs that need to be downloaded
|
||||
* @return List of images needing object detection
|
||||
*/
|
||||
synchronized List<ImageImportRecord> getUnprocessedImagesFromScraper() throws SQLException {
|
||||
List<ImageImportRecord> images = new ArrayList<>();
|
||||
synchronized List<ImageDetectionRecord> getImagesNeedingDetection() throws SQLException {
|
||||
List<ImageDetectionRecord> images = new ArrayList<>();
|
||||
var sql = """
|
||||
SELECT i.lot_id, i.url, l.auction_id
|
||||
SELECT i.id, i.lot_id, i.local_path
|
||||
FROM images i
|
||||
LEFT JOIN lots l ON i.lot_id = l.lot_id
|
||||
WHERE i.downloaded = 0 OR i.local_path IS NULL
|
||||
WHERE i.local_path IS NOT NULL
|
||||
AND i.local_path != ''
|
||||
AND (i.labels IS NULL OR i.labels = '')
|
||||
""";
|
||||
|
||||
|
||||
try (var conn = DriverManager.getConnection(url); var stmt = conn.createStatement()) {
|
||||
var rs = stmt.executeQuery(sql);
|
||||
while (rs.next()) {
|
||||
var lotIdStr = rs.getString("lot_id");
|
||||
var auctionIdStr = rs.getString("auction_id");
|
||||
|
||||
var lotId = ScraperDataAdapter.extractNumericId(lotIdStr);
|
||||
var saleId = ScraperDataAdapter.extractNumericId(auctionIdStr);
|
||||
|
||||
// Skip images with invalid IDs (0 indicates parsing failed)
|
||||
if (lotId == 0L || saleId == 0L) {
|
||||
log.debug("Skipping image with invalid ID: lot_id={}, sale_id={}", lotId, saleId);
|
||||
continue;
|
||||
}
|
||||
|
||||
images.add(new ImageImportRecord(
|
||||
lotId,
|
||||
saleId,
|
||||
rs.getString("url")
|
||||
images.add(new ImageDetectionRecord(
|
||||
rs.getInt("id"),
|
||||
rs.getLong("lot_id"),
|
||||
rs.getString("local_path")
|
||||
));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
log.info("ℹ️ No unprocessed images found in scraper format");
|
||||
log.info("ℹ️ No images needing detection found");
|
||||
}
|
||||
|
||||
|
||||
return images;
|
||||
}
|
||||
|
||||
@@ -718,9 +730,9 @@ public class DatabaseService {
|
||||
* Simple record for image data from database
|
||||
*/
|
||||
record ImageRecord(int id, long lotId, String url, String filePath, String labels) { }
|
||||
|
||||
|
||||
/**
|
||||
* Record for importing images from scraper format
|
||||
* Record for images that need object detection processing
|
||||
*/
|
||||
record ImageImportRecord(long lotId, long saleId, String url) { }
|
||||
record ImageDetectionRecord(int id, long lotId, String filePath) { }
|
||||
}
|
||||
|
||||
@@ -12,120 +12,78 @@ import java.util.List;
|
||||
|
||||
/**
|
||||
* Service responsible for processing images from the IMAGES table.
|
||||
* Downloads images, performs object detection, and updates the database.
|
||||
* Performs object detection on already-downloaded images and updates the database.
|
||||
*
|
||||
* This separates image processing concerns from scraping, allowing this project
|
||||
* to focus on enriching data scraped by the external process.
|
||||
* NOTE: Image downloading is handled by the external scraper process.
|
||||
* This service only performs object detection on images that already have local_path set.
|
||||
*/
|
||||
@Slf4j
|
||||
class ImageProcessingService {
|
||||
|
||||
private final RateLimitedHttpClient httpClient;
|
||||
private final DatabaseService db;
|
||||
private final ObjectDetectionService detector;
|
||||
|
||||
ImageProcessingService(DatabaseService db, ObjectDetectionService detector, RateLimitedHttpClient httpClient) {
|
||||
this.httpClient = httpClient;
|
||||
ImageProcessingService(DatabaseService db, ObjectDetectionService detector) {
|
||||
this.db = db;
|
||||
this.detector = detector;
|
||||
}
|
||||
|
||||
/**
|
||||
* Downloads an image from the given URL to local storage.
|
||||
* Images are organized by saleId/lotId for easy management.
|
||||
*
|
||||
* @param imageUrl remote image URL
|
||||
* @param saleId sale identifier
|
||||
* @param lotId lot identifier
|
||||
* @return absolute path to saved file or null on failure
|
||||
*/
|
||||
String downloadImage(String imageUrl, long saleId, long lotId) {
|
||||
try {
|
||||
var response = httpClient.sendGetBytes(imageUrl);
|
||||
|
||||
if (response != null && response.statusCode() == 200) {
|
||||
// Use environment variable for cross-platform compatibility
|
||||
var imagesPath = System.getenv().getOrDefault("AUCTION_IMAGES_PATH", "/mnt/okcomputer/output/images");
|
||||
var baseDir = Paths.get(imagesPath);
|
||||
var dir = baseDir.resolve(String.valueOf(saleId)).resolve(String.valueOf(lotId));
|
||||
Files.createDirectories(dir);
|
||||
|
||||
// Extract filename from URL
|
||||
var fileName = imageUrl.substring(imageUrl.lastIndexOf('/') + 1);
|
||||
// Remove query parameters if present
|
||||
int queryIndex = fileName.indexOf('?');
|
||||
if (queryIndex > 0) {
|
||||
fileName = fileName.substring(0, queryIndex);
|
||||
}
|
||||
var dest = dir.resolve(fileName);
|
||||
|
||||
Files.write(dest, response.body());
|
||||
return dest.toAbsolutePath().toString();
|
||||
}
|
||||
} catch (IOException | InterruptedException e) {
|
||||
System.err.println("Failed to download image " + imageUrl + ": " + e.getMessage());
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes images for a specific lot: downloads and runs object detection.
|
||||
* Processes a single image: runs object detection and updates labels in database.
|
||||
*
|
||||
* @param lotId lot identifier
|
||||
* @param saleId sale identifier
|
||||
* @param imageUrls list of image URLs to process
|
||||
* @param imageId database ID of the image record
|
||||
* @param localPath local file path to the downloaded image
|
||||
* @param lotId lot identifier (for logging)
|
||||
* @return true if processing succeeded
|
||||
*/
|
||||
void processImagesForLot(long lotId, long saleId, List<String> imageUrls) {
|
||||
log.info(" Processing {} images for lot {}", imageUrls.size(), lotId);
|
||||
|
||||
for (var imgUrl : imageUrls) {
|
||||
var fileName = downloadImage(imgUrl, saleId, lotId);
|
||||
|
||||
if (fileName != null) {
|
||||
// Run object detection
|
||||
var labels = detector.detectObjects(fileName);
|
||||
|
||||
// Save to database
|
||||
try {
|
||||
db.insertImage(lotId, imgUrl, fileName, labels);
|
||||
|
||||
if (!labels.isEmpty()) {
|
||||
log.info(" Detected: {}", String.join(", ", labels));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
System.err.println(" Failed to save image to database: " + e.getMessage());
|
||||
}
|
||||
boolean processImage(int imageId, String localPath, long lotId) {
|
||||
try {
|
||||
// Run object detection on the local file
|
||||
var labels = detector.detectObjects(localPath);
|
||||
|
||||
// Update the database with detected labels
|
||||
db.updateImageLabels(imageId, labels);
|
||||
|
||||
if (!labels.isEmpty()) {
|
||||
log.info(" Lot {}: Detected {}", lotId, String.join(", ", labels));
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error(" Failed to process image {}: {}", imageId, e.getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Batch processes all pending images in the database.
|
||||
* Useful for processing images after the external scraper has populated lot data.
|
||||
* Only processes images that have been downloaded by the scraper but haven't had object detection run yet.
|
||||
*/
|
||||
void processPendingImages() {
|
||||
log.info("Processing pending images...");
|
||||
|
||||
|
||||
try {
|
||||
var lots = db.getAllLots();
|
||||
log.info("Found {} lots to check for images", lots.size());
|
||||
|
||||
for (var lot : lots) {
|
||||
// Check if images already processed for this lot
|
||||
var existingImages = db.getImagesForLot(lot.lotId());
|
||||
|
||||
if (existingImages.isEmpty()) {
|
||||
log.info(" Lot {} has no images yet - needs external scraper data", lot.lotId());
|
||||
var pendingImages = db.getImagesNeedingDetection();
|
||||
log.info("Found {} images needing object detection", pendingImages.size());
|
||||
|
||||
var processed = 0;
|
||||
var detected = 0;
|
||||
|
||||
for (var image : pendingImages) {
|
||||
if (processImage(image.id(), image.filePath(), image.lotId())) {
|
||||
processed++;
|
||||
// Re-fetch to check if labels were found
|
||||
var labels = db.getImageLabels(image.id());
|
||||
if (labels != null && !labels.isEmpty()) {
|
||||
detected++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
log.info("Processed {} images, detected objects in {}", processed, detected);
|
||||
|
||||
} catch (SQLException e) {
|
||||
System.err.println("Error processing pending images: " + e.getMessage());
|
||||
log.error("Error processing pending images: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,9 +54,9 @@ public class QuarkusWorkflowScheduler {
|
||||
var lots = db.importLotsFromScraper();
|
||||
LOG.infof(" → Imported %d lots", lots.size());
|
||||
|
||||
// Import image URLs
|
||||
var images = db.getUnprocessedImagesFromScraper();
|
||||
LOG.infof(" → Found %d unprocessed images", images.size());
|
||||
// Check for images needing detection
|
||||
var images = db.getImagesNeedingDetection();
|
||||
LOG.infof(" → Found %d images needing detection", images.size());
|
||||
|
||||
var duration = System.currentTimeMillis() - start;
|
||||
LOG.infof(" ✓ Scraper import completed in %dms", duration);
|
||||
@@ -78,63 +78,53 @@ public class QuarkusWorkflowScheduler {
|
||||
/**
|
||||
* Workflow 2: Process Pending Images
|
||||
* Cron: Every 1 hour (0 0 * * * ?)
|
||||
* Purpose: Download images and run object detection
|
||||
* Purpose: Run object detection on images already downloaded by scraper
|
||||
*/
|
||||
@Scheduled(cron = "{auction.workflow.image-processing.cron}", identity = "image-processing")
|
||||
void processImages() {
|
||||
try {
|
||||
LOG.info("🖼️ [WORKFLOW 2] Processing pending images...");
|
||||
var start = System.currentTimeMillis();
|
||||
|
||||
// Get unprocessed images
|
||||
var unprocessedImages = db.getUnprocessedImagesFromScraper();
|
||||
|
||||
if (unprocessedImages.isEmpty()) {
|
||||
|
||||
// Get images that have been downloaded but need object detection
|
||||
var pendingImages = db.getImagesNeedingDetection();
|
||||
|
||||
if (pendingImages.isEmpty()) {
|
||||
LOG.info(" → No pending images to process");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.infof(" → Processing %d images", unprocessedImages.size());
|
||||
|
||||
|
||||
LOG.infof(" → Processing %d images", pendingImages.size());
|
||||
|
||||
var processed = 0;
|
||||
var detected = 0;
|
||||
|
||||
for (var imageRecord : unprocessedImages) {
|
||||
|
||||
for (var image : pendingImages) {
|
||||
try {
|
||||
// Download image
|
||||
var filePath = imageProcessor.downloadImage(
|
||||
imageRecord.url(),
|
||||
imageRecord.saleId(),
|
||||
imageRecord.lotId()
|
||||
);
|
||||
|
||||
if (filePath != null) {
|
||||
// Run object detection
|
||||
var labels = detector.detectObjects(filePath);
|
||||
|
||||
// Save to database
|
||||
db.insertImage(imageRecord.lotId(), imageRecord.url(),
|
||||
filePath, labels);
|
||||
|
||||
// Run object detection on already-downloaded image
|
||||
if (imageProcessor.processImage(image.id(), image.filePath(), image.lotId())) {
|
||||
processed++;
|
||||
if (!labels.isEmpty()) {
|
||||
|
||||
// Check if objects were detected
|
||||
var labels = db.getImageLabels(image.id());
|
||||
if (labels != null && !labels.isEmpty()) {
|
||||
detected++;
|
||||
|
||||
|
||||
// Send notification for interesting detections
|
||||
if (labels.size() >= 3) {
|
||||
notifier.sendNotification(
|
||||
String.format("Lot %d: Detected %s",
|
||||
imageRecord.lotId(),
|
||||
image.lotId(),
|
||||
String.join(", ", labels)),
|
||||
"Objects Detected",
|
||||
0
|
||||
);
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Rate limiting
|
||||
Thread.sleep(500);
|
||||
|
||||
// Rate limiting (lighter since no network I/O)
|
||||
Thread.sleep(100);
|
||||
|
||||
} catch (Exception e) {
|
||||
LOG.warnf(" ⚠️ Failed to process image: %s", e.getMessage());
|
||||
|
||||
@@ -43,7 +43,7 @@ public class TroostwijkMonitor {
|
||||
db = new DatabaseService(databasePath);
|
||||
notifier = new NotificationService(notificationConfig);
|
||||
detector = new ObjectDetectionService(yoloCfgPath, yoloWeightsPath, classNamesPath);
|
||||
imageProcessor = new ImageProcessingService(db, detector, httpClient);
|
||||
imageProcessor = new ImageProcessingService(db, detector);
|
||||
|
||||
db.ensureSchema();
|
||||
}
|
||||
|
||||
@@ -42,8 +42,7 @@ public class WorkflowOrchestrator {
|
||||
|
||||
this.notifier = new NotificationService(notificationConfig);
|
||||
this.detector = new ObjectDetectionService(yoloCfg, yoloWeights, yoloClasses);
|
||||
var httpClient = new RateLimitedHttpClient();
|
||||
this.imageProcessor = new ImageProcessingService(db, detector, httpClient);
|
||||
this.imageProcessor = new ImageProcessingService(db, detector);
|
||||
|
||||
this.monitor = new TroostwijkMonitor(databasePath, notificationConfig,
|
||||
yoloCfg, yoloWeights, yoloClasses);
|
||||
@@ -100,9 +99,9 @@ public class WorkflowOrchestrator {
|
||||
var lots = db.importLotsFromScraper();
|
||||
log.info(" → Imported {} lots", lots.size());
|
||||
|
||||
// Import image URLs
|
||||
var images = db.getUnprocessedImagesFromScraper();
|
||||
log.info(" → Found {} unprocessed images", images.size());
|
||||
// Check for images needing detection
|
||||
var images = db.getImagesNeedingDetection();
|
||||
log.info(" → Found {} images needing detection", images.size());
|
||||
|
||||
var duration = System.currentTimeMillis() - start;
|
||||
log.info(" ✓ Scraper import completed in {}ms\n", duration);
|
||||
@@ -127,7 +126,7 @@ public class WorkflowOrchestrator {
|
||||
/**
|
||||
* Workflow 2: Process Pending Images
|
||||
* Frequency: Every 1 hour
|
||||
* Purpose: Download images and run object detection
|
||||
* Purpose: Run object detection on images already downloaded by scraper
|
||||
*/
|
||||
private void scheduleImageProcessing() {
|
||||
scheduler.scheduleAtFixedRate(() -> {
|
||||
@@ -135,55 +134,45 @@ public class WorkflowOrchestrator {
|
||||
log.info("🖼️ [WORKFLOW 2] Processing pending images...");
|
||||
var start = System.currentTimeMillis();
|
||||
|
||||
// Get unprocessed images
|
||||
var unprocessedImages = db.getUnprocessedImagesFromScraper();
|
||||
|
||||
if (unprocessedImages.isEmpty()) {
|
||||
// Get images that have been downloaded but need object detection
|
||||
var pendingImages = db.getImagesNeedingDetection();
|
||||
|
||||
if (pendingImages.isEmpty()) {
|
||||
log.info(" → No pending images to process\n");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info(" → Processing {} images", unprocessedImages.size());
|
||||
|
||||
|
||||
log.info(" → Processing {} images", pendingImages.size());
|
||||
|
||||
var processed = 0;
|
||||
var detected = 0;
|
||||
|
||||
for (var imageRecord : unprocessedImages) {
|
||||
|
||||
for (var image : pendingImages) {
|
||||
try {
|
||||
// Download image
|
||||
var filePath = imageProcessor.downloadImage(
|
||||
imageRecord.url(),
|
||||
imageRecord.saleId(),
|
||||
imageRecord.lotId()
|
||||
);
|
||||
|
||||
if (filePath != null) {
|
||||
// Run object detection
|
||||
var labels = detector.detectObjects(filePath);
|
||||
|
||||
// Save to database
|
||||
db.insertImage(imageRecord.lotId(), imageRecord.url(),
|
||||
filePath, labels);
|
||||
|
||||
// Run object detection on already-downloaded image
|
||||
if (imageProcessor.processImage(image.id(), image.filePath(), image.lotId())) {
|
||||
processed++;
|
||||
if (!labels.isEmpty()) {
|
||||
|
||||
// Check if objects were detected
|
||||
var labels = db.getImageLabels(image.id());
|
||||
if (labels != null && !labels.isEmpty()) {
|
||||
detected++;
|
||||
|
||||
|
||||
// Send notification for interesting detections
|
||||
if (labels.size() >= 3) {
|
||||
notifier.sendNotification(
|
||||
String.format("Lot %d: Detected %s",
|
||||
imageRecord.lotId(),
|
||||
image.lotId(),
|
||||
String.join(", ", labels)),
|
||||
"Objects Detected",
|
||||
0
|
||||
);
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Rate limiting
|
||||
Thread.sleep(500);
|
||||
|
||||
// Rate limiting (lighter since no network I/O)
|
||||
Thread.sleep(100);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.info(" ⚠\uFE0F Failed to process image: {}", e.getMessage());
|
||||
|
||||
Reference in New Issue
Block a user