add APIs to parse data from SWB and Lehmanns
This commit is contained in:
280
src/logic/lehmannsapi.py
Normal file
280
src/logic/lehmannsapi.py
Normal file
@@ -0,0 +1,280 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass, asdict, field
|
||||
from typing import Optional, List, Iterable
|
||||
from urllib.parse import urljoin, quote_plus
|
||||
|
||||
import httpx
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
BASE = "https://www.lehmanns.de"
|
||||
SEARCH_URL = "https://www.lehmanns.de/search/quick?mediatype_id=&q="
|
||||
|
||||
|
||||
@dataclass
|
||||
class LehmannsSearchResult:
|
||||
title: str
|
||||
url: str
|
||||
|
||||
# Core fields from the listing card
|
||||
year: Optional[int] = None
|
||||
edition: Optional[int] = None
|
||||
publisher: Optional[str] = None
|
||||
isbn13: Optional[str] = None
|
||||
|
||||
# Extras from the listing card
|
||||
description: Optional[str] = None
|
||||
authors: list[str] = field(default_factory=list)
|
||||
media_type: Optional[str] = None
|
||||
book_format: Optional[str] = None
|
||||
price_eur: Optional[float] = None
|
||||
currency: str = "EUR"
|
||||
image: Optional[str] = None
|
||||
|
||||
# From detail page:
|
||||
pages: Optional[str] = None # "<N> Seiten"
|
||||
buyable: bool = True # set in enrich_pages (detail page)
|
||||
unavailable_hint: Optional[str] = None # e.g. "Titel ist leider vergriffen; keine Neuauflage"
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
class LehmannsClient:
|
||||
"""Scrapes quick-search results, then enriches (and filters) via product pages."""
|
||||
|
||||
def __init__(self, timeout: float = 20.0):
|
||||
self.client = httpx.Client(
|
||||
headers={
|
||||
"User-Agent": (
|
||||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
||||
"(KHTML, like Gecko) Chrome/124.0 Safari/537.36"
|
||||
),
|
||||
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||
},
|
||||
timeout=timeout,
|
||||
follow_redirects=True,
|
||||
)
|
||||
|
||||
def close(self):
|
||||
self.client.close()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc):
|
||||
self.close()
|
||||
|
||||
# ------------------- Search (listing) -------------------
|
||||
|
||||
def build_search_url(self, title: str) -> str:
|
||||
# spaces -> '+'
|
||||
return SEARCH_URL + quote_plus(title)
|
||||
|
||||
def search_by_title(self, title: str, limit: Optional[int] = None, strict: bool = False) -> List[LehmannsSearchResult]:
|
||||
"""
|
||||
Parse the listing page only (no availability check here).
|
||||
Use enrich_pages(...) afterwards to fetch detail pages, add 'pages',
|
||||
and drop unbuyable items.
|
||||
"""
|
||||
url = self.build_search_url(title)
|
||||
html = self._get(url)
|
||||
if not html:
|
||||
return []
|
||||
results = self._parse_results(html)
|
||||
self.enrich_pages(results)
|
||||
if strict:
|
||||
# filter results to only those with exact title match (case-insensitive)
|
||||
title_lower = title.lower()
|
||||
results = [r for r in results if r.title and r.title.lower() == title_lower]
|
||||
results = [r for r in results if r.buyable]
|
||||
return results
|
||||
if limit is not None:
|
||||
results = results[:max(0, limit)]
|
||||
return results
|
||||
|
||||
# ------------------- Detail enrichment & filtering -------------------
|
||||
|
||||
def enrich_pages(self, results: Iterable[LehmannsSearchResult], drop_unbuyable: bool = True) -> List[LehmannsSearchResult]:
|
||||
"""
|
||||
Fetch each result.url, extract:
|
||||
- pages: from <span class="book-meta meta-seiten" itemprop="numberOfPages">...</span>
|
||||
- availability: from <li class="availability-3">...</li>
|
||||
* if it contains "Titel ist leider vergriffen", mark buyable=False
|
||||
* if it also contains "keine Neuauflage", set unavailable_hint accordingly
|
||||
If drop_unbuyable=True, exclude non-buyable results from the returned list.
|
||||
"""
|
||||
enriched: List[LehmannsSearchResult] = []
|
||||
for r in results:
|
||||
try:
|
||||
html = self._get(r.url)
|
||||
if not html:
|
||||
# Can't verify; keep as-is when not dropping, else skip
|
||||
if not drop_unbuyable:
|
||||
enriched.append(r)
|
||||
continue
|
||||
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
|
||||
# Pages
|
||||
pages_node = soup.select_one(
|
||||
"span.book-meta.meta-seiten[itemprop='numberOfPages'], "
|
||||
"span.book-meta.meta-seiten[itemprop='numberofpages'], "
|
||||
".meta-seiten [itemprop='numberOfPages'], "
|
||||
".meta-seiten[itemprop='numberOfPages'], "
|
||||
".book-meta.meta-seiten"
|
||||
)
|
||||
if pages_node:
|
||||
text = pages_node.get_text(" ", strip=True)
|
||||
m = re.search(r"\d+", text)
|
||||
if m:
|
||||
r.pages = f"{m.group(0)} Seiten"
|
||||
|
||||
# Availability via li.availability-3
|
||||
avail_li = soup.select_one("li.availability-3")
|
||||
if avail_li:
|
||||
avail_text = " ".join(avail_li.get_text(" ", strip=True).split()).lower()
|
||||
if "titel ist leider vergriffen" in avail_text:
|
||||
r.buyable = False
|
||||
if "keine neuauflage" in avail_text:
|
||||
r.unavailable_hint = "Titel ist leider vergriffen; keine Neuauflage"
|
||||
else:
|
||||
r.unavailable_hint = "Titel ist leider vergriffen"
|
||||
|
||||
# Append or drop
|
||||
if (not drop_unbuyable) or r.buyable:
|
||||
enriched.append(r)
|
||||
|
||||
except Exception:
|
||||
# On any per-item error, keep the record if not dropping; else skip
|
||||
if not drop_unbuyable:
|
||||
enriched.append(r)
|
||||
continue
|
||||
|
||||
return enriched
|
||||
|
||||
# ------------------- Internals -------------------
|
||||
|
||||
def _get(self, url: str) -> Optional[str]:
|
||||
try:
|
||||
r = self.client.get(url)
|
||||
r.encoding = "utf-8"
|
||||
if r.status_code == 200 and "text/html" in (r.headers.get("content-type") or ""):
|
||||
return r.text
|
||||
except httpx.HTTPError:
|
||||
pass
|
||||
return None
|
||||
|
||||
def _parse_results(self, html: str) -> List[LehmannsSearchResult]:
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
results: list[LehmannsSearchResult] = []
|
||||
|
||||
for block in soup.select("div.info-block"):
|
||||
a = block.select_one(".title a[href]")
|
||||
if not a:
|
||||
continue
|
||||
url = urljoin(BASE, a["href"].strip())
|
||||
base_title = (block.select_one(".title [itemprop='name']") or a).get_text(strip=True)
|
||||
|
||||
# Alternative headline => extend title
|
||||
alt_tag = block.select_one(".description[itemprop='alternativeHeadline']")
|
||||
alternative_headline = alt_tag.get_text(strip=True) if alt_tag else None
|
||||
title = f"{base_title} : {alternative_headline}" if alternative_headline else base_title
|
||||
description = alternative_headline
|
||||
|
||||
# Authors from .author
|
||||
authors: list[str] = []
|
||||
author_div = block.select_one("div.author")
|
||||
if author_div:
|
||||
t = author_div.get_text(" ", strip=True)
|
||||
t = re.sub(r"^\s*von\s+", "", t, flags=re.I)
|
||||
for part in re.split(r"\s*;\s*|\s*&\s*|\s+und\s+", t):
|
||||
name = " ".join(part.split())
|
||||
if name:
|
||||
authors.append(name)
|
||||
|
||||
# Media + format
|
||||
media_type = None
|
||||
book_format = None
|
||||
type_text = block.select_one(".type")
|
||||
if type_text:
|
||||
t = type_text.get_text(" ", strip=True)
|
||||
m = re.search(r"\b(Buch|eBook|Hörbuch)\b", t)
|
||||
if m:
|
||||
media_type = m.group(1)
|
||||
fm = re.search(r"\(([^)]+)\)", t)
|
||||
if fm:
|
||||
book_format = fm.group(1).strip().upper()
|
||||
|
||||
# Year
|
||||
year = None
|
||||
y = block.select_one("[itemprop='copyrightYear']")
|
||||
if y:
|
||||
try:
|
||||
year = int(y.get_text(strip=True))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Edition
|
||||
edition = None
|
||||
ed = block.select_one("[itemprop='bookEdition']")
|
||||
if ed:
|
||||
m = re.search(r"\d+", ed.get_text(strip=True))
|
||||
if m:
|
||||
edition = int(m.group())
|
||||
|
||||
# Publisher
|
||||
publisher = None
|
||||
pub = block.select_one(".publisherprop [itemprop='name']") or block.select_one(".publisher [itemprop='name']")
|
||||
if pub:
|
||||
publisher = pub.get_text(strip=True)
|
||||
|
||||
# ISBN-13
|
||||
isbn13 = None
|
||||
isbn_tag = block.select_one(".isbn [itemprop='isbn'], [itemprop='isbn']")
|
||||
if isbn_tag:
|
||||
digits = re.sub(r"[^0-9Xx]", "", isbn_tag.get_text(strip=True))
|
||||
m = re.search(r"(97[89]\d{10})", digits)
|
||||
if m:
|
||||
isbn13 = m.group(1)
|
||||
|
||||
# Price (best effort)
|
||||
price_eur = None
|
||||
txt = block.get_text(" ", strip=True)
|
||||
mprice = re.search(r"(\d{1,3}(?:\.\d{3})*,\d{2})\s*€", txt)
|
||||
if not mprice and block.parent:
|
||||
sib = block.parent.get_text(" ", strip=True)
|
||||
mprice = re.search(r"(\d{1,3}(?:\.\d{3})*,\d{2})\s*€", sib)
|
||||
if mprice:
|
||||
num = mprice.group(1).replace(".", "").replace(",", ".")
|
||||
try:
|
||||
price_eur = float(num)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Image (best-effort)
|
||||
image = None
|
||||
left_img = block.find_previous("img")
|
||||
if left_img and left_img.get("src"):
|
||||
image = urljoin(BASE, left_img["src"])
|
||||
|
||||
results.append(
|
||||
LehmannsSearchResult(
|
||||
title=title,
|
||||
url=url,
|
||||
description=description,
|
||||
authors=authors,
|
||||
media_type=media_type,
|
||||
book_format=book_format,
|
||||
year=year,
|
||||
edition=edition,
|
||||
publisher=publisher,
|
||||
isbn13=isbn13,
|
||||
price_eur=price_eur,
|
||||
image=image,
|
||||
)
|
||||
)
|
||||
|
||||
return results
|
||||
448
src/logic/swb.py
Normal file
448
src/logic/swb.py
Normal file
@@ -0,0 +1,448 @@
|
||||
import xml.etree.ElementTree as ET
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
import requests
|
||||
|
||||
from src.logic.dataclass import BookData
|
||||
|
||||
# -----------------------
|
||||
# Dataclasses
|
||||
# -----------------------
|
||||
|
||||
|
||||
# --- MARC XML structures ---
|
||||
@dataclass
|
||||
class ControlField:
|
||||
tag: str
|
||||
value: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class SubField:
|
||||
code: str
|
||||
value: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class DataField:
|
||||
tag: str
|
||||
ind1: str = " "
|
||||
ind2: str = " "
|
||||
subfields: List[SubField] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MarcRecord:
|
||||
leader: str
|
||||
controlfields: List[ControlField] = field(default_factory=list)
|
||||
datafields: List[DataField] = field(default_factory=list)
|
||||
|
||||
|
||||
# --- SRU record wrapper ---
|
||||
@dataclass
|
||||
class Record:
|
||||
recordSchema: str
|
||||
recordPacking: str
|
||||
recordData: MarcRecord
|
||||
recordPosition: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class EchoedSearchRequest:
|
||||
version: str
|
||||
query: str
|
||||
maximumRecords: int
|
||||
recordPacking: str
|
||||
recordSchema: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class SearchRetrieveResponse:
|
||||
version: str
|
||||
numberOfRecords: int
|
||||
records: List[Record] = field(default_factory=list)
|
||||
echoedSearchRetrieveRequest: Optional[EchoedSearchRequest] = None
|
||||
|
||||
|
||||
# -----------------------
|
||||
# Parser
|
||||
# -----------------------
|
||||
|
||||
ZS = "http://www.loc.gov/zing/srw/"
|
||||
MARC = "http://www.loc.gov/MARC21/slim"
|
||||
NS = {"zs": ZS, "marc": MARC}
|
||||
|
||||
|
||||
def _text(elem: Optional[ET.Element]) -> str:
|
||||
return (elem.text or "") if elem is not None else ""
|
||||
|
||||
|
||||
def _req_text(parent: ET.Element, path: str) -> str:
|
||||
el = parent.find(path, NS)
|
||||
if el is None or el.text is None:
|
||||
raise ValueError(f"Required element not found or empty: {path}")
|
||||
return el.text
|
||||
|
||||
|
||||
def parse_marc_record(record_el: ET.Element) -> MarcRecord:
|
||||
"""
|
||||
record_el is the <marc:record> element (default ns MARC in your sample)
|
||||
"""
|
||||
# leader
|
||||
leader_text = _req_text(record_el, "marc:leader")
|
||||
|
||||
# controlfields
|
||||
controlfields: List[ControlField] = []
|
||||
for cf in record_el.findall("marc:controlfield", NS):
|
||||
tag = cf.get("tag", "").strip()
|
||||
controlfields.append(ControlField(tag=tag, value=_text(cf)))
|
||||
|
||||
# datafields
|
||||
datafields: List[DataField] = []
|
||||
for df in record_el.findall("marc:datafield", NS):
|
||||
tag = df.get("tag", "").strip()
|
||||
ind1 = df.get("ind1") or " "
|
||||
ind2 = df.get("ind2") or " "
|
||||
subfields: List[SubField] = []
|
||||
for sf in df.findall("marc:subfield", NS):
|
||||
code = sf.get("code", "")
|
||||
subfields.append(SubField(code=code, value=_text(sf)))
|
||||
datafields.append(DataField(tag=tag, ind1=ind1, ind2=ind2, subfields=subfields))
|
||||
|
||||
return MarcRecord(
|
||||
leader=leader_text, controlfields=controlfields, datafields=datafields
|
||||
)
|
||||
|
||||
|
||||
def parse_record(zs_record_el: ET.Element) -> Record:
|
||||
recordSchema = _req_text(zs_record_el, "zs:recordSchema")
|
||||
recordPacking = _req_text(zs_record_el, "zs:recordPacking")
|
||||
|
||||
# recordData contains a MARC <record> with default MARC namespace in your sample
|
||||
recordData_el = zs_record_el.find("zs:recordData", NS)
|
||||
if recordData_el is None:
|
||||
raise ValueError("Missing zs:recordData")
|
||||
|
||||
marc_record_el = recordData_el.find("marc:record", NS)
|
||||
if marc_record_el is None:
|
||||
# If the MARC record uses default ns (xmlns="...") ElementTree still needs the ns-qualified name
|
||||
# We already searched with prefix; this covers both default and prefixed cases.
|
||||
raise ValueError("Missing MARC21 record inside zs:recordData")
|
||||
|
||||
marc_record = parse_marc_record(marc_record_el)
|
||||
|
||||
recordPosition = int(_req_text(zs_record_el, "zs:recordPosition"))
|
||||
return Record(
|
||||
recordSchema=recordSchema,
|
||||
recordPacking=recordPacking,
|
||||
recordData=marc_record,
|
||||
recordPosition=recordPosition,
|
||||
)
|
||||
|
||||
|
||||
def parse_echoed_request(root: ET.Element) -> Optional[EchoedSearchRequest]:
|
||||
el = root.find("zs:echoedSearchRetrieveRequest", NS)
|
||||
if el is None:
|
||||
return None
|
||||
|
||||
# Be permissive with missing fields
|
||||
version = _text(el.find("zs:version", NS))
|
||||
query = _text(el.find("zs:query", NS))
|
||||
maximumRecords_text = _text(el.find("zs:maximumRecords", NS)) or "0"
|
||||
recordPacking = _text(el.find("zs:recordPacking", NS))
|
||||
recordSchema = _text(el.find("zs:recordSchema", NS))
|
||||
|
||||
try:
|
||||
maximumRecords = int(maximumRecords_text)
|
||||
except ValueError:
|
||||
maximumRecords = 0
|
||||
|
||||
return EchoedSearchRequest(
|
||||
version=version,
|
||||
query=query,
|
||||
maximumRecords=maximumRecords,
|
||||
recordPacking=recordPacking,
|
||||
recordSchema=recordSchema,
|
||||
)
|
||||
|
||||
|
||||
def parse_search_retrieve_response(xml_str: str) -> SearchRetrieveResponse:
|
||||
root = ET.fromstring(xml_str)
|
||||
|
||||
# Root is zs:searchRetrieveResponse
|
||||
version = _req_text(root, "zs:version")
|
||||
numberOfRecords = int(_req_text(root, "zs:numberOfRecords"))
|
||||
|
||||
records_parent = root.find("zs:records", NS)
|
||||
records: List[Record] = []
|
||||
if records_parent is not None:
|
||||
for r in records_parent.findall("zs:record", NS):
|
||||
records.append(parse_record(r))
|
||||
|
||||
echoed = parse_echoed_request(root)
|
||||
|
||||
return SearchRetrieveResponse(
|
||||
version=version,
|
||||
numberOfRecords=numberOfRecords,
|
||||
records=records,
|
||||
echoedSearchRetrieveRequest=echoed,
|
||||
)
|
||||
|
||||
|
||||
# --- Query helpers over MarcRecord ---
|
||||
|
||||
|
||||
def iter_datafields(
|
||||
rec: MarcRecord,
|
||||
tag: Optional[str] = None,
|
||||
ind1: Optional[str] = None,
|
||||
ind2: Optional[str] = None,
|
||||
) -> Iterable[DataField]:
|
||||
"""Yield datafields, optionally filtered by tag/indicators."""
|
||||
for df in rec.datafields:
|
||||
if tag is not None and df.tag != tag:
|
||||
continue
|
||||
if ind1 is not None and df.ind1 != ind1:
|
||||
continue
|
||||
if ind2 is not None and df.ind2 != ind2:
|
||||
continue
|
||||
yield df
|
||||
|
||||
|
||||
def subfield_values(
|
||||
rec: MarcRecord,
|
||||
tag: str,
|
||||
code: str,
|
||||
*,
|
||||
ind1: Optional[str] = None,
|
||||
ind2: Optional[str] = None,
|
||||
) -> List[str]:
|
||||
"""All values for subfield `code` in every `tag` field (respecting indicators)."""
|
||||
out: List[str] = []
|
||||
for df in iter_datafields(rec, tag, ind1, ind2):
|
||||
out.extend(sf.value for sf in df.subfields if sf.code == code)
|
||||
return out
|
||||
|
||||
|
||||
def first_subfield_value(
|
||||
rec: MarcRecord,
|
||||
tag: str,
|
||||
code: str,
|
||||
*,
|
||||
ind1: Optional[str] = None,
|
||||
ind2: Optional[str] = None,
|
||||
default: Optional[str] = None,
|
||||
) -> Optional[str]:
|
||||
"""First value for subfield `code` in `tag` (respecting indicators)."""
|
||||
for df in iter_datafields(rec, tag, ind1, ind2):
|
||||
for sf in df.subfields:
|
||||
if sf.code == code:
|
||||
return sf.value
|
||||
return default
|
||||
|
||||
|
||||
def find_datafields_with_subfields(
|
||||
rec: MarcRecord,
|
||||
tag: str,
|
||||
*,
|
||||
where_all: Optional[Dict[str, str]] = None,
|
||||
where_any: Optional[Dict[str, str]] = None,
|
||||
casefold: bool = False,
|
||||
ind1: Optional[str] = None,
|
||||
ind2: Optional[str] = None,
|
||||
) -> List[DataField]:
|
||||
"""
|
||||
Return datafields of `tag` whose subfields match constraints:
|
||||
- where_all: every (code -> exact value) must be present
|
||||
- where_any: at least one (code -> exact value) present
|
||||
Set `casefold=True` for case-insensitive comparison.
|
||||
"""
|
||||
where_all = where_all or {}
|
||||
where_any = where_any or {}
|
||||
matched: List[DataField] = []
|
||||
|
||||
for df in iter_datafields(rec, tag, ind1, ind2):
|
||||
# Map code -> list of values (with optional casefold applied)
|
||||
vals: Dict[str, List[str]] = {}
|
||||
for sf in df.subfields:
|
||||
v = sf.value.casefold() if casefold else sf.value
|
||||
vals.setdefault(sf.code, []).append(v)
|
||||
|
||||
ok = True
|
||||
for c, v in where_all.items():
|
||||
vv = v.casefold() if casefold else v
|
||||
if c not in vals or vv not in vals[c]:
|
||||
ok = False
|
||||
break
|
||||
|
||||
if ok and where_any:
|
||||
any_ok = any(
|
||||
(c in vals) and ((v.casefold() if casefold else v) in vals[c])
|
||||
for c, v in where_any.items()
|
||||
)
|
||||
if not any_ok:
|
||||
ok = False
|
||||
|
||||
if ok:
|
||||
matched.append(df)
|
||||
|
||||
return matched
|
||||
|
||||
|
||||
def controlfield_value(
|
||||
rec: MarcRecord, tag: str, default: Optional[str] = None
|
||||
) -> Optional[str]:
|
||||
"""Get the first controlfield value by tag (e.g., '001', '005')."""
|
||||
for cf in rec.controlfields:
|
||||
if cf.tag == tag:
|
||||
return cf.value
|
||||
return default
|
||||
|
||||
|
||||
def datafields_value(
|
||||
data: List[DataField], code: str, default: Optional[str] = None
|
||||
) -> Optional[str]:
|
||||
"""Get the first value for a specific subfield code in a list of datafields."""
|
||||
for df in data:
|
||||
for sf in df.subfields:
|
||||
if sf.code == code:
|
||||
return sf.value
|
||||
return default
|
||||
|
||||
|
||||
def datafield_value(
|
||||
df: DataField, code: str, default: Optional[str] = None
|
||||
) -> Optional[str]:
|
||||
"""Get the first value for a specific subfield code in a datafield."""
|
||||
for sf in df.subfields:
|
||||
if sf.code == code:
|
||||
return sf.value
|
||||
return default
|
||||
|
||||
|
||||
def _smart_join_title(a: str, b: Optional[str]) -> str:
|
||||
"""
|
||||
Join 245 $a and $b with MARC-style punctuation.
|
||||
If $b is present, join with ' : ' unless either side already supplies punctuation.
|
||||
"""
|
||||
a = a.strip()
|
||||
if not b:
|
||||
return a
|
||||
b = b.strip()
|
||||
if a.endswith((":", ";", "/")) or b.startswith((":", ";", "/")):
|
||||
return f"{a} {b}"
|
||||
return f"{a} : {b}"
|
||||
|
||||
|
||||
def subfield_values_from_fields(
|
||||
fields: Iterable[DataField],
|
||||
code: str,
|
||||
) -> List[str]:
|
||||
"""All subfield values with given `code` across a list of DataField."""
|
||||
return [sf.value for df in fields for sf in df.subfields if sf.code == code]
|
||||
|
||||
|
||||
def first_subfield_value_from_fields(
|
||||
fields: Iterable[DataField],
|
||||
code: str,
|
||||
default: Optional[str] = None,
|
||||
) -> Optional[str]:
|
||||
"""First subfield value with given `code` across a list of DataField."""
|
||||
for df in fields:
|
||||
for sf in df.subfields:
|
||||
if sf.code == code:
|
||||
return sf.value
|
||||
return default
|
||||
|
||||
|
||||
def subfield_value_pairs_from_fields(
|
||||
fields: Iterable[DataField],
|
||||
code: str,
|
||||
) -> List[Tuple[DataField, str]]:
|
||||
"""
|
||||
Return (DataField, value) pairs for all subfields with `code`.
|
||||
Useful if you need to know which field a value came from.
|
||||
"""
|
||||
out: List[Tuple[DataField, str]] = []
|
||||
for df in fields:
|
||||
for sf in df.subfields:
|
||||
if sf.code == code:
|
||||
out.append((df, sf.value))
|
||||
return out
|
||||
|
||||
|
||||
def book_from_marc(rec: MarcRecord) -> BookData:
|
||||
# PPN from controlfield 001
|
||||
ppn = controlfield_value(rec, "001")
|
||||
|
||||
# Title = 245 $a + 245 $b (if present)
|
||||
t_a = first_subfield_value(rec, "245", "a")
|
||||
t_b = first_subfield_value(rec, "245", "b")
|
||||
title = _smart_join_title(t_a, t_b) if t_a else None
|
||||
|
||||
# Signature = 924 where $9 == "Frei 129" → take that field's $g
|
||||
frei_fields = find_datafields_with_subfields(
|
||||
rec, "924", where_all={"9": "Frei 129"}
|
||||
)
|
||||
signature = first_subfield_value_from_fields(frei_fields, "g")
|
||||
|
||||
# Year = 264 $c (prefer ind2="1" publication; fallback to any 264)
|
||||
year = first_subfield_value(rec, "264", "c", ind2="1") or first_subfield_value(
|
||||
rec, "264", "c"
|
||||
)
|
||||
isbn = subfield_values(rec, "020", "a")
|
||||
|
||||
return BookData(
|
||||
ppn=ppn,
|
||||
title=title,
|
||||
signature=signature,
|
||||
edition=first_subfield_value(rec, "250", "a"),
|
||||
year=year,
|
||||
pages=first_subfield_value(rec, "300", "a"),
|
||||
publisher=first_subfield_value(rec, "264", "b"),
|
||||
isbn=isbn,
|
||||
)
|
||||
|
||||
|
||||
class SWB:
|
||||
def __init__(self):
|
||||
self.url = "https://sru.k10plus.de/opac-de-627!rec=1?version=1.1&operation=searchRetrieve&query={}&maximumRecords=10&recordSchema=marcxml"
|
||||
self.bib_id = 20735
|
||||
|
||||
def get(self, query_args: Iterable[str]) -> List[Record]:
|
||||
# if any query_arg ends with =, remove it
|
||||
query_args = [arg for arg in query_args if not arg.endswith("=")]
|
||||
query = "+and+".join(query_args)
|
||||
query = query.replace(" ", "%20").replace("&", "%26")
|
||||
|
||||
url = self.url.format(query)
|
||||
|
||||
print("Fetching from SWB:", url)
|
||||
headers = {
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
|
||||
"Accept": "application/xml",
|
||||
"Accept-Charset": "latin1,utf-8;q=0.7,*;q=0.3",
|
||||
}
|
||||
response = requests.get(url, headers=headers)
|
||||
if response.status_code != 200:
|
||||
raise Exception(f"Error fetching data from SWB: {response.status_code}")
|
||||
# print(response.text)
|
||||
data = response.content
|
||||
|
||||
# extract top-level response
|
||||
response = parse_search_retrieve_response(data)
|
||||
return response.records
|
||||
|
||||
def getBooks(self, query_args: Iterable[str]) -> List[BookData]:
|
||||
records: List[Record] = self.get(query_args)
|
||||
books: List[BookData] = []
|
||||
title = query_args[1].split("=")[1]
|
||||
# print(len(records), "records found")
|
||||
for rec in records:
|
||||
book = book_from_marc(rec.recordData)
|
||||
books.append(book)
|
||||
books = [
|
||||
b for b in books if b.title and b.title.lower().startswith(title.lower())
|
||||
]
|
||||
return books
|
||||
Reference in New Issue
Block a user