import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Iterable, List, Optional, Tuple, Union

import requests
from requests.adapters import HTTPAdapter

# centralized logging used via src.shared.logging
from src.logic.dataclass import BookData
from src.shared.logging import log

log  # ensure imported logger is referenced

# -----------------------
# Dataclasses
# -----------------------

# --- MARC XML structures ---


@dataclass
class ControlField:
    # MARC control field (tags 001-009): a tag plus a single value, no subfields.
    tag: str
    value: str


@dataclass
class SubField:
    # Single MARC subfield: one-character code plus its value.
    code: str
    value: str


@dataclass
class DataField:
    # MARC data field: tag, two one-character indicators, and its subfields.
    tag: str
    ind1: str = " "
    ind2: str = " "
    subfields: List[SubField] = field(default_factory=list)


@dataclass
class MarcRecord:
    # Complete MARC21 record: leader line plus control/data fields.
    leader: str
    controlfields: List[ControlField] = field(default_factory=list)
    datafields: List[DataField] = field(default_factory=list)


# --- SRU record wrapper ---


@dataclass
class Record:
    # One zs:record from an SRU response, wrapping a parsed MARC record.
    recordSchema: str
    recordPacking: str
    recordData: MarcRecord
    recordPosition: int


@dataclass
class EchoedSearchRequest:
    # Server's echo of the request parameters (zs:echoedSearchRetrieveRequest).
    version: str
    query: str
    maximumRecords: int
    recordPacking: str
    recordSchema: str


@dataclass
class SearchRetrieveResponse:
    # Top-level parsed SRU searchRetrieve response.
    version: str
    numberOfRecords: int
    records: List[Record] = field(default_factory=list)
    echoedSearchRetrieveRequest: Optional[EchoedSearchRequest] = None


# -----------------------
# Parser
# -----------------------

ZS = "http://www.loc.gov/zing/srw/"
MARC = "http://www.loc.gov/MARC21/slim"
NS = {"zs": ZS, "marc": MARC}


def _text(elem: Optional[ET.Element]) -> str:
    """Return an element's text, or '' when the element is missing/empty."""
    return (elem.text or "") if elem is not None else ""


def _req_text(parent: ET.Element, path: str) -> Optional[str]:
    """Return the text of the child at `path` (ns-aware), or None if absent/empty."""
    el = parent.find(path, NS)
    if el is None or el.text is None:
        return None
    return el.text


def parse_marc_record(record_el: ET.Element) -> MarcRecord:
    """
    Parse a MARC21 <record> element (default ns MARC in the SRU sample)
    into a MarcRecord.
    """
    # leader
    leader_text = _req_text(record_el, "marc:leader") or ""

    # controlfields
    controlfields: List[ControlField] = []
    for cf in record_el.findall("marc:controlfield", NS):
        tag = cf.get("tag", "").strip()
        controlfields.append(ControlField(tag=tag, value=_text(cf)))

    # datafields
    datafields: List[DataField] = []
    for df in record_el.findall("marc:datafield", NS):
        tag = df.get("tag", "").strip()
        # MARC indicators default to a single blank when absent
        ind1 = df.get("ind1") or " "
        ind2 = df.get("ind2") or " "
        subfields: List[SubField] = []
        for sf in df.findall("marc:subfield", NS):
            code = sf.get("code", "")
            subfields.append(SubField(code=code, value=_text(sf)))
        datafields.append(
            DataField(tag=tag, ind1=ind1, ind2=ind2, subfields=subfields)
        )

    return MarcRecord(
        leader=leader_text, controlfields=controlfields, datafields=datafields
    )


def parse_record(zs_record_el: ET.Element) -> Record:
    """
    Parse one zs:record wrapper into a Record.

    Raises:
        ValueError: if zs:recordData or the embedded MARC record is missing.
    """
    recordSchema = _req_text(zs_record_el, "zs:recordSchema") or ""
    recordPacking = _req_text(zs_record_el, "zs:recordPacking") or ""

    # recordData contains a MARC record with the default MARC namespace
    recordData_el = zs_record_el.find("zs:recordData", NS)
    if recordData_el is None:
        raise ValueError("Missing zs:recordData")

    marc_record_el = recordData_el.find("marc:record", NS)
    if marc_record_el is None:
        # If the MARC record uses default ns (xmlns="...") ElementTree still
        # needs the ns-qualified name; we searched with the prefix above, which
        # covers both default and prefixed cases.
        raise ValueError("Missing MARC21 record inside zs:recordData")

    marc_record = parse_marc_record(marc_record_el)
    recordPosition = int(_req_text(zs_record_el, "zs:recordPosition") or "0")

    return Record(
        recordSchema=recordSchema,
        recordPacking=recordPacking,
        recordData=marc_record,
        recordPosition=recordPosition,
    )


def parse_echoed_request(root: ET.Element) -> Optional[EchoedSearchRequest]:
    """Parse zs:echoedSearchRetrieveRequest if present; permissive on missing fields."""
    el = root.find("zs:echoedSearchRetrieveRequest", NS)
    if el is None:
        return None

    # Be permissive with missing fields
    version = _text(el.find("zs:version", NS))
    query = _text(el.find("zs:query", NS))
    maximumRecords_text = _text(el.find("zs:maximumRecords", NS)) or "0"
    recordPacking = _text(el.find("zs:recordPacking", NS))
    recordSchema = _text(el.find("zs:recordSchema", NS))

    try:
        maximumRecords = int(maximumRecords_text)
    except ValueError:
        maximumRecords = 0

    return EchoedSearchRequest(
        version=version,
        query=query,
        maximumRecords=maximumRecords,
        recordPacking=recordPacking,
        recordSchema=recordSchema,
    )


def parse_search_retrieve_response(
    xml_str: Union[str, bytes],
) -> SearchRetrieveResponse:
    """Parse a full SRU searchRetrieveResponse XML document."""
    root = ET.fromstring(xml_str)

    # Root is zs:searchRetrieveResponse
    # Fix: normalize to "" so the str-typed dataclass field never holds None.
    version = _req_text(root, "zs:version") or ""
    numberOfRecords = int(_req_text(root, "zs:numberOfRecords") or "0")

    records_parent = root.find("zs:records", NS)
    records: List[Record] = []
    if records_parent is not None:
        for r in records_parent.findall("zs:record", NS):
            records.append(parse_record(r))

    echoed = parse_echoed_request(root)

    return SearchRetrieveResponse(
        version=version,
        numberOfRecords=numberOfRecords,
        records=records,
        echoedSearchRetrieveRequest=echoed,
    )


# --- Query helpers over MarcRecord ---


def iter_datafields(
    rec: MarcRecord,
    tag: Optional[str] = None,
    ind1: Optional[str] = None,
    ind2: Optional[str] = None,
) -> Iterable[DataField]:
    """Yield datafields, optionally filtered by tag/indicators."""
    for df in rec.datafields:
        if tag is not None and df.tag != tag:
            continue
        if ind1 is not None and df.ind1 != ind1:
            continue
        if ind2 is not None and df.ind2 != ind2:
            continue
        yield df


def subfield_values(
    rec: MarcRecord,
    tag: str,
    code: str,
    *,
    ind1: Optional[str] = None,
    ind2: Optional[str] = None,
) -> List[str]:
    """All values for subfield `code` in every `tag` field (respecting indicators)."""
    out: List[str] = []
    for df in iter_datafields(rec, tag, ind1, ind2):
        out.extend(sf.value for sf in df.subfields if sf.code == code)
    return out


def first_subfield_value(
    rec: MarcRecord,
    tag: str,
    code: str,
    *,
    ind1: Optional[str] = None,
    ind2: Optional[str] = None,
    default: Optional[str] = None,
) -> Optional[str]:
    """First value for subfield `code` in `tag` (respecting indicators)."""
    for df in iter_datafields(rec, tag, ind1, ind2):
        for sf in df.subfields:
            if sf.code == code:
                return sf.value
    return default


def find_datafields_with_subfields(
    rec: MarcRecord,
    tag: str,
    *,
    where_all: Optional[Dict[str, str]] = None,
    where_any: Optional[Dict[str, str]] = None,
    casefold: bool = False,
    ind1: Optional[str] = None,
    ind2: Optional[str] = None,
) -> List[DataField]:
    """
    Return datafields of `tag` whose subfields match constraints:
      - where_all: every (code -> exact value) must be present
      - where_any: at least one (code -> exact value) present
    Set `casefold=True` for case-insensitive comparison.
    """
    where_all = where_all or {}
    where_any = where_any or {}

    matched: List[DataField] = []
    for df in iter_datafields(rec, tag, ind1, ind2):
        # Map code -> list of values (with optional casefold applied)
        vals: Dict[str, List[str]] = {}
        for sf in df.subfields:
            v = sf.value.casefold() if casefold else sf.value
            vals.setdefault(sf.code, []).append(v)

        ok = True
        for c, v in where_all.items():
            vv = v.casefold() if casefold else v
            if c not in vals or vv not in vals[c]:
                ok = False
                break

        if ok and where_any:
            any_ok = any(
                (c in vals) and ((v.casefold() if casefold else v) in vals[c])
                for c, v in where_any.items()
            )
            if not any_ok:
                ok = False

        if ok:
            matched.append(df)
    return matched


def controlfield_value(
    rec: MarcRecord, tag: str, default: Optional[str] = None
) -> Optional[str]:
    """Get the first controlfield value by tag (e.g., '001', '005')."""
    for cf in rec.controlfields:
        if cf.tag == tag:
            return cf.value
    return default


def datafields_value(
    data: List[DataField], code: str, default: Optional[str] = None
) -> Optional[str]:
    """Get the first value for a specific subfield code in a list of datafields."""
    for df in data:
        for sf in df.subfields:
            if sf.code == code:
                return sf.value
    return default


def datafield_value(
    df: DataField, code: str, default: Optional[str] = None
) -> Optional[str]:
    """Get the first value for a specific subfield code in a datafield."""
    for sf in df.subfields:
        if sf.code == code:
            return sf.value
    return default


def _smart_join_title(a: str, b: Optional[str]) -> str:
    """
    Join 245 $a and $b with MARC-style punctuation.
    If $b is present, join with ' : ' unless either side already supplies
    punctuation.
    """
    a = a.strip()
    if not b:
        return a
    b = b.strip()
    if a.endswith((":", ";", "/")) or b.startswith((":", ";", "/")):
        return f"{a} {b}"
    return f"{a} : {b}"


def subfield_values_from_fields(
    fields: Iterable[DataField],
    code: str,
) -> List[str]:
    """All subfield values with given `code` across a list of DataField."""
    return [sf.value for df in fields for sf in df.subfields if sf.code == code]


def first_subfield_value_from_fields(
    fields: Iterable[DataField],
    code: str,
    default: Optional[str] = None,
) -> Optional[str]:
    """First subfield value with given `code` across a list of DataField."""
    for df in fields:
        for sf in df.subfields:
            if sf.code == code:
                return sf.value
    return default


def subfield_value_pairs_from_fields(
    fields: Iterable[DataField],
    code: str,
) -> List[Tuple[DataField, str]]:
    """
    Return (DataField, value) pairs for all subfields with `code`.
    Useful if you need to know which field a value came from.
    """
    out: List[Tuple[DataField, str]] = []
    for df in fields:
        for sf in df.subfields:
            if sf.code == code:
                out.append((df, sf.value))
    return out


def book_from_marc(rec: MarcRecord) -> BookData:
    """Map a parsed MARC record onto the project's BookData model."""
    # PPN from controlfield 001
    ppn = controlfield_value(rec, "001")

    # Title = 245 $a + 245 $b (if present)
    t_a = first_subfield_value(rec, "245", "a")
    t_b = first_subfield_value(rec, "245", "b")
    title = _smart_join_title(t_a, t_b) if t_a else None

    # Signature = 924 where $9 == "Frei 129" → take that field's $g
    frei_fields = find_datafields_with_subfields(
        rec, "924", where_all={"9": "Frei 129"}
    )
    signature = first_subfield_value_from_fields(frei_fields, "g")

    # Year = 264 $c (prefer ind2="1" publication; fallback to any 264)
    year = first_subfield_value(rec, "264", "c", ind2="1") or first_subfield_value(
        rec, "264", "c"
    )

    isbn = subfield_values(rec, "020", "a")
    mediatype = first_subfield_value(rec, "338", "a")
    lang = subfield_values(rec, "041", "a")
    authors = subfield_values(rec, "700", "a")
    author = "; ".join(authors) if authors else None

    return BookData(
        ppn=ppn,
        title=title,
        signature=signature,
        edition=first_subfield_value(rec, "250", "a") or "",
        year=year,
        pages=first_subfield_value(rec, "300", "a") or "",
        publisher=first_subfield_value(rec, "264", "b") or "",
        isbn=isbn,
        language=lang,
        link="",
        author=author,
        media_type=mediatype,
    )


class SWBData(Enum):
    # SRU endpoint configuration for the SWB (K10plus) catalog.
    URL = "https://sru.k10plus.de/opac-de-627!rec=1?version=1.1&operation=searchRetrieve&query={}&maximumRecords=100&recordSchema=marcxml"
    ARGSCHEMA = "pica."
    NAME = "SWB"


class DNBData(Enum):
    # SRU endpoint configuration for the DNB catalog.
    URL = "https://services.dnb.de/sru/dnb?version=1.1&operation=searchRetrieve&query={}&maximumRecords=100&recordSchema=MARC21-xml"
    ARGSCHEMA = ""
    NAME = "DNB"


class SRUSite(Enum):
    # Known SRU sites, each pointing at its configuration enum.
    SWB = SWBData
    DNB = DNBData


RVK_ALLOWED = r"[A-Z0-9.\-\/]"  # conservative char set typically seen in RVK notations


def find_newer_edition(
    swb_result: BookData, dnb_result: List[BookData]
) -> Optional[List[BookData]]:
    """
    New edition if:
      - year > swb.year OR
      - edition_number > swb.edition_number

    Additional guards & preferences:
      - If both have signatures and they differ, skip (not the same work).
      - For duplicates (same ppn): keep the one that has a signature, and
        prefer a signature that matches swb_result.signature.
      - If multiple remain: keep the single 'latest' by
        (year desc, edition_number desc, best-signature-match desc,
        has-signature desc).
    """

    def norm_sig(s: Optional[str]) -> str:
        if not s:
            return ""
        # normalize: lowercase, collapse whitespace, keep alnum + a few separators
        s = s.lower()
        s = re.sub(r"\s+", " ", s).strip()
        # remove obvious noise; adjust if your signature format differs
        s = re.sub(r"[^a-z0-9\-_/\. ]+", "", s)
        return s

    def has_sig(b: BookData) -> bool:
        return bool(getattr(b, "signature", None))

    def sig_matches_swb(b: BookData) -> bool:
        if not has_sig(b) or not has_sig(swb_result):
            return False
        return norm_sig(b.signature) == norm_sig(swb_result.signature)

    def strictly_newer(b: BookData) -> bool:
        by_year = (
            b.year is not None
            and swb_result.year is not None
            and b.year > swb_result.year
        )
        by_edition = (
            b.edition_number is not None
            and swb_result.edition_number is not None
            and b.edition_number > swb_result.edition_number
        )
        return by_year or by_edition

    # Hoisted out of the dedup loop (was re-defined per iteration).
    def ppn_pref_score(x: BookData) -> tuple[int, int]:
        # (signature matches swb, has signature)
        return (1 if sig_matches_swb(x) else 0, 1 if has_sig(x) else 0)

    swb_sig_norm = norm_sig(getattr(swb_result, "signature", None))

    # 1) Filter to same-work AND newer
    candidates: List[BookData] = []
    for b in dnb_result:
        # Skip if both signatures exist and don't match (different work)
        b_sig = getattr(b, "signature", None)
        if b_sig and swb_result.signature:
            if norm_sig(b_sig) != swb_sig_norm:
                continue  # not the same work
        # Keep only if newer by rules
        if strictly_newer(b):
            candidates.append(b)

    if not candidates:
        return None

    # 2) Dedupe by PPN, preferring signature (and matching signature if possible)
    by_ppn: dict[Optional[str], BookData] = {}
    for b in candidates:
        key = getattr(b, "ppn", None)
        prev = by_ppn.get(key)
        if prev is None:
            by_ppn[key] = b
            continue
        if ppn_pref_score(b) > ppn_pref_score(prev):
            by_ppn[key] = b

    deduped = list(by_ppn.values())
    if not deduped:
        return None

    # 3) If multiple remain, keep only the latest one.
    #    Order: year desc, edition_number desc, signature-match desc,
    #    has-signature desc
    def sort_key(b: BookData):
        year = b.year if b.year is not None else -1
        ed = b.edition_number if b.edition_number is not None else -1
        sig_match = 1 if sig_matches_swb(b) else 0
        sig_present = 1 if has_sig(b) else 0
        return (year, ed, sig_match, sig_present)

    best = max(deduped, key=sort_key)
    return [best] if best else None


class Api:
    """Minimal SRU client bound to one catalog site (URL template + query prefix)."""

    def __init__(self, site: str, url: str, prefix: str):
        self.site = site
        self.url = url
        self.prefix = prefix
        # Reuse TCP connections across requests for better performance
        self._session = requests.Session()
        # Slightly larger connection pool for concurrent calls
        adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
        self._session.mount("http://", adapter)
        self._session.mount("https://", adapter)

    def close(self):
        """Close the underlying HTTP session (best-effort)."""
        try:
            self._session.close()
        except Exception:
            pass

    def __del__(self):
        # Best-effort cleanup
        self.close()

    def get(self, query_args: Iterable[str]) -> List[Record]:
        """
        Run an SRU searchRetrieve query and return the parsed zs:record list.

        Raises:
            ValueError: for a DNB query with no usable search terms.
            Exception: on any non-200 HTTP response.
        """
        # DNB does not understand pica.* indexes; drop them from the query
        if self.site == "DNB":
            args = [arg for arg in query_args if not arg.startswith("pica.")]
            if not args:
                raise ValueError("DNB queries must include at least one search term")
            query_args = args

        query = "+and+".join(query_args)
        query = query.replace(" ", "%20").replace("&", "%26")

        # insert the query into the url
        url = self.url.format(query)
        log.debug(url)

        headers = {
            "User-Agent": f"{self.site} SRU Client, ",
            "Accept": "application/xml",
            "Accept-Charset": "latin1,utf-8;q=0.7,*;q=0.3",
        }
        # Use persistent session and set timeouts to avoid hanging
        resp = self._session.get(url, headers=headers, timeout=(3.05, 60))
        if resp.status_code != 200:
            # Fix: report the actual site instead of hard-coded "SWB"
            raise Exception(
                f"Error fetching data from {self.site}: {resp.status_code}"
            )

        # Parse using raw bytes (original behavior) to preserve encoding edge cases
        sr = parse_search_retrieve_response(resp.content)
        return sr.records

    def getBooks(self, query_args: Iterable[str]) -> List[BookData]:
        """Fetch records for `query_args` and map them to BookData, optionally
        narrowing to titles that start with the queried pica.tit value."""
        records: List[Record] = self.get(query_args)
        # Avoid printing on hot paths; rely on logger if needed
        log.debug(f"{self.site} found {len(records)} records for args={query_args}")

        books: List[BookData] = []
        # extract title from query_args if present
        title = None
        for arg in query_args:
            if arg.startswith("pica.tit="):
                # Fix: split only on the first '=' so titles containing '='
                # are not truncated.
                title = arg.split("=", 1)[1]
                break

        for rec in records:
            book = book_from_marc(rec.recordData)
            books.append(book)

        if title:
            books = [
                b
                for b in books
                if b.title and b.title.lower().startswith(title.lower())
            ]
        return books

    def getLinkForBook(self, book: BookData) -> str:
        # Not implemented: depends on catalog front-end; return empty string for now
        return ""


class SWB(Api):
    """Api preconfigured for the SWB (K10plus) SRU endpoint."""

    def __init__(self):
        self.site = SWBData.NAME.value
        self.url = SWBData.URL.value
        self.prefix = SWBData.ARGSCHEMA.value
        super().__init__(self.site, self.url, self.prefix)