# BibAPI/src/bibapi/sru.py
import re
import time
import xml.etree.ElementTree as ET
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
import requests
from requests.adapters import HTTPAdapter
# centralized logging used via src.shared.logging
from .schemas.bookdata import BookData
from .schemas.marcxml import (
ControlField,
DataField,
EchoedSearchRequest,
MarcRecord,
Record,
SearchRetrieveResponse,
SubField,
)
ZS = "http://www.loc.gov/zing/srw/"
MARC = "http://www.loc.gov/MARC21/slim"
NS = {"zs": ZS, "marc": MARC}
def _text(elem: Optional[ET.Element]) -> str:
return (elem.text or "") if elem is not None else ""
def _req_text(parent: ET.Element, path: str) -> Optional[str]:
    """Text of the first element matching `path`, or None if absent/empty."""
    el = parent.find(path, NS)
    if el is None or el.text is None:
        return None
    return el.text
def parse_marc_record(record_el: ET.Element) -> MarcRecord:
"""
record_el is the <marc:record> element (default ns MARC in your sample)
"""
# leader
leader_text = _req_text(record_el, "marc:leader")
# controlfields
controlfields: List[ControlField] = []
for cf in record_el.findall("marc:controlfield", NS):
tag = cf.get("tag", "").strip()
controlfields.append(ControlField(tag=tag, value=_text(cf)))
# datafields
datafields: List[DataField] = []
for df in record_el.findall("marc:datafield", NS):
tag = df.get("tag", "").strip()
ind1 = df.get("ind1") or " "
ind2 = df.get("ind2") or " "
subfields: List[SubField] = []
for sf in df.findall("marc:subfield", NS):
code = sf.get("code", "")
subfields.append(SubField(code=code, value=_text(sf)))
datafields.append(DataField(tag=tag, ind1=ind1, ind2=ind2, subfields=subfields))
return MarcRecord(
leader=leader_text, controlfields=controlfields, datafields=datafields
)
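# Illustrative use of parse_marc_record on a minimal, hypothetical MARCXML
# snippet (default namespace bound to the MARC21 slim namespace above):
#
#     xml = (
#         '<record xmlns="http://www.loc.gov/MARC21/slim">'
#         "<leader>00000nam a2200000 c 4500</leader>"
#         '<controlfield tag="001">123456789</controlfield>'
#         '<datafield tag="245" ind1="1" ind2="0">'
#         '<subfield code="a">Example title</subfield>'
#         "</datafield></record>"
#     )
#     rec = parse_marc_record(ET.fromstring(xml))
#     assert rec.controlfields[0].value == "123456789"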
def parse_record(zs_record_el: ET.Element) -> Optional[Record]:
    recordSchema = _req_text(zs_record_el, "zs:recordSchema")
    recordPacking = _req_text(zs_record_el, "zs:recordPacking")
    # zs:recordData wraps a MARC <record>; the ns-qualified lookup below
    # matches it whether the MARC namespace is prefixed or set as default.
    recordData_el = zs_record_el.find("zs:recordData", NS)
    if recordData_el is None:
        raise ValueError("Missing zs:recordData")
    marc_record_el = recordData_el.find("marc:record", NS)
    if marc_record_el is None:
        # No MARC21 record inside zs:recordData; callers skip None records.
        return None
    marc_record = parse_marc_record(marc_record_el)
    recordPosition = int(_req_text(zs_record_el, "zs:recordPosition") or "0")
return Record(
recordSchema=recordSchema,
recordPacking=recordPacking,
recordData=marc_record,
recordPosition=recordPosition,
)
def parse_echoed_request(root: ET.Element) -> Optional[EchoedSearchRequest]:
el = root.find("zs:echoedSearchRetrieveRequest", NS)
if el is None:
return None
# Be permissive with missing fields
version = _text(el.find("zs:version", NS))
query = _text(el.find("zs:query", NS))
maximumRecords_text = _text(el.find("zs:maximumRecords", NS)) or "0"
recordPacking = _text(el.find("zs:recordPacking", NS))
recordSchema = _text(el.find("zs:recordSchema", NS))
try:
maximumRecords = int(maximumRecords_text)
except ValueError:
maximumRecords = 0
return EchoedSearchRequest(
version=version,
query=query,
maximumRecords=maximumRecords,
recordPacking=recordPacking,
recordSchema=recordSchema,
)
def parse_search_retrieve_response(
xml_str: Union[str, bytes],
) -> SearchRetrieveResponse:
root = ET.fromstring(xml_str)
# Root is zs:searchRetrieveResponse
version = _req_text(root, "zs:version")
numberOfRecords = int(_req_text(root, "zs:numberOfRecords") or "0")
records_parent = root.find("zs:records", NS)
records: List[Record] = []
if records_parent is not None:
for r in records_parent.findall("zs:record", NS):
record = parse_record(r)
if record is not None:
records.append(record)
echoed = parse_echoed_request(root)
return SearchRetrieveResponse(
version=version,
numberOfRecords=numberOfRecords,
records=records,
echoedSearchRetrieveRequest=echoed,
)
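# Typical entry point for raw SRU XML (sketch; `xml_bytes` stands in for the
# body of an SRU searchRetrieve HTTP response):
#
#     sr = parse_search_retrieve_response(xml_bytes)
#     print(sr.numberOfRecords)
#     for record in sr.records:
#         print(record.recordPosition, record.recordData.leader)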
# --- Query helpers over MarcRecord ---
def iter_datafields(
rec: MarcRecord,
tag: Optional[str] = None,
ind1: Optional[str] = None,
ind2: Optional[str] = None,
) -> Iterable[DataField]:
"""Yield datafields, optionally filtered by tag/indicators."""
for df in rec.datafields:
if tag is not None and df.tag != tag:
continue
if ind1 is not None and df.ind1 != ind1:
continue
if ind2 is not None and df.ind2 != ind2:
continue
yield df
def subfield_values(
rec: MarcRecord,
tag: str,
code: str,
*,
ind1: Optional[str] = None,
ind2: Optional[str] = None,
) -> List[str]:
"""All values for subfield `code` in every `tag` field (respecting indicators)."""
out: List[str] = []
for df in iter_datafields(rec, tag, ind1, ind2):
out.extend(sf.value for sf in df.subfields if sf.code == code)
return out
def first_subfield_value(
rec: MarcRecord,
tag: str,
code: str,
*,
ind1: Optional[str] = None,
ind2: Optional[str] = None,
default: Optional[str] = None,
) -> Optional[str]:
"""First value for subfield `code` in `tag` (respecting indicators)."""
for df in iter_datafields(rec, tag, ind1, ind2):
for sf in df.subfields:
if sf.code == code:
return sf.value
return default
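# Sketch of the helpers above against a parsed MarcRecord `rec` (the record
# itself is hypothetical; tags are standard MARC21):
#
#     isbns = subfield_values(rec, "020", "a")                  # all ISBNs
#     main_title = first_subfield_value(rec, "245", "a")        # first 245 $a
#     pub_fields = list(iter_datafields(rec, "264", ind2="1"))  # publication 264s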
def find_datafields_with_subfields(
rec: MarcRecord,
tag: str,
*,
where_all: Optional[Dict[str, str]] = None,
where_any: Optional[Dict[str, str]] = None,
casefold: bool = False,
ind1: Optional[str] = None,
ind2: Optional[str] = None,
) -> List[DataField]:
"""
Return datafields of `tag` whose subfields match constraints:
- where_all: every (code -> exact value) must be present
- where_any: at least one (code -> exact value) present
Set `casefold=True` for case-insensitive comparison.
"""
where_all = where_all or {}
where_any = where_any or {}
matched: List[DataField] = []
for df in iter_datafields(rec, tag, ind1, ind2):
# Map code -> list of values (with optional casefold applied)
vals: Dict[str, List[str]] = {}
for sf in df.subfields:
v = sf.value.casefold() if casefold else sf.value
vals.setdefault(sf.code, []).append(v)
ok = True
for c, v in where_all.items():
vv = v.casefold() if casefold else v
if c not in vals or vv not in vals[c]:
ok = False
break
if ok and where_any:
any_ok = any(
(c in vals) and ((v.casefold() if casefold else v) in vals[c])
for c, v in where_any.items()
)
if not any_ok:
ok = False
if ok:
matched.append(df)
return matched
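# Example constraint query (hypothetical holding data): select 924 fields
# whose $9 equals "Frei 129", matching case-insensitively:
#
#     holdings = find_datafields_with_subfields(
#         rec, "924", where_all={"9": "frei 129"}, casefold=True
#     )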
def controlfield_value(
rec: MarcRecord, tag: str, default: Optional[str] = None
) -> Optional[str]:
"""Get the first controlfield value by tag (e.g., '001', '005')."""
for cf in rec.controlfields:
if cf.tag == tag:
return cf.value
return default
def datafields_value(
data: List[DataField], code: str, default: Optional[str] = None
) -> Optional[str]:
"""Get the first value for a specific subfield code in a list of datafields."""
for df in data:
for sf in df.subfields:
if sf.code == code:
return sf.value
return default
def datafield_value(
df: DataField, code: str, default: Optional[str] = None
) -> Optional[str]:
"""Get the first value for a specific subfield code in a datafield."""
for sf in df.subfields:
if sf.code == code:
return sf.value
return default
def _smart_join_title(a: str, b: Optional[str]) -> str:
"""
Join 245 $a and $b with MARC-style punctuation.
If $b is present, join with ' : ' unless either side already supplies punctuation.
"""
a = a.strip()
if not b:
return a
b = b.strip()
if a.endswith((":", ";", "/")) or b.startswith((":", ";", "/")):
return f"{a} {b}"
return f"{a} : {b}"
def subfield_values_from_fields(
fields: Iterable[DataField],
code: str,
) -> List[str]:
"""All subfield values with given `code` across a list of DataField."""
return [sf.value for df in fields for sf in df.subfields if sf.code == code]
def first_subfield_value_from_fields(
fields: Iterable[DataField],
code: str,
default: Optional[str] = None,
) -> Optional[str]:
"""First subfield value with given `code` across a list of DataField."""
for df in fields:
for sf in df.subfields:
if sf.code == code:
return sf.value
return default
def subfield_value_pairs_from_fields(
fields: Iterable[DataField],
code: str,
) -> List[Tuple[DataField, str]]:
"""
Return (DataField, value) pairs for all subfields with `code`.
Useful if you need to know which field a value came from.
"""
out: List[Tuple[DataField, str]] = []
for df in fields:
for sf in df.subfields:
if sf.code == code:
out.append((df, sf.value))
return out
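# The *_from_fields variants operate on a pre-filtered field list, e.g. the
# result of find_datafields_with_subfields (sketch; `rec` is hypothetical):
#
#     frei = find_datafields_with_subfields(rec, "924", where_all={"9": "Frei 129"})
#     shelf_marks = subfield_values_from_fields(frei, "g")
#     origin_pairs = subfield_value_pairs_from_fields(frei, "g")  # (field, value)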
def book_from_marc(rec: MarcRecord, library_identifier: str) -> BookData:
library_ident_tag = library_identifier.split("$")[0]
library_ident_code = library_identifier.split("$")[1]
# PPN from controlfield 001
ppn = controlfield_value(rec, "001")
    # Title = 245 $a (+ $b when enabled below)
    t_a = first_subfield_value(rec, "245", "a")
    # Subtitle ($b) deliberately omitted; restore via first_subfield_value(rec, "245", "b")
    t_b = ""
    title = _smart_join_title(t_a, t_b) if t_a else None
# Signature = 924 where $9 == "Frei 129" → take that field's $g
frei_fields = find_datafields_with_subfields(
rec, "924", where_all={"9": "Frei 129"}
)
signature = first_subfield_value_from_fields(frei_fields, "g")
# Year = 264 $c (prefer ind2="1" publication; fallback to any 264)
year = first_subfield_value(rec, "264", "c", ind2="1") or first_subfield_value(
rec, "264", "c"
)
isbn = subfield_values(rec, "020", "a")
mediatype = first_subfield_value(rec, "338", "a")
lang = subfield_values(rec, "041", "a")
authors = subfield_values(rec, "700", "a")
author = None
if authors:
author = "; ".join(authors)
libraries = subfield_values(rec, library_ident_tag, library_ident_code)
return BookData(
ppn=ppn,
title=title,
signature=signature,
edition=first_subfield_value(rec, "250", "a") or "",
year=year,
pages=first_subfield_value(rec, "300", "a") or "",
publisher=first_subfield_value(rec, "264", "b") or "",
isbn=isbn,
language=lang,
link="",
author=author,
media_type=mediatype,
libraries=libraries,
)
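# Mapping sketch: `library_identifier` is "tag$code", naming the field and
# subfield that list holding libraries (the value below is illustrative):
#
#     book = book_from_marc(rec, library_identifier="924$b")
#     print(book.ppn, book.title, book.libraries)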
RVK_ALLOWED = r"[A-Z0-9.\-\/]" # conservative char set typically seen in RVK notations
def find_newer_edition(
swb_result: BookData, dnb_result: List[BookData]
) -> Optional[List[BookData]]:
"""
New edition if:
- year > swb.year OR
- edition_number > swb.edition_number
Additional guards & preferences:
- If both have signatures and they differ, skip (not the same work).
- For duplicates (same ppn): keep the one that has a signature, and
prefer a signature that matches swb_result.signature.
- If multiple remain: keep the single 'latest' by (year desc,
edition_number desc, best-signature-match desc, has-signature desc).
"""
def norm_sig(s: Optional[str]) -> str:
if not s:
return ""
# normalize: lowercase, collapse whitespace, keep alnum + a few separators
s = s.lower()
s = re.sub(r"\s+", " ", s).strip()
# remove obvious noise; adjust if your signature format differs
s = re.sub(r"[^a-z0-9\-_/\. ]+", "", s)
return s
def has_sig(b: BookData) -> bool:
return bool(getattr(b, "signature", None))
def sig_matches_swb(b: BookData) -> bool:
if not has_sig(b) or not has_sig(swb_result):
return False
return norm_sig(b.signature) == norm_sig(swb_result.signature)
def strictly_newer(b: BookData) -> bool:
by_year = (
b.year is not None
and swb_result.year is not None
and b.year > swb_result.year
)
by_edition = (
b.edition_number is not None
and swb_result.edition_number is not None
and b.edition_number > swb_result.edition_number
)
return by_year or by_edition
swb_sig_norm = norm_sig(getattr(swb_result, "signature", None))
# 1) Filter to same-work AND newer
candidates: List[BookData] = []
for b in dnb_result:
# Skip if both signatures exist and don't match (different work)
b_sig = getattr(b, "signature", None)
if b_sig and swb_result.signature:
if norm_sig(b_sig) != swb_sig_norm:
continue # not the same work
# Keep only if newer by rules
if strictly_newer(b):
candidates.append(b)
if not candidates:
return None
# 2) Dedupe by PPN, preferring signature (and matching signature if possible)
by_ppn: dict[Optional[str], BookData] = {}
for b in candidates:
key = getattr(b, "ppn", None)
prev = by_ppn.get(key)
if prev is None:
by_ppn[key] = b
continue
# Compute preference score for both
def ppn_pref_score(x: BookData) -> tuple[int, int]:
# (signature matches swb, has signature)
return (1 if sig_matches_swb(x) else 0, 1 if has_sig(x) else 0)
if ppn_pref_score(b) > ppn_pref_score(prev):
by_ppn[key] = b
deduped = list(by_ppn.values())
if not deduped:
return None
# 3) If multiple remain, keep only the latest one.
# Order: year desc, edition_number desc, signature-match desc, has-signature desc
def sort_key(b: BookData):
year = b.year if b.year is not None else -1
ed = b.edition_number if b.edition_number is not None else -1
sig_match = 1 if sig_matches_swb(b) else 0
sig_present = 1 if has_sig(b) else 0
return (year, ed, sig_match, sig_present)
best = max(deduped, key=sort_key)
    # deduped is non-empty at this point, so max() always yields a record
    return [best]
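# Sketch of the comparison rules above; the BookData field values here are
# hypothetical, and year comparison follows whatever type BookData stores:
#
#     swb = BookData(ppn="1", year="2018", signature="Frei 129: AB 100")
#     dnb = [BookData(ppn="2", year="2022", signature="Frei 129: AB 100")]
#     find_newer_edition(swb, dnb)   # -> [dnb[0]]; None if nothing is newer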
class QueryTransformer:
def __init__(self, api_schema: Type[Enum], arguments: Union[Iterable[str], str]):
self.api_schema = api_schema
if isinstance(arguments, str):
self.arguments = [arguments]
else:
self.arguments = arguments
self.drop_empty = True
    def transform(self) -> List[str]:
arguments: List[str] = []
schema = self.api_schema
for arg in self.arguments:
if "=" not in arg:
continue
if self.drop_empty and arg.endswith("="):
continue
key, value = arg.split("=", 1)
if hasattr(schema, key.upper()):
api_key = getattr(schema, key.upper()).value
if key.upper() == "AUTHOR" and hasattr(schema, "AUTHOR_SCHEMA"):
author_schema = getattr(schema, "AUTHOR_SCHEMA").value
if author_schema == "SpaceAfterComma":
value = value.replace(",", ", ")
elif author_schema == "NoSpaceAfterComma":
value = value.replace(", ", ",")
value = value.replace(" ", " ")
if key.upper() == "TITLE" and hasattr(
schema, "ENCLOSE_TITLE_IN_QUOTES"
):
                    # Enum members are always truthy; check the member's value
                    if getattr(schema, "ENCLOSE_TITLE_IN_QUOTES").value:
value = f'"{value}"'
arguments.append(f"{api_key}={value}")
return arguments
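# Usage sketch with a hypothetical schema enum; members map generic argument
# names to API-specific query prefixes:
#
#     class DnbSchema(Enum):
#         TITLE = "dnb.tit"
#         AUTHOR = "dnb.per"
#
#     QueryTransformer(DnbSchema, ["title=Faust", "author=Goethe", "year="]).transform()
#     # -> ["dnb.tit=Faust", "dnb.per=Goethe"]  ("year=" dropped as empty)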
class Api:
def __init__(
self,
site: str,
url: str,
prefix: Type[Enum],
library_identifier: str,
notsupported_args: Optional[List[str]] = None,
replace: Optional[Dict[str, str]] = None,
):
self.site = site
self.url = url
self.prefix = prefix
self.replace = replace or {}
self.library_identifier = library_identifier
self.notsupported_args = notsupported_args or []
# Reuse TCP connections across requests for better performance
self._session = requests.Session()
# Slightly larger connection pool for concurrent calls
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)
# Rate limiting and retry config
self._last_request_time: float = 0.0
self._rate_limit_seconds: float = 1.0
self._max_retries: int = 5
self._overall_timeout_seconds: float = (
30.0 # max total time per logical request
)
def close(self):
try:
self._session.close()
except Exception:
pass
def __del__(self):
# Best-effort cleanup
self.close()
def get(self, query_args: Union[Iterable[str], str]) -> List[Record]:
start_time = time.monotonic()
        # Normalize to a list; QueryTransformer below drops args ending with "="
        if isinstance(query_args, str):
            query_args = [query_args]
if self.notsupported_args:
query_args = [
qa
for qa in query_args
if not any(qa.startswith(na + "=") for na in self.notsupported_args)
]
query_args = QueryTransformer(
api_schema=self.prefix, arguments=query_args
).transform()
query = "+and+".join(query_args)
for old, new in self.replace.items():
query = query.replace(old, new)
url = self.url.format(query)
headers = {
"User-Agent": f"{self.site} SRU Client, <alexander.kirchner@ph-freiburg.de>",
"Accept": "application/xml",
"Accept-Charset": "latin1,utf-8;q=0.7,*;q=0.3",
}
# Use persistent session, enforce 1 req/sec, and retry up to 5 times
last_error: Optional[Exception] = None
for attempt in range(1, self._max_retries + 1):
# Abort if overall timeout exceeded before starting attempt
if time.monotonic() - start_time > self._overall_timeout_seconds:
last_error = requests.exceptions.Timeout(
f"Overall timeout {self._overall_timeout_seconds}s exceeded before attempt {attempt}"
)
break
# Enforce rate limit relative to last request end
now = time.monotonic()
elapsed = now - self._last_request_time
if elapsed < self._rate_limit_seconds:
time.sleep(self._rate_limit_seconds - elapsed)
try:
# Per-attempt read timeout capped at remaining overall budget (but at most 30s)
remaining = max(
0.0, self._overall_timeout_seconds - (time.monotonic() - start_time)
)
read_timeout = min(30.0, remaining if remaining > 0 else 0.001)
resp = self._session.get(
url, headers=headers, timeout=(3.05, read_timeout)
)
self._last_request_time = time.monotonic()
if resp.status_code == 200:
# Parse using raw bytes (original behavior) to preserve encoding edge cases
sr = parse_search_retrieve_response(resp.content)
return sr.records
else:
last_error = Exception(
f"Error fetching data from {self.site}: HTTP {resp.status_code} (attempt {attempt}/{self._max_retries})"
)
except requests.exceptions.ReadTimeout as e:
last_error = e
except requests.exceptions.Timeout as e:
last_error = e
except Exception as e:
last_error = e
# Wait before the next attempt to respect rate limit between retries as well
if attempt < self._max_retries:
time.sleep(self._rate_limit_seconds)
# If we exit the loop, all attempts failed
raise last_error if last_error else Exception("Unknown request failure")
def getBooks(self, query_args: Union[Iterable[str], str]) -> List[BookData]:
try:
records: List[Record] = self.get(query_args)
except requests.exceptions.ReadTimeout:
# Return a list with a single empty BookData object on read timeout
return [BookData()]
except requests.exceptions.Timeout:
# Overall timeout exceeded
return [BookData()]
except Exception:
# Propagate other errors (could also choose to return empty list)
raise
# Avoid printing on hot paths; rely on logger if needed
books: List[BookData] = []
        # Extract the title filter from query_args if present
        title = None
        if isinstance(query_args, str):
            query_args = [query_args]
        for arg in query_args:
            if arg.startswith("pica.tit="):
                title = arg.split("=", 1)[1]
                break
for rec in records:
book = book_from_marc(rec.recordData, self.library_identifier)
books.append(book)
if title:
books = [
b
for b in books
if b.title and b.title.lower().startswith(title.lower())
]
return books
def getLinkForBook(self, book: BookData) -> str:
# Not implemented: depends on catalog front-end; return empty string for now
return ""
# def search(self, query_args: Union[Iterable[str], str]):
# arguments =