add api files

2025-10-13 14:25:54 +02:00
parent 59a3d572bb
commit 2eef8dee7c
5 changed files with 752 additions and 0 deletions

0
src/bibapi/catalogue.py Normal file

0
src/bibapi/lehmanns.py Normal file

113
src/bibapi/schemas/bookdata.py Normal file
@@ -0,0 +1,113 @@
import json
from dataclasses import dataclass, field
from typing import Any, Optional, Union

import regex


@dataclass
class BookData:
ppn: str | None = None
title: str | None = None
signature: str | None = None
edition: str | None = None
link: str | None = None
isbn: Union[str, list[str], None] = field(default_factory=list)
author: str | None = None
language: Union[str, list[str], None] = field(default_factory=list)
publisher: str | None = None
place: str | None = None
year: int | None = None
pages: str | None = None
library_location: str | None = None
in_apparat: bool | None = False
adis_idn: str | None = None
old_book: Any | None = None
media_type: str | None = None  # carrier type, e.g. from MARC 338 $a
in_library: bool | None = None # whether the book is in the library or not
def __post_init__(self):
self.library_location = (
str(self.library_location) if self.library_location else None
)
if isinstance(self.language, list) and self.language:
self.language = [lang.strip() for lang in self.language if lang.strip()]
self.language = ",".join(self.language)
self.year = regex.sub(r"[^\d]", "", str(self.year)) if self.year else None
self.in_library = bool(self.signature)
def from_dict(self, data: dict) -> "BookData":
for key, value in data.items():
setattr(self, key, value)
return self
def merge(self, other: "BookData") -> "BookData":
for key, value in other.__dict__.items():
# merge lists, if the attribute is a list, extend it
if isinstance(value, list):
current_value = getattr(self, key)
if current_value is None:
current_value = []
elif not isinstance(current_value, list):
current_value = [current_value]
# extend the list with the new values, but only if they are not already in the list
for v in value:
if v not in current_value:
current_value.append(v)
setattr(self, key, current_value)
if value is not None and (
getattr(self, key) is None or getattr(self, key) == ""
):
setattr(self, key, value)
# in language, drop all entries longer than 4 characters (keeps short language codes)
if isinstance(self.language, list):
self.language = [lang for lang in self.language if len(lang) <= 4]
return self
@property
def to_dict(self) -> str:
"""Convert the dataclass to a dictionary."""
data_dict = {
key: value for key, value in self.__dict__.items() if value is not None
}
# remove old_book from data_dict
if "old_book" in data_dict:
del data_dict["old_book"]
return json.dumps(data_dict, ensure_ascii=False)
def from_dataclass(self, dataclass: Optional[Any]) -> None:
if dataclass is None:
return
for key, value in dataclass.__dict__.items():
setattr(self, key, value)
def get_book_type(self) -> str:
    # pages may be None; treat "Online" in the extent statement as the e-book marker
    if self.pages and "Online" in self.pages:
        return "eBook"
    return "Druckausgabe"
def from_string(self, data: str) -> "BookData":
ndata = json.loads(data)
return BookData(**ndata)
def from_LehmannsSearchResult(self, result: Any) -> "BookData":
self.title = result.title
self.author = "; ".join(result.authors) if result.authors else None
self.edition = str(result.edition) if result.edition else None
self.link = result.url
self.isbn = (
result.isbn13
if isinstance(result.isbn13, list)
else [result.isbn13]
if result.isbn13
else []
)
self.pages = str(result.pages) if result.pages else None
self.publisher = result.publisher
self.year = str(result.year) if result.year else None
return self
@property
def edition_number(self) -> int:
    """Leading number in the edition statement, 0 if none can be found."""
    if self.edition is None:
        return 0
match = regex.search(r"(\d+)", self.edition)
if match:
return int(match.group(1))
return 0
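
A minimal usage sketch for the class above (editor's addition; the sample values are invented and the import path follows the package layout implied by sru.py):

from src.bibapi.schemas.bookdata import BookData

swb = BookData(ppn="123456789", title="Beispielbuch", year=2020, isbn=["9783161484100"])
dnb = BookData(title="Beispielbuch", publisher="Beispielverlag", isbn=["9783161484100", "3161484100"])

merged = swb.merge(dnb)   # mutates swb in place: fills empty fields, unions list fields
print(merged.publisher)   # "Beispielverlag", taken from dnb
print(merged.isbn)        # both ISBN forms, deduplicated
print(merged.to_dict)     # JSON string without None values and without old_book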

632
src/bibapi/sru.py Normal file

@@ -0,0 +1,632 @@
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Iterable, List, Optional, Tuple, Union
import requests
from requests.adapters import HTTPAdapter
from src.shared.logging import log
# centralized logging used via src.shared.logging
from .schemas.bookdata import BookData
# -----------------------
# Dataclasses
# -----------------------
# --- MARC XML structures ---
@dataclass
class ControlField:
tag: str
value: str
@dataclass
class SubField:
code: str
value: str
@dataclass
class DataField:
tag: str
ind1: str = " "
ind2: str = " "
subfields: List[SubField] = field(default_factory=list)
@dataclass
class MarcRecord:
leader: str
controlfields: List[ControlField] = field(default_factory=list)
datafields: List[DataField] = field(default_factory=list)
# --- SRU record wrapper ---
@dataclass
class Record:
recordSchema: str
recordPacking: str
recordData: MarcRecord
recordPosition: int
@dataclass
class EchoedSearchRequest:
version: str
query: str
maximumRecords: int
recordPacking: str
recordSchema: str
@dataclass
class SearchRetrieveResponse:
version: str
numberOfRecords: int
records: List[Record] = field(default_factory=list)
echoedSearchRetrieveRequest: Optional[EchoedSearchRequest] = None
# -----------------------
# Parser
# -----------------------
ZS = "http://www.loc.gov/zing/srw/"
MARC = "http://www.loc.gov/MARC21/slim"
NS = {"zs": ZS, "marc": MARC}
def _text(elem: Optional[ET.Element]) -> str:
return (elem.text or "") if elem is not None else ""
def _req_text(parent: ET.Element, path: str) -> Optional[str]:
    el = parent.find(path, NS)
    if el is None or el.text is None:
        return None
    return el.text
def parse_marc_record(record_el: ET.Element) -> MarcRecord:
"""
record_el is the <marc:record> element (default ns MARC in your sample)
"""
# leader
leader_text = _req_text(record_el, "marc:leader") or ""
# controlfields
controlfields: List[ControlField] = []
for cf in record_el.findall("marc:controlfield", NS):
tag = cf.get("tag", "").strip()
controlfields.append(ControlField(tag=tag, value=_text(cf)))
# datafields
datafields: List[DataField] = []
for df in record_el.findall("marc:datafield", NS):
tag = df.get("tag", "").strip()
ind1 = df.get("ind1") or " "
ind2 = df.get("ind2") or " "
subfields: List[SubField] = []
for sf in df.findall("marc:subfield", NS):
code = sf.get("code", "")
subfields.append(SubField(code=code, value=_text(sf)))
datafields.append(DataField(tag=tag, ind1=ind1, ind2=ind2, subfields=subfields))
return MarcRecord(
leader=leader_text, controlfields=controlfields, datafields=datafields
)
def parse_record(zs_record_el: ET.Element) -> Record:
recordSchema = _req_text(zs_record_el, "zs:recordSchema")
recordPacking = _req_text(zs_record_el, "zs:recordPacking")
# recordData contains a MARC <record> with default MARC namespace in your sample
recordData_el = zs_record_el.find("zs:recordData", NS)
if recordData_el is None:
raise ValueError("Missing zs:recordData")
marc_record_el = recordData_el.find("marc:record", NS)
if marc_record_el is None:
# If the MARC record uses default ns (xmlns="...") ElementTree still needs the ns-qualified name
# We already searched with prefix; this covers both default and prefixed cases.
raise ValueError("Missing MARC21 record inside zs:recordData")
marc_record = parse_marc_record(marc_record_el)
recordPosition = int(_req_text(zs_record_el, "zs:recordPosition") or "0")
return Record(
recordSchema=recordSchema,
recordPacking=recordPacking,
recordData=marc_record,
recordPosition=recordPosition,
)
def parse_echoed_request(root: ET.Element) -> Optional[EchoedSearchRequest]:
el = root.find("zs:echoedSearchRetrieveRequest", NS)
if el is None:
return None
# Be permissive with missing fields
version = _text(el.find("zs:version", NS))
query = _text(el.find("zs:query", NS))
maximumRecords_text = _text(el.find("zs:maximumRecords", NS)) or "0"
recordPacking = _text(el.find("zs:recordPacking", NS))
recordSchema = _text(el.find("zs:recordSchema", NS))
try:
maximumRecords = int(maximumRecords_text)
except ValueError:
maximumRecords = 0
return EchoedSearchRequest(
version=version,
query=query,
maximumRecords=maximumRecords,
recordPacking=recordPacking,
recordSchema=recordSchema,
)
def parse_search_retrieve_response(
xml_str: Union[str, bytes],
) -> SearchRetrieveResponse:
root = ET.fromstring(xml_str)
# Root is zs:searchRetrieveResponse
version = _req_text(root, "zs:version")
numberOfRecords = int(_req_text(root, "zs:numberOfRecords") or "0")
records_parent = root.find("zs:records", NS)
records: List[Record] = []
if records_parent is not None:
for r in records_parent.findall("zs:record", NS):
records.append(parse_record(r))
echoed = parse_echoed_request(root)
return SearchRetrieveResponse(
version=version,
numberOfRecords=numberOfRecords,
records=records,
echoedSearchRetrieveRequest=echoed,
)
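# Editor's sketch (not part of the original commit): a minimal, invented SRU
# response showing the shape parse_search_retrieve_response expects.
_EXAMPLE_SRU_XML = """<zs:searchRetrieveResponse xmlns:zs="http://www.loc.gov/zing/srw/">
  <zs:version>1.1</zs:version>
  <zs:numberOfRecords>1</zs:numberOfRecords>
  <zs:records>
    <zs:record>
      <zs:recordSchema>marcxml</zs:recordSchema>
      <zs:recordPacking>xml</zs:recordPacking>
      <zs:recordData>
        <record xmlns="http://www.loc.gov/MARC21/slim">
          <leader>00000nam a2200000 c 4500</leader>
          <controlfield tag="001">123456789</controlfield>
          <datafield tag="245" ind1="1" ind2="0">
            <subfield code="a">Example title</subfield>
          </datafield>
        </record>
      </zs:recordData>
      <zs:recordPosition>1</zs:recordPosition>
    </zs:record>
  </zs:records>
</zs:searchRetrieveResponse>"""
# parse_search_retrieve_response(_EXAMPLE_SRU_XML) yields one Record whose
# recordData is a MarcRecord with controlfield 001 == "123456789".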
# --- Query helpers over MarcRecord ---
def iter_datafields(
rec: MarcRecord,
tag: Optional[str] = None,
ind1: Optional[str] = None,
ind2: Optional[str] = None,
) -> Iterable[DataField]:
"""Yield datafields, optionally filtered by tag/indicators."""
for df in rec.datafields:
if tag is not None and df.tag != tag:
continue
if ind1 is not None and df.ind1 != ind1:
continue
if ind2 is not None and df.ind2 != ind2:
continue
yield df
def subfield_values(
rec: MarcRecord,
tag: str,
code: str,
*,
ind1: Optional[str] = None,
ind2: Optional[str] = None,
) -> List[str]:
"""All values for subfield `code` in every `tag` field (respecting indicators)."""
out: List[str] = []
for df in iter_datafields(rec, tag, ind1, ind2):
out.extend(sf.value for sf in df.subfields if sf.code == code)
return out
def first_subfield_value(
rec: MarcRecord,
tag: str,
code: str,
*,
ind1: Optional[str] = None,
ind2: Optional[str] = None,
default: Optional[str] = None,
) -> Optional[str]:
"""First value for subfield `code` in `tag` (respecting indicators)."""
for df in iter_datafields(rec, tag, ind1, ind2):
for sf in df.subfields:
if sf.code == code:
return sf.value
return default
def find_datafields_with_subfields(
rec: MarcRecord,
tag: str,
*,
where_all: Optional[Dict[str, str]] = None,
where_any: Optional[Dict[str, str]] = None,
casefold: bool = False,
ind1: Optional[str] = None,
ind2: Optional[str] = None,
) -> List[DataField]:
"""
Return datafields of `tag` whose subfields match constraints:
- where_all: every (code -> exact value) must be present
- where_any: at least one (code -> exact value) present
Set `casefold=True` for case-insensitive comparison.
"""
where_all = where_all or {}
where_any = where_any or {}
matched: List[DataField] = []
for df in iter_datafields(rec, tag, ind1, ind2):
# Map code -> list of values (with optional casefold applied)
vals: Dict[str, List[str]] = {}
for sf in df.subfields:
v = sf.value.casefold() if casefold else sf.value
vals.setdefault(sf.code, []).append(v)
ok = True
for c, v in where_all.items():
vv = v.casefold() if casefold else v
if c not in vals or vv not in vals[c]:
ok = False
break
if ok and where_any:
any_ok = any(
(c in vals) and ((v.casefold() if casefold else v) in vals[c])
for c, v in where_any.items()
)
if not any_ok:
ok = False
if ok:
matched.append(df)
return matched
def controlfield_value(
rec: MarcRecord, tag: str, default: Optional[str] = None
) -> Optional[str]:
"""Get the first controlfield value by tag (e.g., '001', '005')."""
for cf in rec.controlfields:
if cf.tag == tag:
return cf.value
return default
def datafields_value(
data: List[DataField], code: str, default: Optional[str] = None
) -> Optional[str]:
"""Get the first value for a specific subfield code in a list of datafields."""
for df in data:
for sf in df.subfields:
if sf.code == code:
return sf.value
return default
def datafield_value(
df: DataField, code: str, default: Optional[str] = None
) -> Optional[str]:
"""Get the first value for a specific subfield code in a datafield."""
for sf in df.subfields:
if sf.code == code:
return sf.value
return default
def _smart_join_title(a: str, b: Optional[str]) -> str:
"""
Join 245 $a and $b with MARC-style punctuation.
If $b is present, join with ' : ' unless either side already supplies punctuation.
"""
a = a.strip()
if not b:
return a
b = b.strip()
if a.endswith((":", ";", "/")) or b.startswith((":", ";", "/")):
return f"{a} {b}"
return f"{a} : {b}"
def subfield_values_from_fields(
fields: Iterable[DataField],
code: str,
) -> List[str]:
"""All subfield values with given `code` across a list of DataField."""
return [sf.value for df in fields for sf in df.subfields if sf.code == code]
def first_subfield_value_from_fields(
fields: Iterable[DataField],
code: str,
default: Optional[str] = None,
) -> Optional[str]:
"""First subfield value with given `code` across a list of DataField."""
for df in fields:
for sf in df.subfields:
if sf.code == code:
return sf.value
return default
def subfield_value_pairs_from_fields(
fields: Iterable[DataField],
code: str,
) -> List[Tuple[DataField, str]]:
"""
Return (DataField, value) pairs for all subfields with `code`.
Useful if you need to know which field a value came from.
"""
out: List[Tuple[DataField, str]] = []
for df in fields:
for sf in df.subfields:
if sf.code == code:
out.append((df, sf.value))
return out
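# Editor's sketch (invented values): the lookup helpers above on a hand-built
# record; the 924 $9/$g pair mirrors the holdings convention used in
# book_from_marc below.
_demo = MarcRecord(
    leader="",
    controlfields=[ControlField(tag="001", value="123456789")],
    datafields=[
        DataField(
            tag="245",
            ind1="1",
            ind2="0",
            subfields=[SubField("a", "Example title"), SubField("b", "a subtitle")],
        ),
        DataField(tag="924", subfields=[SubField("9", "Frei 129"), SubField("g", "ABC 123")]),
    ],
)
# controlfield_value(_demo, "001")          -> "123456789"
# first_subfield_value(_demo, "245", "a")   -> "Example title"
# subfield_values(_demo, "245", "b")        -> ["a subtitle"]
# find_datafields_with_subfields(_demo, "924", where_all={"9": "Frei 129"})
#                                           -> [the 924 holdings field]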
def book_from_marc(rec: MarcRecord) -> BookData:
# PPN from controlfield 001
ppn = controlfield_value(rec, "001")
# Title = 245 $a + 245 $b (if present)
t_a = first_subfield_value(rec, "245", "a")
t_b = first_subfield_value(rec, "245", "b")
title = _smart_join_title(t_a, t_b) if t_a else None
# Signature = 924 where $9 == "Frei 129" → take that field's $g
frei_fields = find_datafields_with_subfields(
rec, "924", where_all={"9": "Frei 129"}
)
signature = first_subfield_value_from_fields(frei_fields, "g")
# Year = 264 $c (prefer ind2="1" publication; fallback to any 264)
year = first_subfield_value(rec, "264", "c", ind2="1") or first_subfield_value(
rec, "264", "c"
)
isbn = subfield_values(rec, "020", "a")
mediatype = first_subfield_value(rec, "338", "a")
lang = subfield_values(rec, "041", "a")
authors = subfield_values(rec, "700", "a")
author = None
if authors:
author = "; ".join(authors)
return BookData(
ppn=ppn,
title=title,
signature=signature,
edition=first_subfield_value(rec, "250", "a") or "",
year=year,
pages=first_subfield_value(rec, "300", "a") or "",
publisher=first_subfield_value(rec, "264", "b") or "",
isbn=isbn,
language=lang,
link="",
author=author,
media_type=mediatype,
)
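# Editor's sketch, continuing with the invented _demo record from above.
_demo_book = book_from_marc(_demo)
# _demo_book.ppn        -> "123456789"                    (controlfield 001)
# _demo_book.title      -> "Example title : a subtitle"   (245 $a/$b via _smart_join_title)
# _demo_book.signature  -> "ABC 123"                      (924 with $9 == "Frei 129")
# _demo_book.in_library -> True                           (set in __post_init__ from signature)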
class SWBData(Enum):
URL = "https://sru.k10plus.de/opac-de-627!rec=1?version=1.1&operation=searchRetrieve&query={}&maximumRecords=100&recordSchema=marcxml"
ARGSCHEMA = "pica."
NAME = "SWB"
class DNBData(Enum):
URL = "https://services.dnb.de/sru/dnb?version=1.1&operation=searchRetrieve&query={}&maximumRecords=100&recordSchema=MARC21-xml"
ARGSCHEMA = ""
NAME = "DNB"
class SRUSite(Enum):
SWB = SWBData
DNB = DNBData
RVK_ALLOWED = r"[A-Z0-9.\-\/]" # conservative char set typically seen in RVK notations
def find_newer_edition(
swb_result: BookData, dnb_result: List[BookData]
) -> Optional[List[BookData]]:
"""
New edition if:
- year > swb.year OR
- edition_number > swb.edition_number
Additional guards & preferences:
- If both have signatures and they differ, skip (not the same work).
- For duplicates (same ppn): keep the one that has a signature, and
prefer a signature that matches swb_result.signature.
- If multiple remain: keep the single 'latest' by (year desc,
edition_number desc, best-signature-match desc, has-signature desc).
"""
def norm_sig(s: Optional[str]) -> str:
if not s:
return ""
# normalize: lowercase, collapse whitespace, keep alnum + a few separators
s = s.lower()
s = re.sub(r"\s+", " ", s).strip()
# remove obvious noise; adjust if your signature format differs
s = re.sub(r"[^a-z0-9\-_/\. ]+", "", s)
return s
def has_sig(b: BookData) -> bool:
return bool(getattr(b, "signature", None))
def sig_matches_swb(b: BookData) -> bool:
if not has_sig(b) or not has_sig(swb_result):
return False
return norm_sig(b.signature) == norm_sig(swb_result.signature)
def year_of(b: BookData) -> Optional[int]:
    # BookData.__post_init__ stores year as a digit-only string; compare numerically
    try:
        return int(b.year) if b.year else None
    except (TypeError, ValueError):
        return None
def strictly_newer(b: BookData) -> bool:
    b_year, swb_year = year_of(b), year_of(swb_result)
    by_year = b_year is not None and swb_year is not None and b_year > swb_year
    by_edition = (
        b.edition_number is not None
        and swb_result.edition_number is not None
        and b.edition_number > swb_result.edition_number
    )
    return by_year or by_edition
swb_sig_norm = norm_sig(getattr(swb_result, "signature", None))
# 1) Filter to same-work AND newer
candidates: List[BookData] = []
for b in dnb_result:
# Skip if both signatures exist and don't match (different work)
b_sig = getattr(b, "signature", None)
if b_sig and swb_result.signature:
if norm_sig(b_sig) != swb_sig_norm:
continue # not the same work
# Keep only if newer by rules
if strictly_newer(b):
candidates.append(b)
if not candidates:
return None
# 2) Dedupe by PPN, preferring signature (and matching signature if possible)
by_ppn: dict[Optional[str], BookData] = {}
for b in candidates:
key = getattr(b, "ppn", None)
prev = by_ppn.get(key)
if prev is None:
by_ppn[key] = b
continue
# Compute preference score for both
def ppn_pref_score(x: BookData) -> tuple[int, int]:
# (signature matches swb, has signature)
return (1 if sig_matches_swb(x) else 0, 1 if has_sig(x) else 0)
if ppn_pref_score(b) > ppn_pref_score(prev):
by_ppn[key] = b
deduped = list(by_ppn.values())
if not deduped:
return None
# 3) If multiple remain, keep only the latest one.
# Order: year desc, edition_number desc, signature-match desc, has-signature desc
def sort_key(b: BookData):
    y = year_of(b)
    year = y if y is not None else -1  # numeric, so the tuples compare without TypeError
    ed = b.edition_number if b.edition_number is not None else -1
    sig_match = 1 if sig_matches_swb(b) else 0
    sig_present = 1 if has_sig(b) else 0
    return (year, ed, sig_match, sig_present)
best = max(deduped, key=sort_key)
return [best] if best else None
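# Editor's sketch (invented values): a 2019 second edition held locally vs. a
# 2023 third edition of the same work found at the DNB.
# swb = BookData(title="Beispiel", year=2019, edition="2. Aufl.", signature="ABC 123")
# dnb = [BookData(ppn="999", title="Beispiel", year=2023, edition="3. Aufl.")]
# find_newer_edition(swb, dnb) -> [the 2023 record]   (newer by year and edition)
# find_newer_edition(swb, [])  -> None                (no candidates)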
class Api:
def __init__(self, site: str, url: str, prefix: str):
self.site = site
self.url = url
self.prefix = prefix
# Reuse TCP connections across requests for better performance
self._session = requests.Session()
# Slightly larger connection pool for concurrent calls
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)
def close(self):
try:
self._session.close()
except Exception:
pass
def __del__(self):
# Best-effort cleanup
self.close()
def get(self, query_args: Iterable[str]) -> List[Record]:
    # DNB has no pica.* indexes, so drop pica.-prefixed arguments for that site
    if self.site == "DNB":
        args = [arg for arg in query_args if not arg.startswith("pica.")]
        if not args:
            raise ValueError("DNB queries must include at least one search term")
        query_args = args
    query = "+and+".join(query_args)
    query = query.replace(" ", "%20").replace("&", "%26")
    # insert the escaped query into the site's URL template
    url = self.url.format(query)
log.debug(url)
headers = {
"User-Agent": f"{self.site} SRU Client, <alexander.kirchner@ph-freiburg.de>",
"Accept": "application/xml",
"Accept-Charset": "latin1,utf-8;q=0.7,*;q=0.3",
}
# Use persistent session and set timeouts to avoid hanging
resp = self._session.get(url, headers=headers, timeout=(3.05, 60))
if resp.status_code != 200:
    raise Exception(f"Error fetching data from {self.site}: {resp.status_code}")
# Parse using raw bytes (original behavior) to preserve encoding edge cases
sr = parse_search_retrieve_response(resp.content)
return sr.records
def getBooks(self, query_args: Iterable[str]) -> List[BookData]:
records: List[Record] = self.get(query_args)
# Avoid printing on hot paths; rely on logger if needed
log.debug(f"{self.site} found {len(records)} records for args={query_args}")
books: List[BookData] = []
# extract title from query_args if present
title = None
for arg in query_args:
    if arg.startswith("pica.tit="):
        title = arg.split("=", 1)[1]  # split once; titles may contain "="
        break
for rec in records:
book = book_from_marc(rec.recordData)
books.append(book)
if title:
books = [
b
for b in books
if b.title and b.title.lower().startswith(title.lower())
]
return books
def getLinkForBook(self, book: BookData) -> str:
# Not implemented: depends on catalog front-end; return empty string for now
return ""
class SWB(Api):
    def __init__(self):
        super().__init__(
            SWBData.NAME.value, SWBData.URL.value, SWBData.ARGSCHEMA.value
        )
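
A usage sketch for the SWB client (editor's addition; the query value is invented — pica.tit is the same title index that getBooks already filters on):

from src.bibapi.sru import SWB

api = SWB()
try:
    books = api.getBooks(["pica.tit=Einführung in die Statistik"])
    for b in books:
        print(b.ppn, b.year, b.title)
finally:
    api.close()  # releases the pooled HTTP session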

7
uv.lock generated Normal file

@@ -0,0 +1,7 @@
version = 1
requires-python = ">=3.13"
[[package]]
name = "bibapi"
version = "0.1.0"
source = { editable = "." }