- add retries, timeouts
- add max time of 30 seconds
- add library identifier parsing
- add more keys to PicaSchema
- add keys to the ALMA and Dublin Core schemas
- update QueryTransformer to format author and title based on API requirements
- update the Api class to drop unsupported arguments before the query is created
- add empty returns for results which raised timeout exceptions
chore:
- remove unneeded ArgumentSchema enum
This commit is contained in:
2025-11-11 14:00:42 +01:00
parent ce3eea7243
commit 4d8c5da4f5

View File

@@ -1,4 +1,5 @@
import re
import time
import xml.etree.ElementTree as ET
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
@@ -77,6 +78,7 @@ def parse_record(zs_record_el: ET.Element) -> Record:
if marc_record_el is None:
# If the MARC record uses default ns (xmlns="...") ElementTree still needs the ns-qualified name
# We already searched with prefix; this covers both default and prefixed cases.
return None
raise ValueError("Missing MARC21 record inside zs:recordData")
marc_record = parse_marc_record(marc_record_el)
@@ -129,7 +131,9 @@ def parse_search_retrieve_response(
records: List[Record] = []
if records_parent is not None:
for r in records_parent.findall("zs:record", NS):
records.append(parse_record(r))
record = parse_record(r)
if record is not None:
records.append(record)
echoed = parse_echoed_request(root)
@@ -323,13 +327,15 @@ def subfield_value_pairs_from_fields(
return out
def book_from_marc(rec: MarcRecord) -> BookData:
def book_from_marc(rec: MarcRecord, library_identifier: str) -> BookData:
library_ident_tag = library_identifier.split("$")[0]
library_ident_code = library_identifier.split("$")[1]
# PPN from controlfield 001
ppn = controlfield_value(rec, "001")
# Title = 245 $a + 245 $b (if present)
t_a = first_subfield_value(rec, "245", "a")
t_b = first_subfield_value(rec, "245", "b")
t_b = "" # first_subfield_value(rec, "245", "b")
title = _smart_join_title(t_a, t_b) if t_a else None
# Signature = 924 where $9 == "Frei 129" → take that field's $g
@@ -349,6 +355,7 @@ def book_from_marc(rec: MarcRecord) -> BookData:
author = None
if authors:
author = "; ".join(authors)
libraries = subfield_values(rec, library_ident_tag, library_ident_code)
return BookData(
ppn=ppn,
@@ -363,6 +370,7 @@ def book_from_marc(rec: MarcRecord) -> BookData:
link="",
author=author,
media_type=mediatype,
libraries=libraries,
)
@@ -377,14 +385,25 @@ class PicaSchema(Enum):
ISMN = "pica.ism"
PPN = "pica.ppn"
AUTHOR = "pica.per"
YEAR = "pica.jhr"
AUTHOR_SCHEMA = "NoSpaceAfterComma"
ENCLOSE_TITLE_IN_QUOTES = False
class ALMASchema(Enum):
    """Query field names and formatting hints for Alma-based SRU endpoints.

    Consumed by QueryTransformer: TITLE/AUTHOR/YEAR map generic argument
    keys to the API's query field names; AUTHOR_SCHEMA and
    ENCLOSE_TITLE_IN_QUOTES control value formatting.
    """

    TITLE = "alma.title"
    AUTHOR = "alma.author"
    # Alma expects the title phrase wrapped in double quotes.
    ENCLOSE_TITLE_IN_QUOTES = True
    # Authors are formatted as "Last,First" (no space after the comma).
    AUTHOR_SCHEMA = "NoSpaceAfterComma"
    YEAR = "date_of_publication"
class DublinCoreSchema(Enum):
    """Query field names and formatting hints for Dublin Core SRU endpoints.

    Consumed by QueryTransformer: TITLE/AUTHOR/YEAR map generic argument
    keys to the API's query field names; AUTHOR_SCHEMA and
    ENCLOSE_TITLE_IN_QUOTES control value formatting.
    """

    TITLE = "dc.title"
    AUTHOR = "dc.creator"
    # Authors are formatted as "Last, First" (space after the comma).
    AUTHOR_SCHEMA = "SpaceAfterComma"
    # Title values are sent unquoted for this schema.
    ENCLOSE_TITLE_IN_QUOTES = False
    YEAR = "dc.date"
class CQLSchema(Enum):
@@ -395,6 +414,7 @@ class SWBSchema(Enum):
URL = "https://sru.k10plus.de/opac-de-627!rec=1?version=1.1&operation=searchRetrieve&query={}&maximumRecords=100&recordSchema=marcxml"
ARGSCHEMA = PicaSchema
NAME = "SWB"
LIBRARY_NAME_LOCATION_FIELD = "924$b"
class DNBSchema(Enum):
@@ -407,32 +427,30 @@ class KOBVSchema(Enum):
URL = "https://sru.kobv.de/k2?version=1.1&operation=searchRetrieve&query={}&startRecord=1&maximumRecords=100&recordSchema=marcxml"
ARGSCHEMA = DublinCoreSchema
NAME = "KOBV"
LIBRARY_NAME_LOCATION_FIELD = "924$b"
class HebisSchema(Enum):
    """SRU endpoint configuration for the HeBIS union catalogue."""

    # Endpoint URL template; "{}" is filled with the assembled query string.
    URL = "http://sru.hebis.de/sru/DB=2.1?query={}&version=1.1&operation=searchRetrieve&stylesheet=http%3A%2F%2Fsru.hebis.de%2Fsru%2F%3Fxsl%3DsearchRetrieveResponse&recordSchema=marc21&maximumRecords=100&startRecord=1&recordPacking=xml&sortKeys=LST_Y%2Cpica%2C0%2C%2C"
    # Field-name mapping used by QueryTransformer for this API.
    ARGSCHEMA = PicaSchema
    # Generic argument keys this endpoint cannot handle; the Api class
    # drops them before building the query.
    NOTSUPPORTEDARGS = ["YEAR"]
    NAME = "HEBIS"
    # Literal substitutions applied to the assembled query string
    # before it is inserted into URL.
    REPLACE = {" ": "+", "&": "%26", "=": "+%3D+"}
    # MARC "tag$subfield" holding the owning-library identifier
    # (split on "$" by book_from_marc).
    LIBRARY_NAME_LOCATION_FIELD = "924$b"
class OEVKSchema(Enum):
    """SRU endpoint configuration for the OEVK catalogue (K10plus opac-de-627-2)."""

    # Endpoint URL template; "{}" is filled with the assembled query string.
    URL = "https://sru.k10plus.de/opac-de-627-2?version=1.1&operation=searchRetrieve&query={}&maximumRecords=100&recordSchema=marcxml"
    # Field-name mapping used by QueryTransformer for this API.
    ARGSCHEMA = PicaSchema
    NAME = "OEVK"
    # MARC "tag$subfield" holding the owning-library identifier
    # (split on "$" by book_from_marc).
    LIBRARY_NAME_LOCATION_FIELD = "924$b"
class HBZSchema(Enum):
    """SRU endpoint configuration for the HBZ network (Alma hosted).

    Note: the leftover ``ArgumentSchema`` enum fragment that used to sit
    between these members is dead code (this commit removes it) and has
    been dropped; LIBRARY_NAME_LOCATION_FIELD belongs to this class.
    """

    # Endpoint URL template; "{}" is filled with the assembled query string.
    # (The duplicated "recordSchema=marcxml" query parameter was removed.)
    URL = "https://eu04.alma.exlibrisgroup.com/view/sru/49HBZ_NETWORK?version=1.2&operation=searchRetrieve&recordSchema=marcxml&query={}&maximumRecords=100"
    # Field-name mapping used by QueryTransformer for this API.
    ARGSCHEMA = ALMASchema
    NAME = "HBZ"
    # MARC "tag$subfield" holding the owning-library identifier
    # (split on "$" by book_from_marc).
    LIBRARY_NAME_LOCATION_FIELD = "852$a"
RVK_ALLOWED = r"[A-Z0-9.\-\/]" # conservative char set typically seen in RVK notations
@@ -538,25 +556,38 @@ def find_newer_edition(
class QueryTransformer:
    """Translate generic ``key=value`` query arguments into API-specific
    search terms using the field names declared by an SRU schema Enum.
    """

    def __init__(self, api_schema: Type[Enum], arguments: Union[Iterable[str], str]):
        """
        :param api_schema: Enum mapping generic keys (TITLE, AUTHOR, ...)
            to the API's query field names, with optional formatting hints
            (AUTHOR_SCHEMA, ENCLOSE_TITLE_IN_QUOTES).
        :param arguments: a single ``key=value`` string or an iterable of them.
        """
        self.api_schema = api_schema
        if isinstance(arguments, str):
            self.arguments = [arguments]
        else:
            self.arguments = arguments
        # Drop arguments with an empty value (e.g. "year=") instead of
        # sending empty search terms to the API.
        self.drop_empty = True

    def transform(self) -> List[str]:
        """Return the arguments rewritten as ``<api_field>=<value>`` strings.

        Arguments without ``=``, with an empty value (when ``drop_empty``
        is set), or whose key is unknown to the schema are silently dropped.
        """
        arguments: List[str] = []
        schema = self.api_schema
        for arg in self.arguments:
            if "=" not in arg:
                continue
            if self.drop_empty and arg.endswith("="):
                continue
            key, value = arg.split("=", 1)
            if hasattr(schema, key.upper()):
                api_key = getattr(schema, key.upper()).value
                if key.upper() == "AUTHOR" and hasattr(schema, "AUTHOR_SCHEMA"):
                    author_schema = getattr(schema, "AUTHOR_SCHEMA").value
                    if author_schema == "SpaceAfterComma":
                        value = value.replace(",", ", ")
                    elif author_schema == "NoSpaceAfterComma":
                        value = value.replace(", ", ",")
                    # Collapse double spaces the comma rewrite can introduce
                    # (e.g. "Doe, John" -> "Doe,  John" under SpaceAfterComma).
                    value = value.replace("  ", " ")
                if key.upper() == "TITLE" and hasattr(
                    schema, "ENCLOSE_TITLE_IN_QUOTES"
                ):
                    # Compare the member's *value*: the Enum member object
                    # itself is always truthy, even when its value is False.
                    if getattr(schema, "ENCLOSE_TITLE_IN_QUOTES").value:
                        value = f'"{value}"'
                arguments.append(f"{api_key}={value}")
        return arguments
@@ -564,18 +595,33 @@ class QueryTransformer:
class Api:
def __init__(
self, site: str, url: str, prefix: str, replace: Optional[Dict[str, str]] = None
self,
site: str,
url: str,
prefix: Type[Enum],
library_identifier: str,
notsupported_args: Optional[List[str]] = None,
replace: Optional[Dict[str, str]] = None,
):
self.site = site
self.url = url
self.prefix = prefix
self.replace = replace or {}
self.library_identifier = library_identifier
self.notsupported_args = notsupported_args or []
# Reuse TCP connections across requests for better performance
self._session = requests.Session()
# Slightly larger connection pool for concurrent calls
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)
# Rate limiting and retry config
self._last_request_time: float = 0.0
self._rate_limit_seconds: float = 1.0
self._max_retries: int = 5
self._overall_timeout_seconds: float = (
30.0 # max total time per logical request
)
def close(self):
try:
@@ -588,36 +634,88 @@ class Api:
self.close()
def get(self, query_args: Union[Iterable[str], str]) -> List[Record]:
start_time = time.monotonic()
# if any query_arg ends with =, remove it
if isinstance(query_args, str):
query_args = [query_args]
if self.site == "DNB":
args = [arg for arg in query_args if not arg.startswith("pica.")]
if args == []:
raise ValueError("DNB queries must include at least one search term")
query_args = args
# query_args = [f"{self.prefix}{arg}" for arg in query_args]
if self.notsupported_args:
query_args = [
qa
for qa in query_args
if not any(qa.startswith(na + "=") for na in self.notsupported_args)
]
query_args = QueryTransformer(
api_schema=self.prefix, arguments=query_args
).transform()
query = "+and+".join(query_args)
for old, new in self.replace.items():
query = query.replace(old, new)
url = self.url.format(query)
print(url)
headers = {
"User-Agent": f"{self.site} SRU Client, <alexander.kirchner@ph-freiburg.de>",
"Accept": "application/xml",
"Accept-Charset": "latin1,utf-8;q=0.7,*;q=0.3",
}
# Use persistent session and set timeouts to avoid hanging
resp = self._session.get(url, headers=headers, timeout=(3.05, 60))
if resp.status_code != 200:
raise Exception(f"Error fetching data from SWB: {resp.status_code}")
# Parse using raw bytes (original behavior) to preserve encoding edge cases
sr = parse_search_retrieve_response(resp.content)
return sr.records
# Use persistent session, enforce 1 req/sec, and retry up to 5 times
last_error: Optional[Exception] = None
for attempt in range(1, self._max_retries + 1):
# Abort if overall timeout exceeded before starting attempt
if time.monotonic() - start_time > self._overall_timeout_seconds:
last_error = requests.exceptions.Timeout(
f"Overall timeout {self._overall_timeout_seconds}s exceeded before attempt {attempt}"
)
break
# Enforce rate limit relative to last request end
now = time.monotonic()
elapsed = now - self._last_request_time
if elapsed < self._rate_limit_seconds:
time.sleep(self._rate_limit_seconds - elapsed)
try:
# Per-attempt read timeout capped at remaining overall budget (but at most 30s)
remaining = max(
0.0, self._overall_timeout_seconds - (time.monotonic() - start_time)
)
read_timeout = min(30.0, remaining if remaining > 0 else 0.001)
resp = self._session.get(
url, headers=headers, timeout=(3.05, read_timeout)
)
self._last_request_time = time.monotonic()
if resp.status_code == 200:
# Parse using raw bytes (original behavior) to preserve encoding edge cases
sr = parse_search_retrieve_response(resp.content)
return sr.records
else:
last_error = Exception(
f"Error fetching data from {self.site}: HTTP {resp.status_code} (attempt {attempt}/{self._max_retries})"
)
except requests.exceptions.ReadTimeout as e:
last_error = e
except requests.exceptions.Timeout as e:
last_error = e
except Exception as e:
last_error = e
# Wait before the next attempt to respect rate limit between retries as well
if attempt < self._max_retries:
time.sleep(self._rate_limit_seconds)
# If we exit the loop, all attempts failed
raise last_error if last_error else Exception("Unknown request failure")
def getBooks(self, query_args: Union[Iterable[str], str]) -> List[BookData]:
records: List[Record] = self.get(query_args)
try:
records: List[Record] = self.get(query_args)
except requests.exceptions.ReadTimeout:
# Return a list with a single empty BookData object on read timeout
return [BookData()]
except requests.exceptions.Timeout:
# Overall timeout exceeded
return [BookData()]
except Exception:
# Propagate other errors (could also choose to return empty list)
raise
# Avoid printing on hot paths; rely on logger if needed
books: List[BookData] = []
# extract title from query_args if present
@@ -627,7 +725,7 @@ class Api:
title = arg.split("=")[1]
break
for rec in records:
book = book_from_marc(rec.recordData)
book = book_from_marc(rec.recordData, self.library_identifier)
books.append(book)
if title:
books = [