# semapform_api/api_service.py
"""Lightweight Python API service for signature validation.
This can run independently to support the PHP application.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import time
import urllib.parse
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
    from collections.abc import AsyncIterator, Awaitable, Callable

# Avoid importing heavy modules (redis, bibapi) at top level to keep
# `import api_service` lightweight; they are imported lazily where needed.
from fastapi import FastAPI, Query, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
CACHE_TTL_SECONDS = int(os.getenv("CACHE_TTL", str(72 * 3600)))
REDIS_URL = os.getenv("REDIS_URL", "")
redis_client = None
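# Example environment (values are illustrative):
#   REDIS_URL=redis://localhost:6379/0   # enables the Redis-backed cache
#   CACHE_TTL=259200                     # cache TTL in seconds (72 h is the default)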


@asynccontextmanager
async def _lifespan(_app: FastAPI) -> AsyncIterator[None]:
    """Lifespan handler: connect to Redis on startup and close it on shutdown."""
    global redis_client  # noqa: PLW0603
    if REDIS_URL:
        try:
            import redis.asyncio as aioredis

            redis_client = aioredis.from_url(REDIS_URL)
            try:
                pong = redis_client.ping()
                # ping() returns an awaitable on the async client; await it if so.
                if asyncio.iscoroutine(pong) or asyncio.isfuture(pong):
                    pong = await pong
                if not pong:
                    logging.error("redis ping failed")
                    redis_client = None
            except Exception:
                logging.exception("redis ping failed")
                redis_client = None
        except Exception:
            logging.exception("failed to create redis client")
            redis_client = None
    yield
    if redis_client is not None:
        try:
            await redis_client.close()
        except Exception:
            logging.exception("failed to close redis client")
app = FastAPI(title="Signature Validation API", lifespan=_lifespan)


# Optional path prefix support: when behind a reverse proxy that uses a
# URL prefix (e.g. `https://api.example.tld/library/...`), set `API_PREFIX` to
# that prefix (example: `/library`) so incoming requests are rewritten to the
# application root. This keeps route definitions unchanged while supporting
# both proxied and direct deployments.
_api_prefix_raw = os.getenv("API_PREFIX", "").strip()
api_prefix = ""
if _api_prefix_raw:
    if not _api_prefix_raw.startswith("/"):
        _api_prefix_raw = "/" + _api_prefix_raw
    api_prefix = _api_prefix_raw.rstrip("/")
@app.middleware("http")
async def _strip_api_prefix(
request: Request,
call_next: Callable[[Request], Awaitable[Response]],
) -> Response:
if api_prefix and request.url.path.startswith(api_prefix):
new_path = request.url.path[len(api_prefix) :]
request.scope["path"] = new_path or "/"
request.scope["root_path"] = api_prefix
return await call_next(request)
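# Illustration (prefix value assumed, not configured here): with API_PREFIX=/library,
# a request for /library/api/validate-signature is rewritten to /api/validate-signature,
# and root_path=/library lets URL generation (e.g. the OpenAPI docs) keep the proxy prefix.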


# Allow the PHP application to call this API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, restrict this to the PHP server's domain
    allow_credentials=True,
    allow_methods=["GET"],
    allow_headers=["*"],
)
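# In production the wildcard origin above would typically be replaced with the
# PHP application's origin, e.g. (hostname is a placeholder):
#   allow_origins=["https://forms.example.tld"]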


# Catalogue is expensive to initialize at import time; instantiate it lazily.
cat = None


def _get_catalogue() -> Any:
    global cat
    if cat is None:
        # Import inside the function to avoid expensive work during module import.
        from bibapi import catalogue as _catalogue

        cat = _catalogue.Catalogue()
    return cat
# ---- Caching support ----------------------------------------------
# Uses an async Redis client when `REDIS_URL` is set, otherwise falls
# back to a small in-memory store with TTL. Cache TTL defaults to 72h.
CacheValue = dict[str, Any]
_in_memory_cache: dict[str, tuple[float, CacheValue]] = {}
_in_memory_lock = asyncio.Lock()


async def _cache_get(key: str) -> CacheValue | None:
    if redis_client:
        try:
            val = await redis_client.get(key)
            if val is None:
                return None
            return json.loads(val)
        except Exception:
            logging.exception("redis get failed")
            return None
    # fallback in-memory
    async with _in_memory_lock:
        entry = _in_memory_cache.get(key)
        if not entry:
            return None
        expires_at, value = entry
        if time.time() >= expires_at:
            del _in_memory_cache[key]
            return None
        return value


async def _cache_set(key: str, value: CacheValue, ttl: int = CACHE_TTL_SECONDS) -> None:
    if redis_client:
        try:
            await redis_client.set(key, json.dumps(value), ex=ttl)
            return
        except Exception:
            logging.exception("redis set failed")
    # Fall through to the in-memory store when Redis is unavailable or fails.
    async with _in_memory_lock:
        _in_memory_cache[key] = (time.time() + ttl, value)
# Redis lifecycle is handled by the lifespan context manager defined earlier
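# Example round-trip through the cache helpers (key and value shapes match what
# the endpoint below stores; the concrete numbers are illustrative):
#   await _cache_set("signature:XYZ%20123", {"valid": True, "total_pages": 245, "signature": "XYZ%20123"})
#   await _cache_get("signature:XYZ%20123")  # -> the dict above, or None once the TTL has expired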
@app.get("/api/validate-signature")
async def validate_signature(signature: str = Query(...)) -> JSONResponse:
"""Validate a book signature and return total pages."""
# check cache first
# ensure signature is stripped of leading/trailing whitespace
signature = signature.strip()
# enforce url quotes
signature = urllib.parse.quote(signature)
cache_key = f"signature:{signature}"
cached = await _cache_get(cache_key)
if cached is not None:
return JSONResponse(cached)
try:
book_result = _get_catalogue().get_book_with_data(signature)
if book_result and hasattr(book_result, "pages") and book_result.pages:
# Try to extract numeric page count
pages_str = str(book_result.pages)
# Extract first number from pages string (e.g., "245 S." -> 245)
match = re.search(r"(\d+)", pages_str)
if match:
total_pages = int(match.group(1))
result: CacheValue = {
"valid": True,
"total_pages": total_pages,
"signature": signature,
}
await _cache_set(cache_key, result)
return JSONResponse(result)
result: CacheValue = {
"valid": False,
"error": "Signatur nicht gefunden oder keine Seitenzahl verfügbar",
"signature": signature,
}
await _cache_set(cache_key, result)
return JSONResponse(result)
except Exception as e:
logging.exception("validate_signature failure")
result: CacheValue = {
"valid": False,
"error": f"Fehler bei der Validierung: {e!s}",
"signature": signature,
}
# store a failed response in cache as well so we avoid replaying errors
await _cache_set(cache_key, result)
return JSONResponse(result, status_code=500)
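# Illustrative responses (the signature and page count are made up):
#   GET /api/validate-signature?signature=XYZ 123
#     -> 200 {"valid": true, "total_pages": 245, "signature": "XYZ%20123"}
#   GET /api/validate-signature?signature=UNBEKANNT
#     -> 200 {"valid": false, "error": "Signatur nicht gefunden oder keine Seitenzahl verfügbar", "signature": "UNBEKANNT"}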
@app.get("/health")
async def health_check() -> dict[str, str]:
"""Health check endpoint."""
return {"status": "ok", "service": "signature-validation"}
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("API_PORT", "8001"))
uvicorn.run(app, host="0.0.0.0", port=port)