"""Lightweight Python API service for signature validation. This can run independently to support the PHP application. """ from __future__ import annotations import asyncio from contextlib import asynccontextmanager import json import logging import os import re import time from typing import Any, TYPE_CHECKING if TYPE_CHECKING: from collections.abc import AsyncIterator, Callable, Awaitable # Avoid importing heavy modules at top-level to keep `import api_service` lightweight from fastapi import FastAPI, Query, Request, Response from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse CACHE_TTL_SECONDS = int(os.getenv("CACHE_TTL", str(72 * 3600))) REDIS_URL = os.getenv("REDIS_URL", "") redis_client = None @asynccontextmanager async def _lifespan(_app: FastAPI) -> AsyncIterator[None]: """Lifespan handler: connect to Redis on startup and close on shutdown.""" global redis_client if REDIS_URL: try: import redis.asyncio as aioredis redis_client = aioredis.from_url(REDIS_URL) try: pong = redis_client.ping() if asyncio.iscoroutine(pong) or asyncio.isfuture(pong): pong = await pong if not pong: logging.exception("redis ping failed") redis_client = None except Exception: logging.exception("redis ping failed") redis_client = None except Exception: logging.exception("failed to create redis client") redis_client = None yield if redis_client is not None: try: await redis_client.close() except Exception: logging.exception("failed to close redis client") app = FastAPI(title="Signature Validation API", lifespan=_lifespan) # Optional path prefix support: when behind a reverse-proxy that uses a # URL prefix (eg. `https://api.example.tld/library/...`) set `API_PREFIX` to # that prefix (example: `/library`) so incoming requests are rewritten to the # application root. This keeps route definitions unchanged while supporting # both proxied and direct deployments. 
# Normalise API_PREFIX to the form "/segment[/segment...]" with no trailing
# slash; an empty or "/"-only value disables prefix stripping.
_api_prefix_raw = os.getenv("API_PREFIX", "").strip()
api_prefix = ""
if _api_prefix_raw:
    if not _api_prefix_raw.startswith("/"):
        _api_prefix_raw = "/" + _api_prefix_raw
    api_prefix = _api_prefix_raw.rstrip("/")


def _path_without_prefix(path: str, prefix: str) -> str | None:
    """Return *path* with *prefix* removed, or ``None`` when it does not apply.

    The prefix only matches on a path-segment boundary, so ``/library`` matches
    ``/library`` and ``/library/x`` but not ``/librarything/x``.
    """
    if not prefix:
        return None
    if path == prefix:
        return "/"
    if path.startswith(prefix + "/"):
        return path[len(prefix):]
    return None


@app.middleware("http")
async def _strip_api_prefix(
    request: Request,
    call_next: Callable[[Request], Awaitable[Response]],
) -> Response:
    """Rewrite requests under ``api_prefix`` so they hit the unprefixed routes."""
    stripped = _path_without_prefix(request.url.path, api_prefix)
    if stripped is not None:
        request.scope["path"] = stripped
        request.scope["root_path"] = api_prefix
    return await call_next(request)


# Allow PHP application to call this API
# NOTE(review): wildcard origins together with allow_credentials=True is
# discouraged for credentialed CORS requests; confirm against the
# CORSMiddleware documentation and restrict origins before production use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, restrict to your PHP server domain
    allow_credentials=True,
    allow_methods=["GET"],
    allow_headers=["*"],
)

# Catalogue is expensive to initialize at import time; instantiate lazily
cat = None


def _get_catalogue() -> Any:
    """Return the shared catalogue instance, creating it on first use."""
    global cat
    if cat is None:
        # import inside function to avoid expensive work during module import
        from bibapi import catalogue as _catalogue

        cat = _catalogue.Catalogue()
    return cat


# ---- Caching support ----------------------------------------------
# Uses an async Redis client when `REDIS_URL` is set, otherwise falls
# back to a small in-memory store with TTL. Cache TTL defaults to 72h.
CacheValue = dict[str, Any]

# key -> (monotonic expiry deadline, cached payload)
# NOTE(review): entries are only evicted when read after expiry, and keys come
# from request input, so this store can grow without bound — consider a cap.
_in_memory_cache: dict[str, tuple[float, CacheValue]] = {}
_in_memory_lock = asyncio.Lock()


async def _cache_get(key: str) -> CacheValue | None:
    """Return the cached value for *key*, or ``None`` on a miss.

    Redis is consulted first when a client is configured. If the Redis read
    fails, the in-memory store is checked, because ``_cache_set`` writes there
    when its Redis write fails.
    """
    if redis_client is not None:
        try:
            raw = await redis_client.get(key)
            return None if raw is None else json.loads(raw)
        except Exception:
            logging.exception("redis get failed")
            # fall through to the in-memory store

    async with _in_memory_lock:
        entry = _in_memory_cache.get(key)
        if not entry:
            return None
        expires_at, value = entry
        if time.monotonic() >= expires_at:
            del _in_memory_cache[key]
            return None
        return value


async def _cache_set(key: str, value: CacheValue, ttl: int = CACHE_TTL_SECONDS) -> None:
    """Store *value* under *key* for *ttl* seconds.

    Writes to Redis when a client is configured; on failure (or without Redis)
    the value is kept in the in-memory store instead.
    """
    if redis_client is not None:
        try:
            await redis_client.set(key, json.dumps(value), ex=ttl)
            return
        except Exception:
            logging.exception("redis set failed")

    async with _in_memory_lock:
        # Monotonic clock: expiry is unaffected by wall-clock adjustments.
        _in_memory_cache[key] = (time.monotonic() + ttl, value)


# Redis lifecycle is handled by the lifespan context manager defined earlier


def _extract_total_pages(book: Any) -> int | None:
    """Return the first integer found in ``book.pages``, or ``None``.

    ``None`` is returned when *book* is falsy, has no truthy ``pages``
    attribute, or the attribute contains no digits.
    """
    if not book:
        return None
    pages = getattr(book, "pages", None)
    if not pages:
        return None
    # Extract first number from pages string (e.g., "245 S." -> 245)
    match = re.search(r"(\d+)", str(pages))
    return int(match.group(1)) if match else None


@app.get("/api/validate-signature")
async def validate_signature(signature: str = Query(...)) -> JSONResponse:
    """Validate a book signature and return total pages.

    Successful and "not found" results are cached. Responses produced by an
    unexpected exception are returned with status 500 and are NOT cached:
    caching them would replay the failure for the whole TTL and serve it
    with status 200 on subsequent cache hits.
    """
    # check cache first
    cache_key = f"signature:{signature}"
    cached = await _cache_get(cache_key)
    if cached is not None:
        return JSONResponse(cached)

    try:
        # NOTE(review): this looks like a synchronous call inside an async
        # handler; if it does blocking I/O it will stall the event loop —
        # confirm and consider running it in a worker thread.
        book_result = _get_catalogue().get_book_with_data(signature)
        total_pages = _extract_total_pages(book_result)
    except Exception as e:
        logging.exception("validate_signature failure")
        failure: CacheValue = {
            "valid": False,
            "error": f"Fehler bei der Validierung: {e!s}",
            "signature": signature,
        }
        return JSONResponse(failure, status_code=500)

    result: CacheValue
    if total_pages is not None:
        result = {
            "valid": True,
            "total_pages": total_pages,
            "signature": signature,
        }
    else:
        result = {
            "valid": False,
            "error": "Signatur nicht gefunden oder keine Seitenzahl verfügbar",
            "signature": signature,
        }
    await _cache_set(cache_key, result)
    return JSONResponse(result)


@app.get("/health")
async def health_check() -> dict[str, str]:
    """Health check endpoint."""
    return {"status": "ok", "service": "signature-validation"}


if __name__ == "__main__":
    import uvicorn

    # Port is configurable via API_PORT; defaults to 8001.
    port = int(os.getenv("API_PORT", "8001"))
    uvicorn.run(app, host="0.0.0.0", port=port)