diff --git a/src/bibapi/catalogue.py b/src/bibapi/catalogue.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/bibapi/lehmanns.py b/src/bibapi/lehmanns.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/bibapi/schemas/bookdata.py b/src/bibapi/schemas/bookdata.py
new file mode 100644
index 0000000..3bb5dee
--- /dev/null
+++ b/src/bibapi/schemas/bookdata.py
@@ -0,0 +1,113 @@
+import json
+from dataclasses import dataclass, field
+from typing import Any, Optional, Union
+
+import regex
+
+
+@dataclass
+class BookData:
+    ppn: str | None = None
+    title: str | None = None
+    signature: str | None = None
+    edition: str | None = None
+    link: str | None = None
+    isbn: Union[str, list[str], None] = field(default_factory=list)
+    author: str | None = None
+    language: Union[str, list[str], None] = field(default_factory=list)
+    publisher: str | None = None
+    place: str | None = None
+    year: int | None = None
+    pages: str | None = None
+    library_location: str | None = None
+    in_apparat: bool | None = False
+    adis_idn: str | None = None
+    old_book: Any | None = None
+    media_type: str | None = None
+    in_library: bool | None = None  # whether the book is in the library or not
+
+    def __post_init__(self):
+        self.library_location = (
+            str(self.library_location) if self.library_location else None
+        )
+        if isinstance(self.language, list) and self.language:
+            self.language = [lang.strip() for lang in self.language if lang.strip()]
+            self.language = ",".join(self.language)
+        self.year = regex.sub(r"[^\d]", "", str(self.year)) if self.year else None
+        self.in_library = bool(self.signature)
+
+    def from_dict(self, data: dict) -> "BookData":
+        for key, value in data.items():
+            setattr(self, key, value)
+        return self
+
+    def merge(self, other: "BookData") -> "BookData":
+        for key, value in other.__dict__.items():
+            # if the attribute is a list, merge by extending it
+            if isinstance(value, list):
+                current_value = getattr(self, key)
+                if current_value is None:
+                    current_value = []
+                elif not isinstance(current_value, list):
+                    current_value = [current_value]
+                # extend the list with the new values, skipping values already present
+                for v in value:
+                    if v not in current_value:
+                        current_value.append(v)
+                setattr(self, key, current_value)
+            if value is not None and (
+                getattr(self, key) is None or getattr(self, key) == ""
+            ):
+                setattr(self, key, value)
+        # in language, drop all entries longer than four characters
+        if isinstance(self.language, list):
+            self.language = [lang for lang in self.language if len(lang) <= 4]
+        return self
+
+    @property
+    def to_dict(self) -> str:
+        """Serialize the dataclass to a JSON string, omitting None values and old_book."""
+        data_dict = {
+            key: value for key, value in self.__dict__.items() if value is not None
+        }
+        # remove old_book from data_dict
+        if "old_book" in data_dict:
+            del data_dict["old_book"]
+        return json.dumps(data_dict, ensure_ascii=False)
+
+    def from_dataclass(self, dataclass: Optional[Any]) -> None:
+        if dataclass is None:
+            return
+        for key, value in dataclass.__dict__.items():
+            setattr(self, key, value)
+
+    def get_book_type(self) -> str:
+        if self.pages and "Online" in self.pages:
+            return "eBook"
+        else:
+            return "Druckausgabe"
+
+    def from_string(self, data: str) -> "BookData":
+        ndata = json.loads(data)
+        return BookData(**ndata)
+
+    def from_LehmannsSearchResult(self, result: Any) -> "BookData":
+        self.title = result.title
+        self.author = "; ".join(result.authors) if result.authors else None
+        self.edition = str(result.edition) if result.edition else None
+        self.link = result.url
+        self.isbn = (
+            result.isbn13
+            if isinstance(result.isbn13, list)
+            else [result.isbn13]
+            if result.isbn13
+            else []
+        )
+        self.pages = str(result.pages) if result.pages else None
+        self.publisher = result.publisher
+        self.year = str(result.year) if result.year else None
+        return self
+
+    @property
+    def edition_number(self) -> int:
+        if self.edition is None:
+            return 0
+        match = regex.search(r"(\d+)", self.edition)
+        if match:
+            return int(match.group(1))
+        return 0
diff --git a/src/bibapi/sru.py b/src/bibapi/sru.py
new file mode 100644
index 0000000..b2697c8
--- /dev/null
+++ b/src/bibapi/sru.py
@@ -0,0 +1,632 @@
+import re
+import xml.etree.ElementTree as ET
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Dict, Iterable, List, Optional, Tuple, Union
+
+import requests
+from requests.adapters import HTTPAdapter
+
+from src.shared.logging import log
+
+from .schemas.bookdata import BookData
+
+
+# -----------------------
+# Dataclasses
+# -----------------------
+
+
+# --- MARC XML structures ---
+@dataclass
+class ControlField:
+    tag: str
+    value: str
+
+
+@dataclass
+class SubField:
+    code: str
+    value: str
+
+
+@dataclass
+class DataField:
+    tag: str
+    ind1: str = " "
+    ind2: str = " "
+    subfields: List[SubField] = field(default_factory=list)
+
+
+@dataclass
+class MarcRecord:
+    leader: str
+    controlfields: List[ControlField] = field(default_factory=list)
+    datafields: List[DataField] = field(default_factory=list)
+
+
+# --- SRU record wrapper ---
+@dataclass
+class Record:
+    recordSchema: str
+    recordPacking: str
+    recordData: MarcRecord
+    recordPosition: int
+
+
+@dataclass
+class EchoedSearchRequest:
+    version: str
+    query: str
+    maximumRecords: int
+    recordPacking: str
+    recordSchema: str
+
+
+@dataclass
+class SearchRetrieveResponse:
+    version: str
+    numberOfRecords: int
+    records: List[Record] = field(default_factory=list)
+    echoedSearchRetrieveRequest: Optional[EchoedSearchRequest] = None
+
+
+# -----------------------
+# Parser
+# -----------------------
+
+ZS = "http://www.loc.gov/zing/srw/"
+MARC = "http://www.loc.gov/MARC21/slim"
+NS = {"zs": ZS, "marc": MARC}
+
+
+def _text(elem: Optional[ET.Element]) -> str:
+    return (elem.text or "") if elem is not None else ""
+
+
+def _req_text(parent: ET.Element, path: str) -> Optional[str]:
+    el = parent.find(path, NS)
+    if el is None or el.text is None:
+        return None
+    return el.text
+
+
+def parse_marc_record(record_el: ET.Element) -> MarcRecord:
+    """
+    record_el is the MARC record element (carried in the default MARC namespace
+    in the sample responses).
+    """
+    # leader
+    leader_text = _req_text(record_el, "marc:leader")
+
+    # controlfields
+    controlfields: List[ControlField] = []
+    for cf in record_el.findall("marc:controlfield", NS):
+        tag = cf.get("tag", "").strip()
+        controlfields.append(ControlField(tag=tag, value=_text(cf)))
+
+    # datafields
+    datafields: List[DataField] = []
+    for df in record_el.findall("marc:datafield", NS):
+        tag = df.get("tag", "").strip()
+        ind1 = df.get("ind1") or " "
+        ind2 = df.get("ind2") or " "
+        subfields: List[SubField] = []
+        for sf in df.findall("marc:subfield", NS):
+            code = sf.get("code", "")
+            subfields.append(SubField(code=code, value=_text(sf)))
+        datafields.append(DataField(tag=tag, ind1=ind1, ind2=ind2, subfields=subfields))
+
+    return MarcRecord(
+        leader=leader_text, controlfields=controlfields, datafields=datafields
+    )
+
+
+def parse_record(zs_record_el: ET.Element) -> Record:
+    recordSchema = _req_text(zs_record_el, "zs:recordSchema")
+    recordPacking = _req_text(zs_record_el, "zs:recordPacking")
+
+    # recordData contains a MARC record in the default MARC namespace in the sample responses
+    recordData_el = zs_record_el.find("zs:recordData", NS)
+    if recordData_el is None:
+        raise ValueError("Missing zs:recordData")
+
+    marc_record_el = recordData_el.find("marc:record", NS)
+    if marc_record_el is None:
+        # If the MARC record uses a default namespace (xmlns="..."), ElementTree still needs
+        # the namespace-qualified name; searching with the prefix covers both cases.
+        raise ValueError("Missing MARC21 record inside zs:recordData")
+
+    marc_record = parse_marc_record(marc_record_el)
+
+    recordPosition = int(_req_text(zs_record_el, "zs:recordPosition"))
+    return Record(
+        recordSchema=recordSchema,
+        recordPacking=recordPacking,
+        recordData=marc_record,
+        recordPosition=recordPosition,
+    )
+
+
+def parse_echoed_request(root: ET.Element) -> Optional[EchoedSearchRequest]:
+    el = root.find("zs:echoedSearchRetrieveRequest", NS)
+    if el is None:
+        return None
+
+    # Be permissive with missing fields
+    version = _text(el.find("zs:version", NS))
+    query = _text(el.find("zs:query", NS))
+    maximumRecords_text = _text(el.find("zs:maximumRecords", NS)) or "0"
+    recordPacking = _text(el.find("zs:recordPacking", NS))
+    recordSchema = _text(el.find("zs:recordSchema", NS))
+
+    try:
+        maximumRecords = int(maximumRecords_text)
+    except ValueError:
+        maximumRecords = 0
+
+    return EchoedSearchRequest(
+        version=version,
+        query=query,
+        maximumRecords=maximumRecords,
+        recordPacking=recordPacking,
+        recordSchema=recordSchema,
+    )
+
+
+def parse_search_retrieve_response(
+    xml_str: Union[str, bytes],
+) -> SearchRetrieveResponse:
+    root = ET.fromstring(xml_str)
+
+    # Root is zs:searchRetrieveResponse
+    version = _req_text(root, "zs:version")
+    numberOfRecords = int(_req_text(root, "zs:numberOfRecords") or "0")
+
+    records_parent = root.find("zs:records", NS)
+    records: List[Record] = []
+    if records_parent is not None:
+        for r in records_parent.findall("zs:record", NS):
+            records.append(parse_record(r))
+
+    echoed = parse_echoed_request(root)
+
+    return SearchRetrieveResponse(
+        version=version,
+        numberOfRecords=numberOfRecords,
+        records=records,
+        echoedSearchRetrieveRequest=echoed,
+    )
+
+
+# --- Query helpers over MarcRecord ---
+
+
+def iter_datafields(
+    rec: MarcRecord,
+    tag: Optional[str] = None,
+    ind1: Optional[str] = None,
+    ind2: Optional[str] = None,
+) -> Iterable[DataField]:
+    """Yield datafields, optionally filtered by tag/indicators."""
+    for df in rec.datafields:
+        if tag is not None and df.tag != tag:
+            continue
+        if ind1 is not None and df.ind1 != ind1:
+            continue
+        if ind2 is not None and df.ind2 != ind2:
+            continue
+        yield df
+
+
+def subfield_values(
+    rec: MarcRecord,
+    tag: str,
+    code: str,
+    *,
+    ind1: Optional[str] = None,
+    ind2: Optional[str] = None,
+) -> List[str]:
+    """All values for subfield `code` in every `tag` field (respecting indicators)."""
+    out: List[str] = []
+    for df in iter_datafields(rec, tag, ind1, ind2):
+        out.extend(sf.value for sf in df.subfields if sf.code == code)
+    return out
+
+
+def first_subfield_value(
+    rec: MarcRecord,
+    tag: str,
+    code: str,
+    *,
+    ind1: Optional[str] = None,
+    ind2: Optional[str] = None,
+    default: Optional[str] = None,
+) -> Optional[str]:
+    """First value for subfield `code` in `tag` (respecting indicators)."""
+    for df in iter_datafields(rec, tag, ind1, ind2):
+        for sf in df.subfields:
+            if sf.code == code:
+                return sf.value
+    return default
+
+
+def find_datafields_with_subfields(
+    rec: MarcRecord,
+    tag: str,
+    *,
+    where_all: Optional[Dict[str, str]] = None,
+    where_any: Optional[Dict[str, str]] = None,
+    casefold: bool = False,
+    ind1: Optional[str] = None,
+    ind2: Optional[str] = None,
+) -> List[DataField]:
+    """
+    Return datafields of `tag` whose subfields match constraints:
+      - where_all: every (code -> exact value) must be present
+      - where_any: at least one (code -> exact value) present
+    Set `casefold=True` for case-insensitive comparison.
+    """
+    where_all = where_all or {}
+    where_any = where_any or {}
+    matched: List[DataField] = []
+
+    for df in iter_datafields(rec, tag, ind1, ind2):
+        # Map code -> list of values (with optional casefold applied)
+        vals: Dict[str, List[str]] = {}
+        for sf in df.subfields:
+            v = sf.value.casefold() if casefold else sf.value
+            vals.setdefault(sf.code, []).append(v)
+
+        ok = True
+        for c, v in where_all.items():
+            vv = v.casefold() if casefold else v
+            if c not in vals or vv not in vals[c]:
+                ok = False
+                break
+
+        if ok and where_any:
+            any_ok = any(
+                (c in vals) and ((v.casefold() if casefold else v) in vals[c])
+                for c, v in where_any.items()
+            )
+            if not any_ok:
+                ok = False
+
+        if ok:
+            matched.append(df)
+
+    return matched
+
+
+def controlfield_value(
+    rec: MarcRecord, tag: str, default: Optional[str] = None
+) -> Optional[str]:
+    """Get the first controlfield value by tag (e.g., '001', '005')."""
+    for cf in rec.controlfields:
+        if cf.tag == tag:
+            return cf.value
+    return default
+
+
+def datafields_value(
+    data: List[DataField], code: str, default: Optional[str] = None
+) -> Optional[str]:
+    """Get the first value for a specific subfield code in a list of datafields."""
+    for df in data:
+        for sf in df.subfields:
+            if sf.code == code:
+                return sf.value
+    return default
+
+
+def datafield_value(
+    df: DataField, code: str, default: Optional[str] = None
+) -> Optional[str]:
+    """Get the first value for a specific subfield code in a datafield."""
+    for sf in df.subfields:
+        if sf.code == code:
+            return sf.value
+    return default
+
+
+def _smart_join_title(a: str, b: Optional[str]) -> str:
+    """
+    Join 245 $a and $b with MARC-style punctuation.
+    If $b is present, join with ' : ' unless either side already supplies punctuation.
+    """
+    a = a.strip()
+    if not b:
+        return a
+    b = b.strip()
+    if a.endswith((":", ";", "/")) or b.startswith((":", ";", "/")):
+        return f"{a} {b}"
+    return f"{a} : {b}"
+
+
+def subfield_values_from_fields(
+    fields: Iterable[DataField],
+    code: str,
+) -> List[str]:
+    """All subfield values with given `code` across a list of DataField."""
+    return [sf.value for df in fields for sf in df.subfields if sf.code == code]
+
+
+def first_subfield_value_from_fields(
+    fields: Iterable[DataField],
+    code: str,
+    default: Optional[str] = None,
+) -> Optional[str]:
+    """First subfield value with given `code` across a list of DataField."""
+    for df in fields:
+        for sf in df.subfields:
+            if sf.code == code:
+                return sf.value
+    return default
+
+
+def subfield_value_pairs_from_fields(
+    fields: Iterable[DataField],
+    code: str,
+) -> List[Tuple[DataField, str]]:
+    """
+    Return (DataField, value) pairs for all subfields with `code`.
+    Useful if you need to know which field a value came from.
+    """
+    out: List[Tuple[DataField, str]] = []
+    for df in fields:
+        for sf in df.subfields:
+            if sf.code == code:
+                out.append((df, sf.value))
+    return out
+
+
+def book_from_marc(rec: MarcRecord) -> BookData:
+    # PPN from controlfield 001
+    ppn = controlfield_value(rec, "001")
+
+    # Title = 245 $a + 245 $b (if present)
+    t_a = first_subfield_value(rec, "245", "a")
+    t_b = first_subfield_value(rec, "245", "b")
+    title = _smart_join_title(t_a, t_b) if t_a else None
+
+    # Signature = 924 where $9 == "Frei 129" → take that field's $g
+    frei_fields = find_datafields_with_subfields(
+        rec, "924", where_all={"9": "Frei 129"}
+    )
+    signature = first_subfield_value_from_fields(frei_fields, "g")
+
+    # Year = 264 $c (prefer ind2="1" publication; fallback to any 264)
+    year = first_subfield_value(rec, "264", "c", ind2="1") or first_subfield_value(
+        rec, "264", "c"
+    )
+    isbn = subfield_values(rec, "020", "a")
+    mediatype = first_subfield_value(rec, "338", "a")
+    lang = subfield_values(rec, "041", "a")
+    authors = subfield_values(rec, "700", "a")
+    author = None
+    if authors:
+        author = "; ".join(authors)
+
+    return BookData(
+        ppn=ppn,
+        title=title,
+        signature=signature,
+        edition=first_subfield_value(rec, "250", "a") or "",
+        year=year,
+        pages=first_subfield_value(rec, "300", "a") or "",
+        publisher=first_subfield_value(rec, "264", "b") or "",
+        isbn=isbn,
+        language=lang,
+        link="",
+        author=author,
+        media_type=mediatype,
+    )
+
+
+class SWBData(Enum):
+    URL = "https://sru.k10plus.de/opac-de-627!rec=1?version=1.1&operation=searchRetrieve&query={}&maximumRecords=100&recordSchema=marcxml"
+    ARGSCHEMA = "pica."
+    NAME = "SWB"
+
+
+class DNBData(Enum):
+    URL = "https://services.dnb.de/sru/dnb?version=1.1&operation=searchRetrieve&query={}&maximumRecords=100&recordSchema=MARC21-xml"
+    ARGSCHEMA = ""
+    NAME = "DNB"
+
+
+class SRUSite(Enum):
+    SWB = SWBData
+    DNB = DNBData
+
+
+RVK_ALLOWED = r"[A-Z0-9.\-\/]"  # conservative char set typically seen in RVK notations
+
+
+def find_newer_edition(
+    swb_result: BookData, dnb_result: List[BookData]
+) -> Optional[List[BookData]]:
+    """
+    New edition if:
+      - year > swb.year OR
+      - edition_number > swb.edition_number
+
+    Additional guards & preferences:
+      - If both have signatures and they differ, skip (not the same work).
+      - For duplicates (same ppn): keep the one that has a signature, and
+        prefer a signature that matches swb_result.signature.
+      - If multiple remain: keep the single 'latest' by (year desc,
+        edition_number desc, best-signature-match desc, has-signature desc).
+    """
+
+    def norm_sig(s: Optional[str]) -> str:
+        if not s:
+            return ""
+        # normalize: lowercase, collapse whitespace, keep alnum + a few separators
+        s = s.lower()
+        s = re.sub(r"\s+", " ", s).strip()
+        # remove obvious noise; adjust if your signature format differs
+        s = re.sub(r"[^a-z0-9\-_/\. ]+", "", s)
+        return s
+
+    def has_sig(b: BookData) -> bool:
+        return bool(getattr(b, "signature", None))
+
+    def sig_matches_swb(b: BookData) -> bool:
+        if not has_sig(b) or not has_sig(swb_result):
+            return False
+        return norm_sig(b.signature) == norm_sig(swb_result.signature)
+
+    def strictly_newer(b: BookData) -> bool:
+        by_year = (
+            b.year is not None
+            and swb_result.year is not None
+            and b.year > swb_result.year
+        )
+        by_edition = (
+            b.edition_number is not None
+            and swb_result.edition_number is not None
+            and b.edition_number > swb_result.edition_number
+        )
+        return by_year or by_edition
+
+    swb_sig_norm = norm_sig(getattr(swb_result, "signature", None))
+
+    # 1) Filter to same-work AND newer
+    candidates: List[BookData] = []
+    for b in dnb_result:
+        # Skip if both signatures exist and don't match (different work)
+        b_sig = getattr(b, "signature", None)
+        if b_sig and swb_result.signature:
+            if norm_sig(b_sig) != swb_sig_norm:
+                continue  # not the same work
+
+        # Keep only if newer by rules
+        if strictly_newer(b):
+            candidates.append(b)
+
+    if not candidates:
+        return None
+
+    # 2) Dedupe by PPN, preferring signature (and matching signature if possible)
+    by_ppn: dict[Optional[str], BookData] = {}
+    for b in candidates:
+        key = getattr(b, "ppn", None)
+        prev = by_ppn.get(key)
+        if prev is None:
+            by_ppn[key] = b
+            continue
+
+        # Compute preference score for both
+        def ppn_pref_score(x: BookData) -> tuple[int, int]:
+            # (signature matches swb, has signature)
+            return (1 if sig_matches_swb(x) else 0, 1 if has_sig(x) else 0)
+
+        if ppn_pref_score(b) > ppn_pref_score(prev):
+            by_ppn[key] = b
+
+    deduped = list(by_ppn.values())
+    if not deduped:
+        return None
+
+    # 3) If multiple remain, keep only the latest one.
+    #    Order: year desc, edition_number desc, signature-match desc, has-signature desc
+    def sort_key(b: BookData):
+        year = b.year if b.year is not None else -1
+        ed = b.edition_number if b.edition_number is not None else -1
+        sig_match = 1 if sig_matches_swb(b) else 0
+        sig_present = 1 if has_sig(b) else 0
+        return (year, ed, sig_match, sig_present)
+
+    best = max(deduped, key=sort_key)
+    return [best] if best else None
+
+
+class Api:
+    def __init__(self, site: str, url: str, prefix: str):
+        self.site = site
+        self.url = url
+        self.prefix = prefix
+        # Reuse TCP connections across requests for better performance
+        self._session = requests.Session()
+        # Slightly larger connection pool for concurrent calls
+        adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
+        self._session.mount("http://", adapter)
+        self._session.mount("https://", adapter)
+
+    def close(self):
+        try:
+            self._session.close()
+        except Exception:
+            pass
+
+    def __del__(self):
+        # Best-effort cleanup
+        self.close()
+
+    def get(self, query_args: Iterable[str]) -> List[Record]:
+        # DNB does not use the pica.* index prefix, so drop those arguments
+        if self.site == "DNB":
+            args = [arg for arg in query_args if not arg.startswith("pica.")]
+            if args == []:
+                raise ValueError("DNB queries must include at least one search term")
+            query_args = args
+        # query_args = [f"{self.prefix}{arg}" for arg in query_args]
+        query = "+and+".join(query_args)
+        query = query.replace(" ", "%20").replace("&", "%26")
+        # insert the query into the site's URL template
+        url = self.url.format(query)
+
+        log.debug(url)
+        headers = {
+            "User-Agent": f"{self.site} SRU Client",
+            "Accept": "application/xml",
+            "Accept-Charset": "latin1,utf-8;q=0.7,*;q=0.3",
+        }
+        # Use persistent session and set timeouts to avoid hanging
+        resp = self._session.get(url, headers=headers, timeout=(3.05, 60))
+        if resp.status_code != 200:
+            raise Exception(f"Error fetching data from {self.site}: {resp.status_code}")
+        # Parse using raw bytes (original behavior) to preserve encoding edge cases
+        sr = parse_search_retrieve_response(resp.content)
+        return sr.records
+
+    def getBooks(self, query_args: Iterable[str]) -> List[BookData]:
+        records: List[Record] = self.get(query_args)
+        # Avoid printing on hot paths; rely on logger if needed
+        log.debug(f"{self.site} found {len(records)} records for args={query_args}")
+        books: List[BookData] = []
+        # extract title from query_args if present
+        title = None
+        for arg in query_args:
+            if arg.startswith("pica.tit="):
+                title = arg.split("=")[1]
+                break
+        for rec in records:
+            book = book_from_marc(rec.recordData)
+            books.append(book)
+        if title:
+            books = [
+                b
+                for b in books
+                if b.title and b.title.lower().startswith(title.lower())
+            ]
+        return books
+
+    def getLinkForBook(self, book: BookData) -> str:
+        # Not implemented: depends on catalog front-end; return empty string for now
+        return ""
+
+
+class SWB(Api):
+    def __init__(self):
+        self.site = SWBData.NAME.value
+        self.url = SWBData.URL.value
+        self.prefix = SWBData.ARGSCHEMA.value
+        super().__init__(self.site, self.url, self.prefix)
diff --git a/uv.lock b/uv.lock
new file mode 100644
index 0000000..801ecc4
--- /dev/null
+++ b/uv.lock
@@ -0,0 +1,7 @@
+version = 1
+requires-python = ">=3.13"
+
+[[package]]
+name = "bibapi"
+version = "0.1.0"
+source = { editable = "." }