USe ASM 9.8 with Java 25
This commit is contained in:
@@ -53,9 +53,10 @@ public class AuctionMonitorProducer {
|
||||
@Singleton
|
||||
public ImageProcessingService produceImageProcessingService(
|
||||
DatabaseService db,
|
||||
ObjectDetectionService detector) {
|
||||
ObjectDetectionService detector,
|
||||
RateLimitedHttpClient httpClient) {
|
||||
|
||||
LOG.infof("Initializing ImageProcessingService");
|
||||
return new ImageProcessingService(db, detector);
|
||||
return new ImageProcessingService(db, detector, httpClient);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,6 +33,9 @@ public class AuctionMonitorResource {
|
||||
@Inject
|
||||
NotificationService notifier;
|
||||
|
||||
@Inject
|
||||
RateLimitedHttpClient httpClient;
|
||||
|
||||
/**
|
||||
* GET /api/monitor/status
|
||||
* Returns current monitoring status
|
||||
@@ -286,4 +289,75 @@ public class AuctionMonitorResource {
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* GET /api/monitor/rate-limit/stats
|
||||
* Returns HTTP rate limiting statistics for all hosts
|
||||
*/
|
||||
@GET
|
||||
@Path("/rate-limit/stats")
|
||||
public Response getRateLimitStats() {
|
||||
try {
|
||||
var stats = httpClient.getAllStats();
|
||||
|
||||
Map<String, Object> response = new HashMap<>();
|
||||
response.put("hosts", stats.size());
|
||||
|
||||
Map<String, Object> hostStats = new HashMap<>();
|
||||
for (var entry : stats.entrySet()) {
|
||||
var stat = entry.getValue();
|
||||
hostStats.put(entry.getKey(), Map.of(
|
||||
"totalRequests", stat.getTotalRequests(),
|
||||
"successfulRequests", stat.getSuccessfulRequests(),
|
||||
"failedRequests", stat.getFailedRequests(),
|
||||
"rateLimitedRequests", stat.getRateLimitedRequests(),
|
||||
"averageDurationMs", stat.getAverageDurationMs()
|
||||
));
|
||||
}
|
||||
response.put("statistics", hostStats);
|
||||
|
||||
return Response.ok(response).build();
|
||||
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to get rate limit stats", e);
|
||||
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
|
||||
.entity(Map.of("error", e.getMessage()))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* GET /api/monitor/rate-limit/stats/{host}
|
||||
* Returns HTTP rate limiting statistics for a specific host
|
||||
*/
|
||||
@GET
|
||||
@Path("/rate-limit/stats/{host}")
|
||||
public Response getRateLimitStatsForHost(@PathParam("host") String host) {
|
||||
try {
|
||||
var stat = httpClient.getStats(host);
|
||||
|
||||
if (stat == null) {
|
||||
return Response.status(Response.Status.NOT_FOUND)
|
||||
.entity(Map.of("error", "No statistics found for host: " + host))
|
||||
.build();
|
||||
}
|
||||
|
||||
Map<String, Object> response = Map.of(
|
||||
"host", stat.getHost(),
|
||||
"totalRequests", stat.getTotalRequests(),
|
||||
"successfulRequests", stat.getSuccessfulRequests(),
|
||||
"failedRequests", stat.getFailedRequests(),
|
||||
"rateLimitedRequests", stat.getRateLimitedRequests(),
|
||||
"averageDurationMs", stat.getAverageDurationMs()
|
||||
);
|
||||
|
||||
return Response.ok(response).build();
|
||||
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to get rate limit stats for host", e);
|
||||
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
|
||||
.entity(Map.of("error", e.getMessage()))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,6 @@
|
||||
package com.auction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.sql.SQLException;
|
||||
@@ -19,12 +15,12 @@ import java.util.List;
|
||||
*/
|
||||
class ImageProcessingService {
|
||||
|
||||
private final HttpClient httpClient;
|
||||
private final RateLimitedHttpClient httpClient;
|
||||
private final DatabaseService db;
|
||||
private final ObjectDetectionService detector;
|
||||
|
||||
ImageProcessingService(DatabaseService db, ObjectDetectionService detector) {
|
||||
this.httpClient = HttpClient.newHttpClient();
|
||||
ImageProcessingService(DatabaseService db, ObjectDetectionService detector, RateLimitedHttpClient httpClient) {
|
||||
this.httpClient = httpClient;
|
||||
this.db = db;
|
||||
this.detector = detector;
|
||||
}
|
||||
@@ -40,12 +36,7 @@ class ImageProcessingService {
|
||||
*/
|
||||
String downloadImage(String imageUrl, int saleId, int lotId) {
|
||||
try {
|
||||
var request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(imageUrl))
|
||||
.GET()
|
||||
.build();
|
||||
|
||||
var response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
|
||||
var response = httpClient.sendGetBytes(imageUrl);
|
||||
|
||||
if (response.statusCode() == 200) {
|
||||
// Use Windows path: C:\mnt\okcomputer\output\images
|
||||
@@ -56,7 +47,7 @@ class ImageProcessingService {
|
||||
var fileName = Paths.get(imageUrl).getFileName().toString();
|
||||
var dest = dir.resolve(fileName);
|
||||
|
||||
Files.copy(response.body(), dest);
|
||||
Files.write(dest, response.body());
|
||||
return dest.toAbsolutePath().toString();
|
||||
}
|
||||
} catch (IOException | InterruptedException e) {
|
||||
|
||||
270
src/main/java/com/auction/RateLimitedHttpClient.java
Normal file
270
src/main/java/com/auction/RateLimitedHttpClient.java
Normal file
@@ -0,0 +1,270 @@
|
||||
package com.auction;
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* Rate-limited HTTP client that enforces per-host request limits.
|
||||
*
|
||||
* Features:
|
||||
* - Per-host rate limiting (configurable max requests per second)
|
||||
* - Request counting and monitoring
|
||||
* - Thread-safe using semaphores
|
||||
* - Automatic host extraction from URLs
|
||||
*
|
||||
* This prevents overloading external services like Troostwijk and getting blocked.
|
||||
*/
|
||||
@ApplicationScoped
|
||||
public class RateLimitedHttpClient {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(RateLimitedHttpClient.class);
|
||||
|
||||
private final HttpClient httpClient;
|
||||
private final Map<String, RateLimiter> rateLimiters;
|
||||
private final Map<String, RequestStats> requestStats;
|
||||
|
||||
@ConfigProperty(name = "auction.http.rate-limit.default-max-rps", defaultValue = "2")
|
||||
int defaultMaxRequestsPerSecond;
|
||||
|
||||
@ConfigProperty(name = "auction.http.rate-limit.troostwijk-max-rps", defaultValue = "1")
|
||||
int troostwijkMaxRequestsPerSecond;
|
||||
|
||||
@ConfigProperty(name = "auction.http.timeout-seconds", defaultValue = "30")
|
||||
int timeoutSeconds;
|
||||
|
||||
public RateLimitedHttpClient() {
|
||||
this.httpClient = HttpClient.newBuilder()
|
||||
.connectTimeout(Duration.ofSeconds(30))
|
||||
.build();
|
||||
this.rateLimiters = new ConcurrentHashMap<>();
|
||||
this.requestStats = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a GET request with automatic rate limiting based on host.
|
||||
*/
|
||||
public HttpResponse<String> sendGet(String url) throws IOException, InterruptedException {
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(url))
|
||||
.timeout(Duration.ofSeconds(timeoutSeconds))
|
||||
.GET()
|
||||
.build();
|
||||
|
||||
return send(request, HttpResponse.BodyHandlers.ofString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a request for binary data (like images) with rate limiting.
|
||||
*/
|
||||
public HttpResponse<byte[]> sendGetBytes(String url) throws IOException, InterruptedException {
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(url))
|
||||
.timeout(Duration.ofSeconds(timeoutSeconds))
|
||||
.GET()
|
||||
.build();
|
||||
|
||||
return send(request, HttpResponse.BodyHandlers.ofByteArray());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends any HTTP request with automatic rate limiting.
|
||||
*/
|
||||
public <T> HttpResponse<T> send(HttpRequest request, HttpResponse.BodyHandler<T> bodyHandler)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
String host = extractHost(request.uri());
|
||||
RateLimiter limiter = getRateLimiter(host);
|
||||
RequestStats stats = getRequestStats(host);
|
||||
|
||||
// Enforce rate limit (blocks if necessary)
|
||||
limiter.acquire();
|
||||
|
||||
// Track request
|
||||
stats.incrementTotal();
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
HttpResponse<T> response = httpClient.send(request, bodyHandler);
|
||||
|
||||
long duration = System.currentTimeMillis() - startTime;
|
||||
stats.recordSuccess(duration);
|
||||
|
||||
LOG.debugf("HTTP %d %s %s (%dms)",
|
||||
response.statusCode(), request.method(), host, duration);
|
||||
|
||||
// Track rate limit violations (429 = Too Many Requests)
|
||||
if (response.statusCode() == 429) {
|
||||
stats.incrementRateLimited();
|
||||
LOG.warnf("⚠️ Rate limited by %s (HTTP 429)", host);
|
||||
}
|
||||
|
||||
return response;
|
||||
|
||||
} catch (IOException | InterruptedException e) {
|
||||
stats.incrementFailed();
|
||||
LOG.warnf("❌ HTTP request failed for %s: %s", host, e.getMessage());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets or creates a rate limiter for a specific host.
|
||||
*/
|
||||
private RateLimiter getRateLimiter(String host) {
|
||||
return rateLimiters.computeIfAbsent(host, h -> {
|
||||
int maxRps = getMaxRequestsPerSecond(h);
|
||||
LOG.infof("Initializing rate limiter for %s: %d req/s", h, maxRps);
|
||||
return new RateLimiter(maxRps);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets or creates request stats for a specific host.
|
||||
*/
|
||||
private RequestStats getRequestStats(String host) {
|
||||
return requestStats.computeIfAbsent(host, h -> new RequestStats(h));
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines max requests per second for a given host.
|
||||
*/
|
||||
private int getMaxRequestsPerSecond(String host) {
|
||||
if (host.contains("troostwijk")) {
|
||||
return troostwijkMaxRequestsPerSecond;
|
||||
}
|
||||
return defaultMaxRequestsPerSecond;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts host from URI (e.g., "api.troostwijkauctions.com").
|
||||
*/
|
||||
private String extractHost(URI uri) {
|
||||
return uri.getHost() != null ? uri.getHost() : uri.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets statistics for all hosts.
|
||||
*/
|
||||
public Map<String, RequestStats> getAllStats() {
|
||||
return Map.copyOf(requestStats);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets statistics for a specific host.
|
||||
*/
|
||||
public RequestStats getStats(String host) {
|
||||
return requestStats.get(host);
|
||||
}
|
||||
|
||||
/**
|
||||
* Rate limiter implementation using token bucket algorithm.
|
||||
* Allows burst traffic up to maxRequestsPerSecond, then enforces steady rate.
|
||||
*/
|
||||
private static class RateLimiter {
|
||||
private final Semaphore semaphore;
|
||||
private final int maxRequestsPerSecond;
|
||||
private final long intervalNanos;
|
||||
|
||||
RateLimiter(int maxRequestsPerSecond) {
|
||||
this.maxRequestsPerSecond = maxRequestsPerSecond;
|
||||
this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / maxRequestsPerSecond;
|
||||
this.semaphore = new Semaphore(maxRequestsPerSecond);
|
||||
|
||||
// Refill tokens periodically
|
||||
startRefillThread();
|
||||
}
|
||||
|
||||
void acquire() throws InterruptedException {
|
||||
semaphore.acquire();
|
||||
|
||||
// Enforce minimum delay between requests
|
||||
long delayMillis = intervalNanos / 1_000_000;
|
||||
if (delayMillis > 0) {
|
||||
Thread.sleep(delayMillis);
|
||||
}
|
||||
}
|
||||
|
||||
private void startRefillThread() {
|
||||
Thread refillThread = new Thread(() -> {
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
Thread.sleep(1000); // Refill every second
|
||||
int toRelease = maxRequestsPerSecond - semaphore.availablePermits();
|
||||
if (toRelease > 0) {
|
||||
semaphore.release(toRelease);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}, "RateLimiter-Refill");
|
||||
refillThread.setDaemon(true);
|
||||
refillThread.start();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Statistics tracker for HTTP requests per host.
|
||||
*/
|
||||
public static class RequestStats {
|
||||
private final String host;
|
||||
private final AtomicLong totalRequests = new AtomicLong(0);
|
||||
private final AtomicLong successfulRequests = new AtomicLong(0);
|
||||
private final AtomicLong failedRequests = new AtomicLong(0);
|
||||
private final AtomicLong rateLimitedRequests = new AtomicLong(0);
|
||||
private final AtomicLong totalDurationMs = new AtomicLong(0);
|
||||
|
||||
RequestStats(String host) {
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
void incrementTotal() {
|
||||
totalRequests.incrementAndGet();
|
||||
}
|
||||
|
||||
void recordSuccess(long durationMs) {
|
||||
successfulRequests.incrementAndGet();
|
||||
totalDurationMs.addAndGet(durationMs);
|
||||
}
|
||||
|
||||
void incrementFailed() {
|
||||
failedRequests.incrementAndGet();
|
||||
}
|
||||
|
||||
void incrementRateLimited() {
|
||||
rateLimitedRequests.incrementAndGet();
|
||||
}
|
||||
|
||||
// Getters
|
||||
public String getHost() { return host; }
|
||||
public long getTotalRequests() { return totalRequests.get(); }
|
||||
public long getSuccessfulRequests() { return successfulRequests.get(); }
|
||||
public long getFailedRequests() { return failedRequests.get(); }
|
||||
public long getRateLimitedRequests() { return rateLimitedRequests.get(); }
|
||||
public long getAverageDurationMs() {
|
||||
long successful = successfulRequests.get();
|
||||
return successful > 0 ? totalDurationMs.get() / successful : 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s: %d total, %d success, %d failed, %d rate-limited, avg %dms",
|
||||
host, getTotalRequests(), getSuccessfulRequests(),
|
||||
getFailedRequests(), getRateLimitedRequests(), getAverageDurationMs());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,10 +2,6 @@ package com.auction;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.sql.SQLException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@@ -23,7 +19,7 @@ public class TroostwijkMonitor {
|
||||
|
||||
private static final String LOT_API = "https://api.troostwijkauctions.com/lot/7/list";
|
||||
|
||||
private final HttpClient httpClient;
|
||||
private final RateLimitedHttpClient httpClient;
|
||||
private final ObjectMapper objectMapper;
|
||||
public final DatabaseService db;
|
||||
private final NotificationService notifier;
|
||||
@@ -42,12 +38,12 @@ public class TroostwijkMonitor {
|
||||
public TroostwijkMonitor(String databasePath, String notificationConfig,
|
||||
String yoloCfgPath, String yoloWeightsPath, String classNamesPath)
|
||||
throws SQLException, IOException {
|
||||
this.httpClient = HttpClient.newHttpClient();
|
||||
this.httpClient = new RateLimitedHttpClient();
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.db = new DatabaseService(databasePath);
|
||||
this.notifier = new NotificationService(notificationConfig, "");
|
||||
this.detector = new ObjectDetectionService(yoloCfgPath, yoloWeightsPath, classNamesPath);
|
||||
this.imageProcessor = new ImageProcessingService(db, detector);
|
||||
this.imageProcessor = new ImageProcessingService(db, detector, httpClient);
|
||||
|
||||
// Initialize database schema
|
||||
db.ensureSchema();
|
||||
@@ -110,8 +106,7 @@ public class TroostwijkMonitor {
|
||||
var url = LOT_API + "?batchSize=1&listType=7&offset=0&sortOption=0&saleID=" + lot.saleId()
|
||||
+ "&parentID=0&relationID=0&buildversion=201807311&lotID=" + lot.lotId();
|
||||
|
||||
var request = HttpRequest.newBuilder().uri(URI.create(url)).GET().build();
|
||||
var response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
|
||||
var response = httpClient.sendGet(url);
|
||||
|
||||
if (response.statusCode() != 200) return;
|
||||
|
||||
|
||||
@@ -41,7 +41,8 @@ public class WorkflowOrchestrator {
|
||||
|
||||
this.notifier = new NotificationService(notificationConfig, "");
|
||||
this.detector = new ObjectDetectionService(yoloCfg, yoloWeights, yoloClasses);
|
||||
this.imageProcessor = new ImageProcessingService(db, detector);
|
||||
RateLimitedHttpClient httpClient = new RateLimitedHttpClient();
|
||||
this.imageProcessor = new ImageProcessingService(db, detector, httpClient);
|
||||
|
||||
this.monitor = new TroostwijkMonitor(databasePath, notificationConfig,
|
||||
yoloCfg, yoloWeights, yoloClasses);
|
||||
|
||||
@@ -47,5 +47,11 @@ auction.workflow.image-processing.cron=0 0 * * * ?
|
||||
auction.workflow.bid-monitoring.cron=0 */15 * * * ?
|
||||
auction.workflow.closing-alerts.cron=0 */5 * * * ?
|
||||
|
||||
# HTTP Rate Limiting Configuration
|
||||
# Prevents overloading external services and getting blocked
|
||||
auction.http.rate-limit.default-max-rps=2
|
||||
auction.http.rate-limit.troostwijk-max-rps=1
|
||||
auction.http.timeout-seconds=30
|
||||
|
||||
# Health Check Configuration
|
||||
quarkus.smallrye-health.root-path=/health
|
||||
|
||||
@@ -20,13 +20,15 @@ class ImageProcessingServiceTest {
|
||||
|
||||
private DatabaseService mockDb;
|
||||
private ObjectDetectionService mockDetector;
|
||||
private RateLimitedHttpClient mockHttpClient;
|
||||
private ImageProcessingService service;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
mockDb = mock(DatabaseService.class);
|
||||
mockDetector = mock(ObjectDetectionService.class);
|
||||
service = new ImageProcessingService(mockDb, mockDetector);
|
||||
mockHttpClient = mock(RateLimitedHttpClient.class);
|
||||
service = new ImageProcessingService(mockDb, mockDetector, mockHttpClient);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
|
||||
@@ -48,7 +48,8 @@ class IntegrationTest {
|
||||
"non_existent.txt"
|
||||
);
|
||||
|
||||
imageProcessor = new ImageProcessingService(db, detector);
|
||||
RateLimitedHttpClient httpClient = new RateLimitedHttpClient();
|
||||
imageProcessor = new ImageProcessingService(db, detector, httpClient);
|
||||
|
||||
monitor = new TroostwijkMonitor(
|
||||
testDbPath,
|
||||
|
||||
Reference in New Issue
Block a user