Fix mock tests
This commit is contained in:
22
.idea/compiler.xml
generated
22
.idea/compiler.xml
generated
@@ -8,31 +8,11 @@
|
||||
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
|
||||
<outputRelativeToContentRoot value="true" />
|
||||
</profile>
|
||||
<profile name="Annotation profile for Troostwijk Auction Scraper" enabled="true">
|
||||
<sourceOutputDir name="target/generated-sources/annotations" />
|
||||
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
|
||||
<outputRelativeToContentRoot value="true" />
|
||||
<processorPath useClasspath="false">
|
||||
<entry name="$MAVEN_REPOSITORY$/org/projectlombok/lombok/1.18.40/lombok-1.18.40.jar" />
|
||||
<entry name="$MAVEN_REPOSITORY$/io/quarkus/quarkus-extension-processor/3.17.7/quarkus-extension-processor-3.17.7.jar" />
|
||||
<entry name="$MAVEN_REPOSITORY$/org/jboss/jdeparser/jdeparser/2.0.3.Final/jdeparser-2.0.3.Final.jar" />
|
||||
<entry name="$MAVEN_REPOSITORY$/org/jsoup/jsoup/1.15.3/jsoup-1.15.3.jar" />
|
||||
<entry name="$MAVEN_REPOSITORY$/com/github/javaparser/javaparser-core/3.26.2/javaparser-core-3.26.2.jar" />
|
||||
<entry name="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.18.2/jackson-databind-2.18.2.jar" />
|
||||
<entry name="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-annotations/2.18.2/jackson-annotations-2.18.2.jar" />
|
||||
<entry name="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-core/2.18.2/jackson-core-2.18.2.jar" />
|
||||
<entry name="$MAVEN_REPOSITORY$/com/fasterxml/jackson/dataformat/jackson-dataformat-yaml/2.18.2/jackson-dataformat-yaml-2.18.2.jar" />
|
||||
<entry name="$MAVEN_REPOSITORY$/org/yaml/snakeyaml/2.3/snakeyaml-2.3.jar" />
|
||||
<entry name="$MAVEN_REPOSITORY$/com/fasterxml/jackson/module/jackson-module-parameter-names/2.18.2/jackson-module-parameter-names-2.18.2.jar" />
|
||||
<entry name="$MAVEN_REPOSITORY$/io/quarkus/quarkus-bootstrap-app-model/3.17.7/quarkus-bootstrap-app-model-3.17.7.jar" />
|
||||
</processorPath>
|
||||
<module name="auctiora" />
|
||||
</profile>
|
||||
</annotationProcessing>
|
||||
</component>
|
||||
<component name="JavacSettings">
|
||||
<option name="ADDITIONAL_OPTIONS_OVERRIDE">
|
||||
<module name="auctiora" options="-Xdiags:verbose -Xlint:all -parameters" />
|
||||
<module name="auctiora" options="-Xdiags:verbose -Xlint:all -proc:none" />
|
||||
</option>
|
||||
</component>
|
||||
</project>
|
||||
13
pom.xml
13
pom.xml
@@ -83,6 +83,17 @@
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
<!-- Force consistent versions -->
|
||||
<dependency>
|
||||
<groupId>org.opentest4j</groupId>
|
||||
<artifactId>opentest4j</artifactId>
|
||||
<version>1.3.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>2.0.9</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
@@ -305,7 +316,7 @@
|
||||
<compilerArgs>
|
||||
<arg>-Xdiags:verbose</arg>
|
||||
<arg>-Xlint:all</arg>
|
||||
<arg>-parameters</arg>
|
||||
<arg>-proc:none</arg>
|
||||
</compilerArgs>
|
||||
<fork>true</fork>
|
||||
<excludes>
|
||||
|
||||
@@ -6,6 +6,7 @@ import org.eclipse.microprofile.health.HealthCheck;
|
||||
import org.eclipse.microprofile.health.HealthCheckResponse;
|
||||
import org.eclipse.microprofile.health.Liveness;
|
||||
import org.eclipse.microprofile.health.Readiness;
|
||||
import org.eclipse.microprofile.health.Startup;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
@@ -73,7 +74,7 @@ public class AuctionMonitorHealthCheck {
|
||||
* Startup probe - checks if application has started correctly
|
||||
* GET /health/started
|
||||
*/
|
||||
@org.eclipse.microprofile.health.Startup
|
||||
@Startup
|
||||
@ApplicationScoped
|
||||
public static class StartupCheck implements HealthCheck {
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ public class AuctionMonitorProducer {
|
||||
public ImageProcessingService produceImageProcessingService(
|
||||
DatabaseService db,
|
||||
ObjectDetectionService detector,
|
||||
RateLimitedHttpClient2 httpClient) {
|
||||
RateLimitedHttpClient httpClient) {
|
||||
|
||||
LOG.infof("Initializing ImageProcessingService");
|
||||
return new ImageProcessingService(db, detector, httpClient);
|
||||
|
||||
@@ -33,7 +33,7 @@ public class AuctionMonitorResource {
|
||||
NotificationService notifier;
|
||||
|
||||
@Inject
|
||||
RateLimitedHttpClient2 httpClient;
|
||||
RateLimitedHttpClient httpClient;
|
||||
|
||||
/**
|
||||
* GET /api/monitor/status
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package auctiora;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import java.io.Console;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
@@ -18,11 +17,11 @@ import java.util.List;
|
||||
@Slf4j
|
||||
class ImageProcessingService {
|
||||
|
||||
private final RateLimitedHttpClient2 httpClient;
|
||||
private final DatabaseService db;
|
||||
private final RateLimitedHttpClient httpClient;
|
||||
private final DatabaseService db;
|
||||
private final ObjectDetectionService detector;
|
||||
|
||||
ImageProcessingService(DatabaseService db, ObjectDetectionService detector, RateLimitedHttpClient2 httpClient) {
|
||||
ImageProcessingService(DatabaseService db, ObjectDetectionService detector, RateLimitedHttpClient httpClient) {
|
||||
this.httpClient = httpClient;
|
||||
this.db = db;
|
||||
this.detector = detector;
|
||||
@@ -64,6 +63,8 @@ class ImageProcessingService {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -23,6 +23,12 @@ import org.opencv.core.Core;
|
||||
@Slf4j
|
||||
public class Main {
|
||||
|
||||
@SuppressWarnings("restricted")
|
||||
private static Object loadOpenCV() {
|
||||
System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
|
||||
return null;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
log.info("=== Troostwijk Auction Monitor ===\n");
|
||||
|
||||
@@ -40,7 +46,7 @@ public class Main {
|
||||
|
||||
// Load native OpenCV library (only if models exist)
|
||||
try {
|
||||
System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
|
||||
loadOpenCV();
|
||||
log.info("✓ OpenCV loaded");
|
||||
} catch (UnsatisfiedLinkError e) {
|
||||
log.info("⚠️ OpenCV not available - image detection disabled");
|
||||
|
||||
@@ -2,7 +2,9 @@ package auctiora;
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
@@ -10,66 +12,259 @@ import java.net.http.HttpResponse;
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import io.github.bucket4j.*;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* Rate-limited HTTP client that enforces per-host request limits.
|
||||
*
|
||||
* Features:
|
||||
* - Per-host rate limiting (configurable max requests per second)
|
||||
* - Request counting and monitoring
|
||||
* - Thread-safe using semaphores
|
||||
* - Automatic host extraction from URLs
|
||||
*
|
||||
* This prevents overloading external services like Troostwijk and getting blocked.
|
||||
*/
|
||||
@ApplicationScoped
|
||||
public class RateLimitedHttpClient {
|
||||
|
||||
private final HttpClient client = HttpClient.newBuilder()
|
||||
.connectTimeout(Duration.ofSeconds(30))
|
||||
.build();
|
||||
private static final Logger LOG = Logger.getLogger(RateLimitedHttpClient.class);
|
||||
|
||||
private final HttpClient httpClient;
|
||||
private final Map<String, RateLimiter> rateLimiters;
|
||||
private final Map<String, RequestStats> requestStats;
|
||||
|
||||
@ConfigProperty(name = "auction.http.rate-limit.default-max-rps", defaultValue = "2")
|
||||
int defaultRps;
|
||||
int defaultMaxRequestsPerSecond;
|
||||
|
||||
@ConfigProperty(name = "auction.http.rate-limit.troostwijk-max-rps", defaultValue = "1")
|
||||
int troostwijkRps;
|
||||
int troostwijkMaxRequestsPerSecond;
|
||||
|
||||
@ConfigProperty(name = "auction.http.timeout-seconds", defaultValue = "30")
|
||||
int timeoutSeconds;
|
||||
|
||||
private final Map<String, Bucket> buckets = new ConcurrentHashMap<>();
|
||||
public RateLimitedHttpClient() {
|
||||
this.httpClient = HttpClient.newBuilder()
|
||||
.connectTimeout(Duration.ofSeconds(30))
|
||||
.build();
|
||||
this.rateLimiters = new ConcurrentHashMap<>();
|
||||
this.requestStats = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
private Bucket bucketForHost(String host) {
|
||||
return buckets.computeIfAbsent(host, h -> {
|
||||
int rps = host.contains("troostwijk") ? troostwijkRps : defaultRps;
|
||||
var limit = Bandwidth.simple(rps, Duration.ofSeconds(1));
|
||||
return Bucket4j.builder().addLimit(limit).build();
|
||||
/**
|
||||
* Sends a GET request with automatic rate limiting based on host.
|
||||
*/
|
||||
public HttpResponse<String> sendGet(String url) throws IOException, InterruptedException {
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(url))
|
||||
.timeout(Duration.ofSeconds(timeoutSeconds))
|
||||
.GET()
|
||||
.build();
|
||||
|
||||
return send(request, HttpResponse.BodyHandlers.ofString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a request for binary data (like images) with rate limiting.
|
||||
*/
|
||||
public HttpResponse<byte[]> sendGetBytes(String url) throws IOException, InterruptedException {
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(url))
|
||||
.timeout(Duration.ofSeconds(timeoutSeconds))
|
||||
.GET()
|
||||
.build();
|
||||
|
||||
return send(request, HttpResponse.BodyHandlers.ofByteArray());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends any HTTP request with automatic rate limiting.
|
||||
*/
|
||||
public <T> HttpResponse<T> send(HttpRequest request, HttpResponse.BodyHandler<T> bodyHandler)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
String host = extractHost(request.uri());
|
||||
RateLimiter limiter = getRateLimiter(host);
|
||||
RequestStats stats = getRequestStats(host);
|
||||
|
||||
// Enforce rate limit (blocks if necessary)
|
||||
limiter.acquire();
|
||||
|
||||
// Track request
|
||||
stats.incrementTotal();
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
HttpResponse<T> response = httpClient.send(request, bodyHandler);
|
||||
|
||||
long duration = System.currentTimeMillis() - startTime;
|
||||
stats.recordSuccess(duration);
|
||||
|
||||
LOG.debugf("HTTP %d %s %s (%dms)",
|
||||
response.statusCode(), request.method(), host, duration);
|
||||
|
||||
// Track rate limit violations (429 = Too Many Requests)
|
||||
if (response.statusCode() == 429) {
|
||||
stats.incrementRateLimited();
|
||||
LOG.warnf("⚠️ Rate limited by %s (HTTP 429)", host);
|
||||
}
|
||||
|
||||
return response;
|
||||
|
||||
} catch (IOException | InterruptedException e) {
|
||||
stats.incrementFailed();
|
||||
LOG.warnf("❌ HTTP request failed for %s: %s", host, e.getMessage());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets or creates a rate limiter for a specific host.
|
||||
*/
|
||||
private RateLimiter getRateLimiter(String host) {
|
||||
return rateLimiters.computeIfAbsent(host, h -> {
|
||||
int maxRps = getMaxRequestsPerSecond(h);
|
||||
LOG.infof("Initializing rate limiter for %s: %d req/s", h, maxRps);
|
||||
return new RateLimiter(maxRps);
|
||||
});
|
||||
}
|
||||
|
||||
public HttpResponse<String> sendGet(String url) throws Exception {
|
||||
var req = HttpRequest.newBuilder()
|
||||
.uri(URI.create(url))
|
||||
.timeout(Duration.ofSeconds(timeoutSeconds))
|
||||
.GET()
|
||||
.build();
|
||||
return send(req, HttpResponse.BodyHandlers.ofString());
|
||||
/**
|
||||
* Gets or creates request stats for a specific host.
|
||||
*/
|
||||
private RequestStats getRequestStats(String host) {
|
||||
return requestStats.computeIfAbsent(host, h -> new RequestStats(h));
|
||||
}
|
||||
|
||||
public HttpResponse<byte[]> sendGetBytes(String url) throws Exception {
|
||||
var req = HttpRequest.newBuilder()
|
||||
.uri(URI.create(url))
|
||||
.timeout(Duration.ofSeconds(timeoutSeconds))
|
||||
.GET()
|
||||
.build();
|
||||
return send(req, HttpResponse.BodyHandlers.ofByteArray());
|
||||
/**
|
||||
* Determines max requests per second for a given host.
|
||||
*/
|
||||
private int getMaxRequestsPerSecond(String host) {
|
||||
if (host.contains("troostwijk")) {
|
||||
return troostwijkMaxRequestsPerSecond;
|
||||
}
|
||||
return defaultMaxRequestsPerSecond;
|
||||
}
|
||||
|
||||
public <T> HttpResponse<T> send(HttpRequest req,
|
||||
HttpResponse.BodyHandler<T> handler) throws Exception {
|
||||
String host = req.uri().getHost();
|
||||
var bucket = bucketForHost(host);
|
||||
bucket.asBlocking().consume(1);
|
||||
/**
|
||||
* Extracts host from URI (e.g., "api.troostwijkauctions.com").
|
||||
*/
|
||||
private String extractHost(URI uri) {
|
||||
return uri.getHost() != null ? uri.getHost() : uri.toString();
|
||||
}
|
||||
|
||||
var start = System.currentTimeMillis();
|
||||
var resp = client.send(req, handler);
|
||||
var duration = System.currentTimeMillis() - start;
|
||||
/**
|
||||
* Gets statistics for all hosts.
|
||||
*/
|
||||
public Map<String, RequestStats> getAllStats() {
|
||||
return Map.copyOf(requestStats);
|
||||
}
|
||||
|
||||
// (Optional) Logging
|
||||
System.out.printf("HTTP %d %s %s in %d ms%n",
|
||||
resp.statusCode(), req.method(), host, duration);
|
||||
/**
|
||||
* Gets statistics for a specific host.
|
||||
*/
|
||||
public RequestStats getStats(String host) {
|
||||
return requestStats.get(host);
|
||||
}
|
||||
|
||||
return resp;
|
||||
/**
|
||||
* Rate limiter implementation using token bucket algorithm.
|
||||
* Allows burst traffic up to maxRequestsPerSecond, then enforces steady rate.
|
||||
*/
|
||||
private static class RateLimiter {
|
||||
private final Semaphore semaphore;
|
||||
private final int maxRequestsPerSecond;
|
||||
private final long intervalNanos;
|
||||
|
||||
RateLimiter(int maxRequestsPerSecond) {
|
||||
this.maxRequestsPerSecond = maxRequestsPerSecond;
|
||||
this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / maxRequestsPerSecond;
|
||||
this.semaphore = new Semaphore(maxRequestsPerSecond);
|
||||
|
||||
// Refill tokens periodically
|
||||
startRefillThread();
|
||||
}
|
||||
|
||||
void acquire() throws InterruptedException {
|
||||
semaphore.acquire();
|
||||
|
||||
// Enforce minimum delay between requests
|
||||
long delayMillis = intervalNanos / 1_000_000;
|
||||
if (delayMillis > 0) {
|
||||
Thread.sleep(delayMillis);
|
||||
}
|
||||
}
|
||||
|
||||
private void startRefillThread() {
|
||||
Thread refillThread = new Thread(() -> {
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
Thread.sleep(1000); // Refill every second
|
||||
int toRelease = maxRequestsPerSecond - semaphore.availablePermits();
|
||||
if (toRelease > 0) {
|
||||
semaphore.release(toRelease);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}, "RateLimiter-Refill");
|
||||
refillThread.setDaemon(true);
|
||||
refillThread.start();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Statistics tracker for HTTP requests per host.
|
||||
*/
|
||||
public static class RequestStats {
|
||||
private final String host;
|
||||
private final AtomicLong totalRequests = new AtomicLong(0);
|
||||
private final AtomicLong successfulRequests = new AtomicLong(0);
|
||||
private final AtomicLong failedRequests = new AtomicLong(0);
|
||||
private final AtomicLong rateLimitedRequests = new AtomicLong(0);
|
||||
private final AtomicLong totalDurationMs = new AtomicLong(0);
|
||||
|
||||
RequestStats(String host) {
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
void incrementTotal() {
|
||||
totalRequests.incrementAndGet();
|
||||
}
|
||||
|
||||
void recordSuccess(long durationMs) {
|
||||
successfulRequests.incrementAndGet();
|
||||
totalDurationMs.addAndGet(durationMs);
|
||||
}
|
||||
|
||||
void incrementFailed() {
|
||||
failedRequests.incrementAndGet();
|
||||
}
|
||||
|
||||
void incrementRateLimited() {
|
||||
rateLimitedRequests.incrementAndGet();
|
||||
}
|
||||
|
||||
// Getters
|
||||
public String getHost() { return host; }
|
||||
public long getTotalRequests() { return totalRequests.get(); }
|
||||
public long getSuccessfulRequests() { return successfulRequests.get(); }
|
||||
public long getFailedRequests() { return failedRequests.get(); }
|
||||
public long getRateLimitedRequests() { return rateLimitedRequests.get(); }
|
||||
public long getAverageDurationMs() {
|
||||
long successful = successfulRequests.get();
|
||||
return successful > 0 ? totalDurationMs.get() / successful : 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s: %d total, %d success, %d failed, %d rate-limited, avg %dms",
|
||||
host, getTotalRequests(), getSuccessfulRequests(),
|
||||
getFailedRequests(), getRateLimitedRequests(), getAverageDurationMs());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,270 +0,0 @@
|
||||
package auctiora;
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* Rate-limited HTTP client that enforces per-host request limits.
|
||||
*
|
||||
* Features:
|
||||
* - Per-host rate limiting (configurable max requests per second)
|
||||
* - Request counting and monitoring
|
||||
* - Thread-safe using semaphores
|
||||
* - Automatic host extraction from URLs
|
||||
*
|
||||
* This prevents overloading external services like Troostwijk and getting blocked.
|
||||
*/
|
||||
@ApplicationScoped
|
||||
public class RateLimitedHttpClient2 {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(RateLimitedHttpClient2.class);
|
||||
|
||||
private final HttpClient httpClient;
|
||||
private final Map<String, RateLimiter> rateLimiters;
|
||||
private final Map<String, RequestStats> requestStats;
|
||||
|
||||
@ConfigProperty(name = "auction.http.rate-limit.default-max-rps", defaultValue = "2")
|
||||
int defaultMaxRequestsPerSecond;
|
||||
|
||||
@ConfigProperty(name = "auction.http.rate-limit.troostwijk-max-rps", defaultValue = "1")
|
||||
int troostwijkMaxRequestsPerSecond;
|
||||
|
||||
@ConfigProperty(name = "auction.http.timeout-seconds", defaultValue = "30")
|
||||
int timeoutSeconds;
|
||||
|
||||
public RateLimitedHttpClient2() {
|
||||
this.httpClient = HttpClient.newBuilder()
|
||||
.connectTimeout(Duration.ofSeconds(30))
|
||||
.build();
|
||||
this.rateLimiters = new ConcurrentHashMap<>();
|
||||
this.requestStats = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a GET request with automatic rate limiting based on host.
|
||||
*/
|
||||
public HttpResponse<String> sendGet(String url) throws IOException, InterruptedException {
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(url))
|
||||
.timeout(Duration.ofSeconds(timeoutSeconds))
|
||||
.GET()
|
||||
.build();
|
||||
|
||||
return send(request, HttpResponse.BodyHandlers.ofString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a request for binary data (like images) with rate limiting.
|
||||
*/
|
||||
public HttpResponse<byte[]> sendGetBytes(String url) throws IOException, InterruptedException {
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(url))
|
||||
.timeout(Duration.ofSeconds(timeoutSeconds))
|
||||
.GET()
|
||||
.build();
|
||||
|
||||
return send(request, HttpResponse.BodyHandlers.ofByteArray());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends any HTTP request with automatic rate limiting.
|
||||
*/
|
||||
public <T> HttpResponse<T> send(HttpRequest request, HttpResponse.BodyHandler<T> bodyHandler)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
String host = extractHost(request.uri());
|
||||
RateLimiter limiter = getRateLimiter(host);
|
||||
RequestStats stats = getRequestStats(host);
|
||||
|
||||
// Enforce rate limit (blocks if necessary)
|
||||
limiter.acquire();
|
||||
|
||||
// Track request
|
||||
stats.incrementTotal();
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
HttpResponse<T> response = httpClient.send(request, bodyHandler);
|
||||
|
||||
long duration = System.currentTimeMillis() - startTime;
|
||||
stats.recordSuccess(duration);
|
||||
|
||||
LOG.debugf("HTTP %d %s %s (%dms)",
|
||||
response.statusCode(), request.method(), host, duration);
|
||||
|
||||
// Track rate limit violations (429 = Too Many Requests)
|
||||
if (response.statusCode() == 429) {
|
||||
stats.incrementRateLimited();
|
||||
LOG.warnf("⚠️ Rate limited by %s (HTTP 429)", host);
|
||||
}
|
||||
|
||||
return response;
|
||||
|
||||
} catch (IOException | InterruptedException e) {
|
||||
stats.incrementFailed();
|
||||
LOG.warnf("❌ HTTP request failed for %s: %s", host, e.getMessage());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets or creates a rate limiter for a specific host.
|
||||
*/
|
||||
private RateLimiter getRateLimiter(String host) {
|
||||
return rateLimiters.computeIfAbsent(host, h -> {
|
||||
int maxRps = getMaxRequestsPerSecond(h);
|
||||
LOG.infof("Initializing rate limiter for %s: %d req/s", h, maxRps);
|
||||
return new RateLimiter(maxRps);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets or creates request stats for a specific host.
|
||||
*/
|
||||
private RequestStats getRequestStats(String host) {
|
||||
return requestStats.computeIfAbsent(host, h -> new RequestStats(h));
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines max requests per second for a given host.
|
||||
*/
|
||||
private int getMaxRequestsPerSecond(String host) {
|
||||
if (host.contains("troostwijk")) {
|
||||
return troostwijkMaxRequestsPerSecond;
|
||||
}
|
||||
return defaultMaxRequestsPerSecond;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts host from URI (e.g., "api.troostwijkauctions.com").
|
||||
*/
|
||||
private String extractHost(URI uri) {
|
||||
return uri.getHost() != null ? uri.getHost() : uri.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets statistics for all hosts.
|
||||
*/
|
||||
public Map<String, RequestStats> getAllStats() {
|
||||
return Map.copyOf(requestStats);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets statistics for a specific host.
|
||||
*/
|
||||
public RequestStats getStats(String host) {
|
||||
return requestStats.get(host);
|
||||
}
|
||||
|
||||
/**
|
||||
* Rate limiter implementation using token bucket algorithm.
|
||||
* Allows burst traffic up to maxRequestsPerSecond, then enforces steady rate.
|
||||
*/
|
||||
private static class RateLimiter {
|
||||
private final Semaphore semaphore;
|
||||
private final int maxRequestsPerSecond;
|
||||
private final long intervalNanos;
|
||||
|
||||
RateLimiter(int maxRequestsPerSecond) {
|
||||
this.maxRequestsPerSecond = maxRequestsPerSecond;
|
||||
this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / maxRequestsPerSecond;
|
||||
this.semaphore = new Semaphore(maxRequestsPerSecond);
|
||||
|
||||
// Refill tokens periodically
|
||||
startRefillThread();
|
||||
}
|
||||
|
||||
void acquire() throws InterruptedException {
|
||||
semaphore.acquire();
|
||||
|
||||
// Enforce minimum delay between requests
|
||||
long delayMillis = intervalNanos / 1_000_000;
|
||||
if (delayMillis > 0) {
|
||||
Thread.sleep(delayMillis);
|
||||
}
|
||||
}
|
||||
|
||||
private void startRefillThread() {
|
||||
Thread refillThread = new Thread(() -> {
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
Thread.sleep(1000); // Refill every second
|
||||
int toRelease = maxRequestsPerSecond - semaphore.availablePermits();
|
||||
if (toRelease > 0) {
|
||||
semaphore.release(toRelease);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}, "RateLimiter-Refill");
|
||||
refillThread.setDaemon(true);
|
||||
refillThread.start();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Statistics tracker for HTTP requests per host.
|
||||
*/
|
||||
public static class RequestStats {
|
||||
private final String host;
|
||||
private final AtomicLong totalRequests = new AtomicLong(0);
|
||||
private final AtomicLong successfulRequests = new AtomicLong(0);
|
||||
private final AtomicLong failedRequests = new AtomicLong(0);
|
||||
private final AtomicLong rateLimitedRequests = new AtomicLong(0);
|
||||
private final AtomicLong totalDurationMs = new AtomicLong(0);
|
||||
|
||||
RequestStats(String host) {
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
void incrementTotal() {
|
||||
totalRequests.incrementAndGet();
|
||||
}
|
||||
|
||||
void recordSuccess(long durationMs) {
|
||||
successfulRequests.incrementAndGet();
|
||||
totalDurationMs.addAndGet(durationMs);
|
||||
}
|
||||
|
||||
void incrementFailed() {
|
||||
failedRequests.incrementAndGet();
|
||||
}
|
||||
|
||||
void incrementRateLimited() {
|
||||
rateLimitedRequests.incrementAndGet();
|
||||
}
|
||||
|
||||
// Getters
|
||||
public String getHost() { return host; }
|
||||
public long getTotalRequests() { return totalRequests.get(); }
|
||||
public long getSuccessfulRequests() { return successfulRequests.get(); }
|
||||
public long getFailedRequests() { return failedRequests.get(); }
|
||||
public long getRateLimitedRequests() { return rateLimitedRequests.get(); }
|
||||
public long getAverageDurationMs() {
|
||||
long successful = successfulRequests.get();
|
||||
return successful > 0 ? totalDurationMs.get() / successful : 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s: %d total, %d success, %d failed, %d rate-limited, avg %dms",
|
||||
host, getTotalRequests(), getSuccessfulRequests(),
|
||||
getFailedRequests(), getRateLimitedRequests(), getAverageDurationMs());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -18,8 +18,8 @@ public class TroostwijkMonitor {
|
||||
|
||||
private static final String LOT_API = "https://api.troostwijkauctions.com/lot/7/list";
|
||||
|
||||
RateLimitedHttpClient2 httpClient;
|
||||
ObjectMapper objectMapper;
|
||||
RateLimitedHttpClient httpClient;
|
||||
ObjectMapper objectMapper;
|
||||
@Getter DatabaseService db;
|
||||
NotificationService notifier;
|
||||
ObjectDetectionService detector;
|
||||
@@ -38,7 +38,7 @@ public class TroostwijkMonitor {
|
||||
String classNamesPath)
|
||||
throws SQLException, IOException {
|
||||
|
||||
httpClient = new RateLimitedHttpClient2();
|
||||
httpClient = new RateLimitedHttpClient();
|
||||
objectMapper = new ObjectMapper();
|
||||
db = new DatabaseService(databasePath);
|
||||
notifier = new NotificationService(notificationConfig);
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package auctiora;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import java.io.Console;
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
import java.util.List;
|
||||
@@ -43,7 +42,7 @@ public class WorkflowOrchestrator {
|
||||
|
||||
this.notifier = new NotificationService(notificationConfig);
|
||||
this.detector = new ObjectDetectionService(yoloCfg, yoloWeights, yoloClasses);
|
||||
RateLimitedHttpClient2 httpClient = new RateLimitedHttpClient2();
|
||||
RateLimitedHttpClient httpClient = new RateLimitedHttpClient();
|
||||
this.imageProcessor = new ImageProcessingService(db, detector, httpClient);
|
||||
|
||||
this.monitor = new TroostwijkMonitor(databasePath, notificationConfig,
|
||||
|
||||
@@ -31,6 +31,10 @@ quarkus.log.console.level=INFO
|
||||
%dev.quarkus.log.console.level=DEBUG
|
||||
%dev.quarkus.live-reload.instrumentation=true
|
||||
|
||||
# JVM Arguments for native access (Jansi, OpenCV, etc.)
|
||||
quarkus.native.additional-build-args=--enable-native-access=ALL-UNNAMED
|
||||
quarkus.jvm.args=--enable-native-access=ALL-UNNAMED
|
||||
|
||||
# Production optimizations
|
||||
%prod.quarkus.package.type=fast-jar
|
||||
%prod.quarkus.http.enable-compression=true
|
||||
|
||||
@@ -19,14 +19,14 @@ class ImageProcessingServiceTest {
|
||||
|
||||
private DatabaseService mockDb;
|
||||
private ObjectDetectionService mockDetector;
|
||||
private RateLimitedHttpClient2 mockHttpClient;
|
||||
private RateLimitedHttpClient mockHttpClient;
|
||||
private ImageProcessingService service;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
mockDb = mock(DatabaseService.class);
|
||||
mockDetector = mock(ObjectDetectionService.class);
|
||||
mockHttpClient = mock(RateLimitedHttpClient2.class);
|
||||
mockHttpClient = mock(RateLimitedHttpClient.class);
|
||||
service = new ImageProcessingService(mockDb, mockDetector, mockHttpClient);
|
||||
}
|
||||
|
||||
@@ -102,6 +102,7 @@ class ImageProcessingServiceTest {
|
||||
ArgumentCaptor<Integer> lotIdCaptor = ArgumentCaptor.forClass(Integer.class);
|
||||
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
|
||||
ArgumentCaptor<String> filePathCaptor = ArgumentCaptor.forClass(String.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
ArgumentCaptor<List<String>> labelsCaptor = ArgumentCaptor.forClass(List.class);
|
||||
|
||||
when(mockDetector.detectObjects(anyString()))
|
||||
@@ -169,8 +170,8 @@ class ImageProcessingServiceTest {
|
||||
@DisplayName("Should handle database errors during image save")
|
||||
void testDatabaseErrorHandling() throws Exception {
|
||||
// Mock successful HTTP download
|
||||
@SuppressWarnings("unchecked")
|
||||
var mockResponse = mock(java.net.http.HttpResponse.class);
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
var mockResponse = (java.net.http.HttpResponse<byte[]>) mock(java.net.http.HttpResponse.class);
|
||||
when(mockResponse.statusCode()).thenReturn(200);
|
||||
when(mockResponse.body()).thenReturn(new byte[]{1, 2, 3});
|
||||
when(mockHttpClient.sendGetBytes(anyString())).thenReturn(mockResponse);
|
||||
|
||||
@@ -48,7 +48,7 @@ class IntegrationTest {
|
||||
"non_existent.txt"
|
||||
);
|
||||
|
||||
RateLimitedHttpClient2 httpClient = new RateLimitedHttpClient2();
|
||||
RateLimitedHttpClient httpClient = new RateLimitedHttpClient();
|
||||
imageProcessor = new ImageProcessingService(db, detector, httpClient);
|
||||
|
||||
monitor = new TroostwijkMonitor(
|
||||
|
||||
Reference in New Issue
Block a user