0
This commit is contained in:
85
src/cache.py
85
src/cache.py
@@ -15,6 +15,27 @@ import json
|
||||
from datetime import datetime
|
||||
|
||||
import config
|
||||
from urllib.parse import urlparse
|
||||
|
||||
def _mask_dsn(dsn: str) -> str:
|
||||
try:
|
||||
p = urlparse(dsn)
|
||||
user = p.username or ''
|
||||
host = p.hostname or ''
|
||||
port = f":{p.port}" if p.port else ''
|
||||
return f"{p.scheme}://{user}:***@{host}{port}{p.path or ''}"
|
||||
except Exception:
|
||||
return dsn
|
||||
|
||||
def _describe_target(dsn: str) -> str:
|
||||
try:
|
||||
p = urlparse(dsn)
|
||||
host = p.hostname or 'unknown-host'
|
||||
port = p.port or 5432
|
||||
db = (p.path or '').lstrip('/') or '<no-db>'
|
||||
return f"{host}:{port}/{db}"
|
||||
except Exception:
|
||||
return dsn
|
||||
|
||||
|
||||
class _ConnectionPool:
|
||||
@@ -36,6 +57,12 @@ class _ConnectionPool:
|
||||
self._idle: list = []
|
||||
self._created = 0
|
||||
|
||||
# Basic diagnostics
|
||||
try:
|
||||
print(f"[DB] Initializing connection pool: min={self._min} max={self._max} wait_timeout={self._timeout}s target={_describe_target(self._dsn)}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Pre-warm pool
|
||||
for _ in range(self._min):
|
||||
conn = self._new_connection_with_retry()
|
||||
@@ -45,17 +72,43 @@ class _ConnectionPool:
|
||||
def _new_connection_with_retry(self):
|
||||
last_exc = None
|
||||
backoffs = [0.05, 0.1, 0.2, 0.4, 0.8]
|
||||
target = _describe_target(self._dsn)
|
||||
attempt = 0
|
||||
for delay in backoffs:
|
||||
attempt += 1
|
||||
try:
|
||||
return self._connect(self._dsn)
|
||||
t0 = time.time()
|
||||
conn = self._connect(self._dsn)
|
||||
dt = time.time() - t0
|
||||
try:
|
||||
print(f"[DB] Connected to {target} on attempt {attempt} in {dt:.2f}s")
|
||||
except Exception:
|
||||
pass
|
||||
return conn
|
||||
except Exception as e:
|
||||
last_exc = e
|
||||
try:
|
||||
print(f"[DB] Connect attempt {attempt} to {target} failed: {type(e).__name__}: {e}")
|
||||
except Exception:
|
||||
pass
|
||||
time.sleep(delay)
|
||||
# Final attempt without sleeping after loop
|
||||
try:
|
||||
return self._connect(self._dsn)
|
||||
attempt += 1
|
||||
t0 = time.time()
|
||||
conn = self._connect(self._dsn)
|
||||
dt = time.time() - t0
|
||||
try:
|
||||
print(f"[DB] Connected to {target} on attempt {attempt} in {dt:.2f}s")
|
||||
except Exception:
|
||||
pass
|
||||
return conn
|
||||
except Exception as e:
|
||||
last_exc = e
|
||||
try:
|
||||
print(f"[DB] Final connect attempt {attempt} to {target} failed: {type(e).__name__}: {e}")
|
||||
except Exception:
|
||||
pass
|
||||
raise last_exc
|
||||
|
||||
def acquire(self, timeout: Optional[float] = None):
|
||||
@@ -68,11 +121,19 @@ class _ConnectionPool:
|
||||
try:
|
||||
if getattr(conn, "closed", False):
|
||||
self._created -= 1
|
||||
try:
|
||||
print("[DB] Dropping closed connection from pool; adjusting pool size")
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
return conn
|
||||
except Exception:
|
||||
# Consider it broken
|
||||
self._created -= 1
|
||||
try:
|
||||
print("[DB] Encountered error while checking pooled connection; dropping it")
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
|
||||
# Create new if capacity
|
||||
@@ -84,7 +145,15 @@ class _ConnectionPool:
|
||||
# Wait for release
|
||||
remaining = deadline - time.time()
|
||||
if remaining <= 0:
|
||||
try:
|
||||
print(f"[DB] Timed out after {self._timeout}s waiting for a pooled connection (in use: {self._created}/{self._max})")
|
||||
except Exception:
|
||||
pass
|
||||
raise TimeoutError("Timed out waiting for database connection from pool")
|
||||
try:
|
||||
print(f"[DB] Pool exhausted ({self._created}/{self._max}); waiting up to {remaining:.1f}s for release")
|
||||
except Exception:
|
||||
pass
|
||||
self._cond.wait(remaining)
|
||||
|
||||
def release(self, conn):
|
||||
@@ -96,6 +165,10 @@ class _ConnectionPool:
|
||||
if getattr(conn, "closed", False):
|
||||
with self._cond:
|
||||
self._created -= 1
|
||||
try:
|
||||
print("[DB] Released connection was closed; decrementing pool size")
|
||||
except Exception:
|
||||
pass
|
||||
self._cond.notify()
|
||||
return
|
||||
with self._cond:
|
||||
@@ -127,6 +200,10 @@ class _ConnectionPool:
|
||||
pass
|
||||
self._idle.clear()
|
||||
self._created = 0
|
||||
try:
|
||||
print("[DB] Connection pool closed; all idle connections terminated")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
class CacheManager:
|
||||
"""Manages page caching and data storage using PostgreSQL."""
|
||||
@@ -142,6 +219,10 @@ class CacheManager:
|
||||
max_size=getattr(config, 'DB_POOL_MAX', 6),
|
||||
timeout=getattr(config, 'DB_POOL_TIMEOUT', 30),
|
||||
)
|
||||
try:
|
||||
print(f"[DB] CacheManager initialized with DSN={_mask_dsn(self.database_url)}")
|
||||
except Exception:
|
||||
pass
|
||||
self._init_db()
|
||||
|
||||
# ------------------------
|
||||
|
||||
Reference in New Issue
Block a user