From 5bf5eeae00e74d12462cb511cb613bb8b0000e84 Mon Sep 17 00:00:00 2001
From: WorldTeacher
Date: Mon, 1 Sep 2025 14:30:37 +0200
Subject: [PATCH] add APIs to parse data from SWB and Lehmanns

---
 src/logic/lehmannsapi.py | 280 ++++++++++++++++++++++++
 src/logic/swb.py         | 448 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 728 insertions(+)
 create mode 100644 src/logic/lehmannsapi.py
 create mode 100644 src/logic/swb.py

diff --git a/src/logic/lehmannsapi.py b/src/logic/lehmannsapi.py
new file mode 100644
index 0000000..6e5a4b2
--- /dev/null
+++ b/src/logic/lehmannsapi.py
@@ -0,0 +1,280 @@
+from __future__ import annotations
+
+import re
+from dataclasses import dataclass, asdict, field
+from typing import Optional, List, Iterable
+from urllib.parse import urljoin, quote_plus
+
+import httpx
+from bs4 import BeautifulSoup
+
+BASE = "https://www.lehmanns.de"
+SEARCH_URL = "https://www.lehmanns.de/search/quick?mediatype_id=&q="
+
+
+@dataclass
+class LehmannsSearchResult:
+    title: str
+    url: str
+
+    # Core fields from the listing card
+    year: Optional[int] = None
+    edition: Optional[int] = None
+    publisher: Optional[str] = None
+    isbn13: Optional[str] = None
+
+    # Extras from the listing card
+    description: Optional[str] = None
+    authors: list[str] = field(default_factory=list)
+    media_type: Optional[str] = None
+    book_format: Optional[str] = None
+    price_eur: Optional[float] = None
+    currency: str = "EUR"
+    image: Optional[str] = None
+
+    # From detail page:
+    pages: Optional[str] = None  # e.g. "320 Seiten" (set in enrich_pages)
+    buyable: bool = True  # set in enrich_pages (detail page)
+    unavailable_hint: Optional[str] = None  # e.g. "Titel ist leider vergriffen; keine Neuauflage"
+
+    def to_dict(self) -> dict:
+        return asdict(self)
+
+
+class LehmannsClient:
+    """Scrapes quick-search results, then enriches (and filters) via product pages."""
+
+    def __init__(self, timeout: float = 20.0):
+        self.client = httpx.Client(
+            headers={
+                "User-Agent": (
+                    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
+                    "(KHTML, like Gecko) Chrome/124.0 Safari/537.36"
+                ),
+                "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
+                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
+            },
+            timeout=timeout,
+            follow_redirects=True,
+        )
+
+    def close(self):
+        self.client.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc):
+        self.close()
+
+    # ------------------- Search (listing) -------------------
+
+    def build_search_url(self, title: str) -> str:
+        # quote_plus encodes spaces as '+'
+        return SEARCH_URL + quote_plus(title)
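+
+    # For example:
+    #     build_search_url("Lineare Algebra")
+    #     -> "https://www.lehmanns.de/search/quick?mediatype_id=&q=Lineare+Algebra"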
+
+    def search_by_title(self, title: str, limit: Optional[int] = None, strict: bool = False) -> List[LehmannsSearchResult]:
+        """
+        Parse the listing page, then enrich every hit via its detail page
+        (see enrich_pages), which adds 'pages' and drops unbuyable items.
+        With strict=True, keep only exact (case-insensitive) title matches.
+        """
+        url = self.build_search_url(title)
+        html = self._get(url)
+        if not html:
+            return []
+        results = self._parse_results(html)
+        # enrich_pages returns a new list; assign it so unbuyable items are
+        # actually dropped.
+        results = self.enrich_pages(results)
+        if strict:
+            # Keep only exact title matches (case-insensitive).
+            title_lower = title.lower()
+            results = [r for r in results if r.title and r.title.lower() == title_lower]
+        if limit is not None:
+            results = results[: max(0, limit)]
+        return results
+
+    # ------------------- Detail enrichment & filtering -------------------
+
+    def enrich_pages(self, results: Iterable[LehmannsSearchResult], drop_unbuyable: bool = True) -> List[LehmannsSearchResult]:
+        """
+        Fetch each result.url and extract:
+          - pages: from the element matched by
+            span.book-meta.meta-seiten[itemprop='numberOfPages']
+          - availability: from the li.availability-3 element
+            * if it contains "Titel ist leider vergriffen", mark buyable=False
+            * if it also contains "keine Neuauflage", set unavailable_hint accordingly
+        If drop_unbuyable=True, exclude non-buyable results from the returned list.
+        """
+        enriched: List[LehmannsSearchResult] = []
+        for r in results:
+            try:
+                html = self._get(r.url)
+                if not html:
+                    # Can't verify; keep as-is when not dropping, else skip
+                    if not drop_unbuyable:
+                        enriched.append(r)
+                    continue
+
+                soup = BeautifulSoup(html, "html.parser")
+
+                # Pages
+                pages_node = soup.select_one(
+                    "span.book-meta.meta-seiten[itemprop='numberOfPages'], "
+                    "span.book-meta.meta-seiten[itemprop='numberofpages'], "
+                    ".meta-seiten [itemprop='numberOfPages'], "
+                    ".meta-seiten[itemprop='numberOfPages'], "
+                    ".book-meta.meta-seiten"
+                )
+                if pages_node:
+                    text = pages_node.get_text(" ", strip=True)
+                    m = re.search(r"\d+", text)
+                    if m:
+                        r.pages = f"{m.group(0)} Seiten"
+
+                # Availability via li.availability-3
+                avail_li = soup.select_one("li.availability-3")
+                if avail_li:
+                    avail_text = " ".join(avail_li.get_text(" ", strip=True).split()).lower()
+                    if "titel ist leider vergriffen" in avail_text:
+                        r.buyable = False
+                        if "keine neuauflage" in avail_text:
+                            r.unavailable_hint = "Titel ist leider vergriffen; keine Neuauflage"
+                        else:
+                            r.unavailable_hint = "Titel ist leider vergriffen"
+
+                # Append or drop
+                if (not drop_unbuyable) or r.buyable:
+                    enriched.append(r)
+
+            except Exception:
+                # On any per-item error, keep the record if not dropping; else skip
+                if not drop_unbuyable:
+                    enriched.append(r)
+                continue
+
+        return enriched
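+
+    # Illustrative detail-page fragments the selectors above target (shape
+    # assumed from the selectors; the live lehmanns.de markup carries more
+    # attributes):
+    #
+    #     <span class="book-meta meta-seiten" itemprop="numberOfPages">320 Seiten</span>
+    #     <li class="availability-3">Titel ist leider vergriffen; keine Neuauflage</li>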
+
+    # ------------------- Internals -------------------
+
+    def _get(self, url: str) -> Optional[str]:
+        try:
+            r = self.client.get(url)
+            r.encoding = "utf-8"
+            if r.status_code == 200 and "text/html" in (r.headers.get("content-type") or ""):
+                return r.text
+        except httpx.HTTPError:
+            pass
+        return None
+
+    def _parse_results(self, html: str) -> List[LehmannsSearchResult]:
+        soup = BeautifulSoup(html, "html.parser")
+        results: list[LehmannsSearchResult] = []
+
+        for block in soup.select("div.info-block"):
+            a = block.select_one(".title a[href]")
+            if not a:
+                continue
+            url = urljoin(BASE, a["href"].strip())
+            base_title = (block.select_one(".title [itemprop='name']") or a).get_text(strip=True)
+
+            # Alternative headline => extend title
+            alt_tag = block.select_one(".description[itemprop='alternativeHeadline']")
+            alternative_headline = alt_tag.get_text(strip=True) if alt_tag else None
+            title = f"{base_title} : {alternative_headline}" if alternative_headline else base_title
+            description = alternative_headline
+
+            # Authors from .author, e.g. "von A; B & C und D"
+            authors: list[str] = []
+            author_div = block.select_one("div.author")
+            if author_div:
+                t = author_div.get_text(" ", strip=True)
+                t = re.sub(r"^\s*von\s+", "", t, flags=re.I)
+                for part in re.split(r"\s*;\s*|\s*&\s*|\s+und\s+", t):
+                    name = " ".join(part.split())
+                    if name:
+                        authors.append(name)
+
+            # Media type + format, e.g. "Buch (Gebunden)"
+            media_type = None
+            book_format = None
+            type_text = block.select_one(".type")
+            if type_text:
+                t = type_text.get_text(" ", strip=True)
+                m = re.search(r"\b(Buch|eBook|Hörbuch)\b", t)
+                if m:
+                    media_type = m.group(1)
+                fm = re.search(r"\(([^)]+)\)", t)
+                if fm:
+                    book_format = fm.group(1).strip().upper()
+
+            # Year
+            year = None
+            y = block.select_one("[itemprop='copyrightYear']")
+            if y:
+                try:
+                    year = int(y.get_text(strip=True))
+                except ValueError:
+                    pass
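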
+            # Edition
+            edition = None
+            ed = block.select_one("[itemprop='bookEdition']")
+            if ed:
+                m = re.search(r"\d+", ed.get_text(strip=True))
+                if m:
+                    edition = int(m.group())
+
+            # Publisher
+            publisher = None
+            pub = block.select_one(".publisherprop [itemprop='name']") or block.select_one(".publisher [itemprop='name']")
+            if pub:
+                publisher = pub.get_text(strip=True)
+
+            # ISBN-13
+            isbn13 = None
+            isbn_tag = block.select_one(".isbn [itemprop='isbn'], [itemprop='isbn']")
+            if isbn_tag:
+                digits = re.sub(r"[^0-9Xx]", "", isbn_tag.get_text(strip=True))
+                m = re.search(r"(97[89]\d{10})", digits)
+                if m:
+                    isbn13 = m.group(1)
+
+            # Price (best effort): German decimal format, e.g. "1.234,56 €"
+            price_eur = None
+            txt = block.get_text(" ", strip=True)
+            mprice = re.search(r"(\d{1,3}(?:\.\d{3})*,\d{2})\s*€", txt)
+            if not mprice and block.parent:
+                sib = block.parent.get_text(" ", strip=True)
+                mprice = re.search(r"(\d{1,3}(?:\.\d{3})*,\d{2})\s*€", sib)
+            if mprice:
+                num = mprice.group(1).replace(".", "").replace(",", ".")
+                try:
+                    price_eur = float(num)
+                except ValueError:
+                    pass
+
+            # Image (best effort)
+            image = None
+            left_img = block.find_previous("img")
+            if left_img and left_img.get("src"):
+                image = urljoin(BASE, left_img["src"])
+
+            results.append(
+                LehmannsSearchResult(
+                    title=title,
+                    url=url,
+                    description=description,
+                    authors=authors,
+                    media_type=media_type,
+                    book_format=book_format,
+                    year=year,
+                    edition=edition,
+                    publisher=publisher,
+                    isbn13=isbn13,
+                    price_eur=price_eur,
+                    image=image,
+                )
+            )
+
+        return results
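+
+
+if __name__ == "__main__":
+    # Smoke-test sketch (hits lehmanns.de; the search term is an arbitrary
+    # example and results depend on the live catalogue):
+    with LehmannsClient() as client:
+        for hit in client.search_by_title("Informatik", limit=3):
+            print(hit.to_dict())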
diff --git a/src/logic/swb.py b/src/logic/swb.py
new file mode 100644
index 0000000..826c101
--- /dev/null
+++ b/src/logic/swb.py
@@ -0,0 +1,448 @@
+import xml.etree.ElementTree as ET
+from dataclasses import dataclass, field
+from typing import Dict, Iterable, List, Optional, Tuple, Union
+
+import requests
+
+from src.logic.dataclass import BookData
+
+# -----------------------
+# Dataclasses
+# -----------------------
+
+
+# --- MARC XML structures ---
+@dataclass
+class ControlField:
+    tag: str
+    value: str
+
+
+@dataclass
+class SubField:
+    code: str
+    value: str
+
+
+@dataclass
+class DataField:
+    tag: str
+    ind1: str = " "
+    ind2: str = " "
+    subfields: List[SubField] = field(default_factory=list)
+
+
+@dataclass
+class MarcRecord:
+    leader: str
+    controlfields: List[ControlField] = field(default_factory=list)
+    datafields: List[DataField] = field(default_factory=list)
+
+
+# --- SRU record wrapper ---
+@dataclass
+class Record:
+    recordSchema: str
+    recordPacking: str
+    recordData: MarcRecord
+    recordPosition: int
+
+
+@dataclass
+class EchoedSearchRequest:
+    version: str
+    query: str
+    maximumRecords: int
+    recordPacking: str
+    recordSchema: str
+
+
+@dataclass
+class SearchRetrieveResponse:
+    version: str
+    numberOfRecords: int
+    records: List[Record] = field(default_factory=list)
+    echoedSearchRetrieveRequest: Optional[EchoedSearchRequest] = None
+
+
+# -----------------------
+# Parser
+# -----------------------
+
+ZS = "http://www.loc.gov/zing/srw/"
+MARC = "http://www.loc.gov/MARC21/slim"
+NS = {"zs": ZS, "marc": MARC}
+
+
+def _text(elem: Optional[ET.Element]) -> str:
+    return (elem.text or "") if elem is not None else ""
+
+
+def _req_text(parent: ET.Element, path: str) -> str:
+    el = parent.find(path, NS)
+    if el is None or el.text is None:
+        raise ValueError(f"Required element not found or empty: {path}")
+    return el.text
+
+
+def parse_marc_record(record_el: ET.Element) -> MarcRecord:
+    """
+    record_el is the MARC ``record`` element (MARC21/slim is the default
+    namespace in the SRU sample responses).
+    """
+    # leader
+    leader_text = _req_text(record_el, "marc:leader")
+
+    # controlfields
+    controlfields: List[ControlField] = []
+    for cf in record_el.findall("marc:controlfield", NS):
+        tag = cf.get("tag", "").strip()
+        controlfields.append(ControlField(tag=tag, value=_text(cf)))
+
+    # datafields
+    datafields: List[DataField] = []
+    for df in record_el.findall("marc:datafield", NS):
+        tag = df.get("tag", "").strip()
+        ind1 = df.get("ind1") or " "
+        ind2 = df.get("ind2") or " "
+        subfields: List[SubField] = []
+        for sf in df.findall("marc:subfield", NS):
+            code = sf.get("code", "")
+            subfields.append(SubField(code=code, value=_text(sf)))
+        datafields.append(DataField(tag=tag, ind1=ind1, ind2=ind2, subfields=subfields))
+
+    return MarcRecord(
+        leader=leader_text, controlfields=controlfields, datafields=datafields
+    )
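+
+
+# Illustrative MARCXML input for parse_marc_record (abridged; the values
+# are invented for the example):
+#
+#     <record xmlns="http://www.loc.gov/MARC21/slim">
+#       <leader>00000cam a2200000 c 4500</leader>
+#       <controlfield tag="001">123456789</controlfield>
+#       <datafield tag="245" ind1="1" ind2="0">
+#         <subfield code="a">Lineare Algebra</subfield>
+#         <subfield code="b">eine Einführung</subfield>
+#       </datafield>
+#     </record>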
+
+
+def parse_record(zs_record_el: ET.Element) -> Record:
+    recordSchema = _req_text(zs_record_el, "zs:recordSchema")
+    recordPacking = _req_text(zs_record_el, "zs:recordPacking")
+
+    # recordData wraps a MARC record that uses MARC21/slim as its default namespace
+    recordData_el = zs_record_el.find("zs:recordData", NS)
+    if recordData_el is None:
+        raise ValueError("Missing zs:recordData")
+
+    # ElementTree matches by namespace URI, so the prefixed search below works
+    # whether the record declares the namespace via a prefix or as default ns.
+    marc_record_el = recordData_el.find("marc:record", NS)
+    if marc_record_el is None:
+        raise ValueError("Missing MARC21 record inside zs:recordData")
+
+    marc_record = parse_marc_record(marc_record_el)
+
+    recordPosition = int(_req_text(zs_record_el, "zs:recordPosition"))
+    return Record(
+        recordSchema=recordSchema,
+        recordPacking=recordPacking,
+        recordData=marc_record,
+        recordPosition=recordPosition,
+    )
+
+
+def parse_echoed_request(root: ET.Element) -> Optional[EchoedSearchRequest]:
+    el = root.find("zs:echoedSearchRetrieveRequest", NS)
+    if el is None:
+        return None
+
+    # Be permissive with missing fields
+    version = _text(el.find("zs:version", NS))
+    query = _text(el.find("zs:query", NS))
+    maximumRecords_text = _text(el.find("zs:maximumRecords", NS)) or "0"
+    recordPacking = _text(el.find("zs:recordPacking", NS))
+    recordSchema = _text(el.find("zs:recordSchema", NS))
+
+    try:
+        maximumRecords = int(maximumRecords_text)
+    except ValueError:
+        maximumRecords = 0
+
+    return EchoedSearchRequest(
+        version=version,
+        query=query,
+        maximumRecords=maximumRecords,
+        recordPacking=recordPacking,
+        recordSchema=recordSchema,
+    )
+
+
+def parse_search_retrieve_response(xml_str: Union[str, bytes]) -> SearchRetrieveResponse:
+    # ET.fromstring accepts both text and raw bytes (honouring the XML
+    # declaration's encoding in the bytes case).
+    root = ET.fromstring(xml_str)
+
+    # Root is zs:searchRetrieveResponse
+    version = _req_text(root, "zs:version")
+    numberOfRecords = int(_req_text(root, "zs:numberOfRecords"))
+
+    records_parent = root.find("zs:records", NS)
+    records: List[Record] = []
+    if records_parent is not None:
+        for r in records_parent.findall("zs:record", NS):
+            records.append(parse_record(r))
+
+    echoed = parse_echoed_request(root)
+
+    return SearchRetrieveResponse(
+        version=version,
+        numberOfRecords=numberOfRecords,
+        records=records,
+        echoedSearchRetrieveRequest=echoed,
+    )
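+
+
+# Schematic SRU envelope consumed by parse_search_retrieve_response
+# (element names as used above; attributes and extra fields omitted):
+#
+#     <zs:searchRetrieveResponse xmlns:zs="http://www.loc.gov/zing/srw/">
+#       <zs:version>1.1</zs:version>
+#       <zs:numberOfRecords>1</zs:numberOfRecords>
+#       <zs:records>
+#         <zs:record>
+#           <zs:recordSchema>marcxml</zs:recordSchema>
+#           <zs:recordPacking>xml</zs:recordPacking>
+#           <zs:recordData>...MARC record as above...</zs:recordData>
+#           <zs:recordPosition>1</zs:recordPosition>
+#         </zs:record>
+#       </zs:records>
+#     </zs:searchRetrieveResponse>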
+
+
+# --- Query helpers over MarcRecord ---
+
+
+def iter_datafields(
+    rec: MarcRecord,
+    tag: Optional[str] = None,
+    ind1: Optional[str] = None,
+    ind2: Optional[str] = None,
+) -> Iterable[DataField]:
+    """Yield datafields, optionally filtered by tag/indicators."""
+    for df in rec.datafields:
+        if tag is not None and df.tag != tag:
+            continue
+        if ind1 is not None and df.ind1 != ind1:
+            continue
+        if ind2 is not None and df.ind2 != ind2:
+            continue
+        yield df
+
+
+def subfield_values(
+    rec: MarcRecord,
+    tag: str,
+    code: str,
+    *,
+    ind1: Optional[str] = None,
+    ind2: Optional[str] = None,
+) -> List[str]:
+    """All values for subfield `code` in every `tag` field (respecting indicators)."""
+    out: List[str] = []
+    for df in iter_datafields(rec, tag, ind1, ind2):
+        out.extend(sf.value for sf in df.subfields if sf.code == code)
+    return out
+
+
+def first_subfield_value(
+    rec: MarcRecord,
+    tag: str,
+    code: str,
+    *,
+    ind1: Optional[str] = None,
+    ind2: Optional[str] = None,
+    default: Optional[str] = None,
+) -> Optional[str]:
+    """First value for subfield `code` in `tag` (respecting indicators)."""
+    for df in iter_datafields(rec, tag, ind1, ind2):
+        for sf in df.subfields:
+            if sf.code == code:
+                return sf.value
+    return default
+
+
+def find_datafields_with_subfields(
+    rec: MarcRecord,
+    tag: str,
+    *,
+    where_all: Optional[Dict[str, str]] = None,
+    where_any: Optional[Dict[str, str]] = None,
+    casefold: bool = False,
+    ind1: Optional[str] = None,
+    ind2: Optional[str] = None,
+) -> List[DataField]:
+    """
+    Return datafields of `tag` whose subfields match constraints:
+      - where_all: every (code -> exact value) must be present
+      - where_any: at least one (code -> exact value) present
+    Set `casefold=True` for case-insensitive comparison.
+    """
+    where_all = where_all or {}
+    where_any = where_any or {}
+    matched: List[DataField] = []
+
+    for df in iter_datafields(rec, tag, ind1, ind2):
+        # Map code -> list of values (with optional casefold applied)
+        vals: Dict[str, List[str]] = {}
+        for sf in df.subfields:
+            v = sf.value.casefold() if casefold else sf.value
+            vals.setdefault(sf.code, []).append(v)
+
+        ok = True
+        for c, v in where_all.items():
+            vv = v.casefold() if casefold else v
+            if c not in vals or vv not in vals[c]:
+                ok = False
+                break
+
+        if ok and where_any:
+            any_ok = any(
+                (c in vals) and ((v.casefold() if casefold else v) in vals[c])
+                for c, v in where_any.items()
+            )
+            if not any_ok:
+                ok = False
+
+        if ok:
+            matched.append(df)
+
+    return matched
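+
+
+# For example, given a MarcRecord `rec`, mirroring the lookups in
+# book_from_marc below:
+#
+#     isbns = subfield_values(rec, "020", "a")
+#     frei = find_datafields_with_subfields(rec, "924", where_all={"9": "Frei 129"})
+#     sig = first_subfield_value_from_fields(frei, "g")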
+
+
+def controlfield_value(
+    rec: MarcRecord, tag: str, default: Optional[str] = None
+) -> Optional[str]:
+    """Get the first controlfield value by tag (e.g., '001', '005')."""
+    for cf in rec.controlfields:
+        if cf.tag == tag:
+            return cf.value
+    return default
+
+
+def datafields_value(
+    data: List[DataField], code: str, default: Optional[str] = None
+) -> Optional[str]:
+    """Get the first value for a specific subfield code in a list of datafields."""
+    for df in data:
+        for sf in df.subfields:
+            if sf.code == code:
+                return sf.value
+    return default
+
+
+def datafield_value(
+    df: DataField, code: str, default: Optional[str] = None
+) -> Optional[str]:
+    """Get the first value for a specific subfield code in a datafield."""
+    for sf in df.subfields:
+        if sf.code == code:
+            return sf.value
+    return default
+
+
+def _smart_join_title(a: str, b: Optional[str]) -> str:
+    """
+    Join 245 $a and $b with MARC-style punctuation.
+    If $b is present, join with ' : ' unless either side already supplies
+    punctuation.
+    """
+    a = a.strip()
+    if not b:
+        return a
+    b = b.strip()
+    if a.endswith((":", ";", "/")) or b.startswith((":", ";", "/")):
+        return f"{a} {b}"
+    return f"{a} : {b}"
+
+
+# For example:
+#     _smart_join_title("Analysis", "eine Einführung")    -> "Analysis : eine Einführung"
+#     _smart_join_title("Analysis :", "eine Einführung")  -> "Analysis : eine Einführung"
+
+
+def subfield_values_from_fields(
+    fields: Iterable[DataField],
+    code: str,
+) -> List[str]:
+    """All subfield values with given `code` across a list of DataField."""
+    return [sf.value for df in fields for sf in df.subfields if sf.code == code]
+
+
+def first_subfield_value_from_fields(
+    fields: Iterable[DataField],
+    code: str,
+    default: Optional[str] = None,
+) -> Optional[str]:
+    """First subfield value with given `code` across a list of DataField."""
+    for df in fields:
+        for sf in df.subfields:
+            if sf.code == code:
+                return sf.value
+    return default
+
+
+def subfield_value_pairs_from_fields(
+    fields: Iterable[DataField],
+    code: str,
+) -> List[Tuple[DataField, str]]:
+    """
+    Return (DataField, value) pairs for all subfields with `code`.
+    Useful if you need to know which field a value came from.
+    """
+    out: List[Tuple[DataField, str]] = []
+    for df in fields:
+        for sf in df.subfields:
+            if sf.code == code:
+                out.append((df, sf.value))
+    return out
+
+
+def book_from_marc(rec: MarcRecord) -> BookData:
+    # PPN from controlfield 001
+    ppn = controlfield_value(rec, "001")
+
+    # Title = 245 $a + 245 $b (if present)
+    t_a = first_subfield_value(rec, "245", "a")
+    t_b = first_subfield_value(rec, "245", "b")
+    title = _smart_join_title(t_a, t_b) if t_a else None
+
+    # Signature = 924 where $9 == "Frei 129" → take that field's $g
+    frei_fields = find_datafields_with_subfields(
+        rec, "924", where_all={"9": "Frei 129"}
+    )
+    signature = first_subfield_value_from_fields(frei_fields, "g")
+
+    # Year = 264 $c (prefer ind2="1" publication; fall back to any 264)
+    year = first_subfield_value(rec, "264", "c", ind2="1") or first_subfield_value(
+        rec, "264", "c"
+    )
+    isbn = subfield_values(rec, "020", "a")
+
+    return BookData(
+        ppn=ppn,
+        title=title,
+        signature=signature,
+        edition=first_subfield_value(rec, "250", "a"),
+        year=year,
+        pages=first_subfield_value(rec, "300", "a"),
+        publisher=first_subfield_value(rec, "264", "b"),
+        isbn=isbn,
+    )
+
+
+class SWB:
+    def __init__(self):
+        self.url = (
+            "https://sru.k10plus.de/opac-de-627!rec=1"
+            "?version=1.1&operation=searchRetrieve"
+            "&query={}&maximumRecords=10&recordSchema=marcxml"
+        )
+        self.bib_id = 20735
+
+    def get(self, query_args: Iterable[str]) -> List[Record]:
+        # Drop empty arguments, i.e. any "key=" without a value.
+        args = [arg for arg in query_args if not arg.endswith("=")]
+        query = "+and+".join(args)
+        # Minimal manual escaping; only spaces and ampersands are encoded.
+        query = query.replace(" ", "%20").replace("&", "%26")
+
+        url = self.url.format(query)
+
+        print("Fetching from SWB:", url)
+        headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
+            "Accept": "application/xml",
+            "Accept-Charset": "latin1,utf-8;q=0.7,*;q=0.3",
+        }
+        response = requests.get(url, headers=headers)
+        if response.status_code != 200:
+            raise RuntimeError(f"Error fetching data from SWB: {response.status_code}")
+
+        # parse_search_retrieve_response accepts the raw bytes directly.
+        parsed = parse_search_retrieve_response(response.content)
+        return parsed.records
+
+    def getBooks(self, query_args: List[str]) -> List[BookData]:
+        records: List[Record] = self.get(query_args)
+        # The second query argument is expected to look like "tit=<title>";
+        # split on the first '=' only so titles may contain '=' themselves.
+        title = query_args[1].split("=", 1)[1]
+        books: List[BookData] = [book_from_marc(rec.recordData) for rec in records]
+        # Keep only books whose title starts with the searched title.
+        books = [
+            b for b in books if b.title and b.title.lower().startswith(title.lower())
+        ]
+        return books
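+
+
+if __name__ == "__main__":
+    # Smoke-test sketch: performs a live SRU request. The CQL index names
+    # used here ("per", "tit") are assumptions about the k10plus indexes,
+    # not part of this module; getBooks expects the title as the second
+    # argument.
+    swb = SWB()
+    for book in swb.getBooks(["per=Axler", "tit=Linear Algebra Done Right"]):
+        print(book)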