import re
import time
import xml.etree.ElementTree as ET
from collections.abc import Iterable
from enum import Enum

import requests
from requests.adapters import HTTPAdapter

# centralized logging used via src.shared.logging
from .schemas.bookdata import BookData
from .schemas.marcxml import (
    ControlField,
    DataField,
    EchoedSearchRequest,
    MarcRecord,
    Record,
    SearchRetrieveResponse,
    SubField,
)

ZS = "http://www.loc.gov/zing/srw/"
MARC = "http://www.loc.gov/MARC21/slim"
NS = {"zs": ZS, "marc": MARC}
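
# The parsers below expect an SRU searchRetrieveResponse roughly of this shape
# (illustrative sketch only; element order and optional fields vary by server):
#
#   <zs:searchRetrieveResponse xmlns:zs="http://www.loc.gov/zing/srw/">
#     <zs:version>1.1</zs:version>
#     <zs:numberOfRecords>1</zs:numberOfRecords>
#     <zs:records>
#       <zs:record>
#         <zs:recordSchema>marcxml</zs:recordSchema>
#         <zs:recordPacking>xml</zs:recordPacking>
#         <zs:recordData>
#           <record xmlns="http://www.loc.gov/MARC21/slim">...</record>
#         </zs:recordData>
#         <zs:recordPosition>1</zs:recordPosition>
#       </zs:record>
#     </zs:records>
#     <zs:echoedSearchRetrieveRequest>...</zs:echoedSearchRetrieveRequest>
#   </zs:searchRetrieveResponse>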


def _text(elem: ET.Element | None) -> str:
    return (elem.text or "") if elem is not None else ""


def _req_text(parent: ET.Element, path: str) -> str | None:
    """Find `path` under `parent`; return its text, or None if absent."""
    el = parent.find(path, NS)
    if el is None or el.text is None:
        return None
    return el.text


def parse_marc_record(record_el: ET.Element) -> MarcRecord:
    """record_el is the <marc:record> element (default MARC namespace in the sample data)."""
    # leader
    leader_text = _req_text(record_el, "marc:leader")

    # controlfields
    controlfields: list[ControlField] = []
    for cf in record_el.findall("marc:controlfield", NS):
        tag = cf.get("tag", "").strip()
        controlfields.append(ControlField(tag=tag, value=_text(cf)))

    # datafields
    datafields: list[DataField] = []
    for df in record_el.findall("marc:datafield", NS):
        tag = df.get("tag", "").strip()
        ind1 = df.get("ind1") or " "
        ind2 = df.get("ind2") or " "
        subfields: list[SubField] = []
        for sf in df.findall("marc:subfield", NS):
            code = sf.get("code", "")
            subfields.append(SubField(code=code, value=_text(sf)))
        datafields.append(DataField(tag=tag, ind1=ind1, ind2=ind2, subfields=subfields))

    return MarcRecord(
        leader=leader_text,
        controlfields=controlfields,
        datafields=datafields,
    )


def parse_record(zs_record_el: ET.Element) -> Record | None:
    recordSchema = _req_text(zs_record_el, "zs:recordSchema")
    recordPacking = _req_text(zs_record_el, "zs:recordPacking")

    # recordData contains a MARC <record> (default MARC namespace in the sample data)
    recordData_el = zs_record_el.find("zs:recordData", NS)
    if recordData_el is None:
        raise ValueError("Missing zs:recordData")

    marc_record_el = recordData_el.find("marc:record", NS)
    if marc_record_el is None:
        # The ns-qualified search above covers both default (xmlns="...") and
        # prefixed MARC records; if neither matched, skip this record.
        return None

    marc_record = parse_marc_record(marc_record_el)

    recordPosition = int(_req_text(zs_record_el, "zs:recordPosition") or "0")
    return Record(
        recordSchema=recordSchema,
        recordPacking=recordPacking,
        recordData=marc_record,
        recordPosition=recordPosition,
    )


def parse_echoed_request(root: ET.Element) -> EchoedSearchRequest | None:
    el = root.find("zs:echoedSearchRetrieveRequest", NS)
    if el is None:
        return None

    # Be permissive with missing fields
    version = _text(el.find("zs:version", NS))
    query = _text(el.find("zs:query", NS))
    maximumRecords_text = _text(el.find("zs:maximumRecords", NS)) or "0"
    recordPacking = _text(el.find("zs:recordPacking", NS))
    recordSchema = _text(el.find("zs:recordSchema", NS))

    try:
        maximumRecords = int(maximumRecords_text)
    except ValueError:
        maximumRecords = 0

    return EchoedSearchRequest(
        version=version,
        query=query,
        maximumRecords=maximumRecords,
        recordPacking=recordPacking,
        recordSchema=recordSchema,
    )


def parse_search_retrieve_response(
    xml_str: str | bytes,
) -> SearchRetrieveResponse:
    root = ET.fromstring(xml_str)

    # Root is zs:searchRetrieveResponse
    version = _req_text(root, "zs:version")
    numberOfRecords = int(_req_text(root, "zs:numberOfRecords") or "0")

    records_parent = root.find("zs:records", NS)
    records: list[Record] = []
    if records_parent is not None:
        for r in records_parent.findall("zs:record", NS):
            record = parse_record(r)
            if record is not None:
                records.append(record)

    echoed = parse_echoed_request(root)

    return SearchRetrieveResponse(
        version=version,
        numberOfRecords=numberOfRecords,
        records=records,
        echoedSearchRetrieveRequest=echoed,
    )
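
# Illustrative usage (sketch; `xml_payload` stands in for a raw SRU response):
#     xml_payload = open("response.xml", "rb").read()
#     sr = parse_search_retrieve_response(xml_payload)
#     print(sr.numberOfRecords, len(sr.records))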


# --- Query helpers over MarcRecord ---


def iter_datafields(
    rec: MarcRecord,
    tag: str | None = None,
    ind1: str | None = None,
    ind2: str | None = None,
) -> Iterable[DataField]:
    """Yield datafields, optionally filtered by tag/indicators."""
    for df in rec.datafields:
        if tag is not None and df.tag != tag:
            continue
        if ind1 is not None and df.ind1 != ind1:
            continue
        if ind2 is not None and df.ind2 != ind2:
            continue
        yield df
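
# Illustrative usage (sketch; `rec` is a parsed MarcRecord):
#     for df in iter_datafields(rec, tag="264", ind2="1"):
#         print(df.tag, df.ind1, df.ind2)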


def subfield_values(
    rec: MarcRecord,
    tag: str,
    code: str,
    *,
    ind1: str | None = None,
    ind2: str | None = None,
) -> list[str]:
    """All values for subfield `code` in every `tag` field (respecting indicators)."""
    out: list[str] = []
    for df in iter_datafields(rec, tag, ind1, ind2):
        out.extend(sf.value for sf in df.subfields if sf.code == code)
    return out


def first_subfield_value(
    rec: MarcRecord,
    tag: str,
    code: str,
    *,
    ind1: str | None = None,
    ind2: str | None = None,
    default: str | None = None,
) -> str | None:
    """First value for subfield `code` in `tag` (respecting indicators)."""
    for df in iter_datafields(rec, tag, ind1, ind2):
        for sf in df.subfields:
            if sf.code == code:
                return sf.value
    return default
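
# Illustrative usage (sketch; tags follow the MARC 21 conventions used below):
#     isbns = subfield_values(rec, "020", "a")                 # every ISBN
#     title_a = first_subfield_value(rec, "245", "a")          # main title, or None
#     year = first_subfield_value(rec, "264", "c", ind2="1")   # publication year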


def find_datafields_with_subfields(
    rec: MarcRecord,
    tag: str,
    *,
    where_all: dict[str, str] | None = None,
    where_any: dict[str, str] | None = None,
    casefold: bool = False,
    ind1: str | None = None,
    ind2: str | None = None,
) -> list[DataField]:
    """Return datafields of `tag` whose subfields match constraints:
    - where_all: every (code -> exact value) must be present
    - where_any: at least one (code -> exact value) present
    Set `casefold=True` for case-insensitive comparison.
    """
    where_all = where_all or {}
    where_any = where_any or {}
    matched: list[DataField] = []

    for df in iter_datafields(rec, tag, ind1, ind2):
        # Map code -> list of values (with optional casefold applied)
        vals: dict[str, list[str]] = {}
        for sf in df.subfields:
            v = sf.value.casefold() if casefold else sf.value
            vals.setdefault(sf.code, []).append(v)

        ok = True
        for c, v in where_all.items():
            vv = v.casefold() if casefold else v
            if c not in vals or vv not in vals[c]:
                ok = False
                break

        if ok and where_any:
            any_ok = any(
                (c in vals) and ((v.casefold() if casefold else v) in vals[c])
                for c, v in where_any.items()
            )
            if not any_ok:
                ok = False

        if ok:
            matched.append(df)

    return matched
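
# Illustrative usage (sketch; mirrors the holdings lookup in book_from_marc):
#     frei = find_datafields_with_subfields(rec, "924", where_all={"9": "Frei 129"})
#     signature = first_subfield_value_from_fields(frei, "g")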


def controlfield_value(
    rec: MarcRecord,
    tag: str,
    default: str | None = None,
) -> str | None:
    """Get the first controlfield value by tag (e.g., '001', '005')."""
    for cf in rec.controlfields:
        if cf.tag == tag:
            return cf.value
    return default


def datafields_value(
    data: list[DataField],
    code: str,
    default: str | None = None,
) -> str | None:
    """Get the first value for a specific subfield code in a list of datafields."""
    for df in data:
        for sf in df.subfields:
            if sf.code == code:
                return sf.value
    return default


def datafield_value(
    df: DataField,
    code: str,
    default: str | None = None,
) -> str | None:
    """Get the first value for a specific subfield code in a datafield."""
    for sf in df.subfields:
        if sf.code == code:
            return sf.value
    return default


def _smart_join_title(a: str, b: str | None) -> str:
    """Join 245 $a and $b with MARC-style punctuation.
    If $b is present, join with ' : ' unless either side already supplies punctuation.
    """
    a = a.strip()
    if not b:
        return a
    b = b.strip()
    if a.endswith((":", ";", "/")) or b.startswith((":", ";", "/")):
        return f"{a} {b}"
    return f"{a} : {b}"
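
# Illustrative behavior (sketch):
#     _smart_join_title("Algorithms", "an introduction")   -> "Algorithms : an introduction"
#     _smart_join_title("Algorithms :", "an introduction") -> "Algorithms : an introduction"
#     _smart_join_title("Algorithms", None)                -> "Algorithms"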


def subfield_values_from_fields(
    fields: Iterable[DataField],
    code: str,
) -> list[str]:
    """All subfield values with given `code` across a list of DataField."""
    return [sf.value for df in fields for sf in df.subfields if sf.code == code]


def first_subfield_value_from_fields(
    fields: Iterable[DataField],
    code: str,
    default: str | None = None,
) -> str | None:
    """First subfield value with given `code` across a list of DataField."""
    for df in fields:
        for sf in df.subfields:
            if sf.code == code:
                return sf.value
    return default


def subfield_value_pairs_from_fields(
    fields: Iterable[DataField],
    code: str,
) -> list[tuple[DataField, str]]:
    """Return (DataField, value) pairs for all subfields with `code`.
    Useful if you need to know which field a value came from.
    """
    out: list[tuple[DataField, str]] = []
    for df in fields:
        for sf in df.subfields:
            if sf.code == code:
                out.append((df, sf.value))
    return out


def book_from_marc(rec: MarcRecord, library_identifier: str) -> BookData:
    # library_identifier has the form "tag$code" (field tag and subfield code)
    library_ident_tag, library_ident_code = library_identifier.split("$", 1)

    # PPN from controlfield 001
    ppn = controlfield_value(rec, "001")

    # Title = 245 $a + 245 $b (if present)
    t_a = first_subfield_value(rec, "245", "a")
    t_b = ""  # subtitle intentionally disabled: first_subfield_value(rec, "245", "b")
    title = _smart_join_title(t_a, t_b) if t_a else None

    # Signature = 924 where $9 == "Frei 129" → take that field's $g
    frei_fields = find_datafields_with_subfields(
        rec,
        "924",
        where_all={"9": "Frei 129"},
    )
    signature = first_subfield_value_from_fields(frei_fields, "g")

    # Year = 264 $c (prefer ind2="1" publication; fallback to any 264)
    year = first_subfield_value(rec, "264", "c", ind2="1") or first_subfield_value(
        rec,
        "264",
        "c",
    )
    isbn = subfield_values(rec, "020", "a")
    mediatype = first_subfield_value(rec, "338", "a")
    lang = subfield_values(rec, "041", "a")
    authors = subfield_values(rec, "700", "a")
    author = "; ".join(authors) if authors else None
    libraries = subfield_values(rec, library_ident_tag, library_ident_code)

    return BookData(
        ppn=ppn,
        title=title,
        signature=signature,
        edition=first_subfield_value(rec, "250", "a") or "",
        year=year,
        pages=first_subfield_value(rec, "300", "a") or "",
        publisher=first_subfield_value(rec, "264", "b") or "",
        isbn=isbn,
        language=lang,
        link="",
        author=author,
        media_type=mediatype,
        libraries=libraries,
    )
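
# Illustrative usage (sketch; "924$b" is a placeholder identifier, pick the
# tag/subfield that carries holdings in your catalog):
#     book = book_from_marc(record.recordData, "924$b")
#     print(book.ppn, book.title, book.year)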


RVK_ALLOWED = r"[A-Z0-9.\-/]"  # conservative char set typically seen in RVK notations


def find_newer_edition(
    swb_result: BookData,
    dnb_result: list[BookData],
) -> list[BookData] | None:
    """New edition if:
    - year > swb.year OR
    - edition_number > swb.edition_number

    Additional guards & preferences:
    - If both have signatures and they differ, skip (not the same work).
    - For duplicates (same ppn): keep the one that has a signature, and
      prefer a signature that matches swb_result.signature.
    - If multiple remain: keep the single 'latest' by (year desc,
      edition_number desc, best-signature-match desc, has-signature desc).
    """

    def norm_sig(s: str | None) -> str:
        if not s:
            return ""
        # normalize: lowercase, collapse whitespace, keep alnum + a few separators
        s = s.lower()
        s = re.sub(r"\s+", " ", s).strip()
        # remove obvious noise; adjust if your signature format differs
        s = re.sub(r"[^a-z0-9\-_/\. ]+", "", s)
        return s

    def has_sig(b: BookData) -> bool:
        return bool(getattr(b, "signature", None))

    def sig_matches_swb(b: BookData) -> bool:
        if not has_sig(b) or not has_sig(swb_result):
            return False
        return norm_sig(b.signature) == norm_sig(swb_result.signature)

    def strictly_newer(b: BookData) -> bool:
        by_year = (
            b.year is not None
            and swb_result.year is not None
            and b.year > swb_result.year
        )
        by_edition = (
            b.edition_number is not None
            and swb_result.edition_number is not None
            and b.edition_number > swb_result.edition_number
        )
        return by_year or by_edition

    def ppn_pref_score(x: BookData) -> tuple[int, int]:
        # (signature matches swb, has signature)
        return (1 if sig_matches_swb(x) else 0, 1 if has_sig(x) else 0)

    swb_sig_norm = norm_sig(getattr(swb_result, "signature", None))

    # 1) Filter to same-work AND newer
    candidates: list[BookData] = []
    for b in dnb_result:
        # Skip if both signatures exist and don't match (different work)
        b_sig = getattr(b, "signature", None)
        if b_sig and swb_result.signature:
            if norm_sig(b_sig) != swb_sig_norm:
                continue  # not the same work

        # Keep only if newer by rules
        if strictly_newer(b):
            candidates.append(b)

    if not candidates:
        return None

    # 2) Dedupe by PPN, preferring signature (and matching signature if possible)
    by_ppn: dict[str | None, BookData] = {}
    for b in candidates:
        key = getattr(b, "ppn", None)
        prev = by_ppn.get(key)
        if prev is None:
            by_ppn[key] = b
            continue
        if ppn_pref_score(b) > ppn_pref_score(prev):
            by_ppn[key] = b

    deduped = list(by_ppn.values())
    if not deduped:
        return None

    # 3) If multiple remain, keep only the latest one.
    #    Order: year desc, edition_number desc, signature-match desc, has-signature desc
    def sort_key(b: BookData):
        year = b.year if b.year is not None else -1
        ed = b.edition_number if b.edition_number is not None else -1
        sig_match = 1 if sig_matches_swb(b) else 0
        sig_present = 1 if has_sig(b) else 0
        return (year, ed, sig_match, sig_present)

    best = max(deduped, key=sort_key)
    return [best] if best else None
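
# Illustrative usage (sketch; assumes BookData carries comparable year /
# edition_number values and `dnb_hits` comes from another catalog's getBooks):
#     newer = find_newer_edition(swb_book, dnb_hits)
#     if newer:
#         print("newer edition found:", newer[0].ppn)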


class QueryTransformer:
    def __init__(self, api_schema: type[Enum], arguments: Iterable[str] | str):
        self.api_schema = api_schema
        if isinstance(arguments, str):
            self.arguments = [arguments]
        else:
            self.arguments = arguments
        self.drop_empty = True

    def transform(self) -> list[str]:
        arguments: list[str] = []
        schema = self.api_schema
        for arg in self.arguments:
            if "=" not in arg:
                continue
            if self.drop_empty and arg.endswith("="):
                continue
            key, value = arg.split("=", 1)
            if hasattr(schema, key.upper()):
                api_key = getattr(schema, key.upper()).value
                if key.upper() == "AUTHOR" and hasattr(schema, "AUTHOR_SCHEMA"):
                    author_schema = schema.AUTHOR_SCHEMA.value
                    if author_schema == "SpaceAfterComma":
                        value = value.replace(",", ", ")
                    elif author_schema == "NoSpaceAfterComma":
                        value = value.replace(", ", ",")
                    value = value.replace("  ", " ")  # collapse doubled spaces
                if key.upper() == "TITLE" and hasattr(
                    schema,
                    "ENCLOSE_TITLE_IN_QUOTES",
                ):
                    # Check the member's value; a bare enum member is always truthy.
                    if schema.ENCLOSE_TITLE_IN_QUOTES.value:
                        value = f'"{value}"'

                arguments.append(f"{api_key}={value}")

        return arguments
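
# Illustrative usage (sketch; `DemoSchema` is a hypothetical API schema, not a
# shipped one -- real schemas map generic keys to catalog-specific SRU indexes):
#     class DemoSchema(Enum):
#         TITLE = "pica.tit"
#         AUTHOR = "pica.per"
#         AUTHOR_SCHEMA = "NoSpaceAfterComma"
#     QueryTransformer(DemoSchema, ["title=Algorithms", "author=Knuth, Donald"]).transform()
#     # -> ["pica.tit=Algorithms", "pica.per=Knuth,Donald"]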


class Api:
    def __init__(
        self,
        site: str,
        url: str,
        prefix: type[Enum],
        library_identifier: str,
        notsupported_args: list[str] | None = None,
        replace: dict[str, str] | None = None,
    ):
        self.site = site
        self.url = url
        self.prefix = prefix
        self.replace = replace or {}
        self.library_identifier = library_identifier
        self.notsupported_args = notsupported_args or []
        # Reuse TCP connections across requests for better performance
        self._session = requests.Session()
        # Slightly larger connection pool for concurrent calls
        adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
        self._session.mount("http://", adapter)
        self._session.mount("https://", adapter)
        # Rate limiting and retry config
        self._last_request_time: float = 0.0
        self._rate_limit_seconds: float = 1.0
        self._max_retries: int = 5
        # Max total time per logical request
        self._overall_timeout_seconds: float = 30.0

    def close(self):
        try:
            self._session.close()
        except Exception:
            pass

    def __del__(self):
        # Best-effort cleanup
        self.close()

    def get(self, query_args: Iterable[str] | str) -> list[Record]:
        start_time = time.monotonic()
        if isinstance(query_args, str):
            query_args = [query_args]
        if self.notsupported_args:
            query_args = [
                qa
                for qa in query_args
                if not any(qa.startswith(na + "=") for na in self.notsupported_args)
            ]
        # QueryTransformer maps generic keys to API-specific ones and drops
        # empty arguments (anything ending with "=")
        query_args = QueryTransformer(
            api_schema=self.prefix,
            arguments=query_args,
        ).transform()
        query = "+and+".join(query_args)
        for old, new in self.replace.items():
            query = query.replace(old, new)

        url = self.url.format(query)
        headers = {
            "User-Agent": f"{self.site} SRU Client, <alexander.kirchner@ph-freiburg.de>",
            "Accept": "application/xml",
            "Accept-Charset": "latin1,utf-8;q=0.7,*;q=0.3",
        }
        # Use the persistent session, enforce 1 req/sec, and retry up to 5 times
        last_error: Exception | None = None
        for attempt in range(1, self._max_retries + 1):
            # Abort if overall timeout exceeded before starting attempt
            if time.monotonic() - start_time > self._overall_timeout_seconds:
                last_error = requests.exceptions.Timeout(
                    f"Overall timeout {self._overall_timeout_seconds}s exceeded before attempt {attempt}",
                )
                break
            # Enforce rate limit relative to last request end
            now = time.monotonic()
            elapsed = now - self._last_request_time
            if elapsed < self._rate_limit_seconds:
                time.sleep(self._rate_limit_seconds - elapsed)

            try:
                # Per-attempt read timeout capped at remaining overall budget (at most 30s)
                remaining = max(
                    0.0,
                    self._overall_timeout_seconds - (time.monotonic() - start_time),
                )
                read_timeout = min(30.0, remaining if remaining > 0 else 0.001)
                resp = self._session.get(
                    url,
                    headers=headers,
                    timeout=(3.05, read_timeout),
                )
                self._last_request_time = time.monotonic()
                if resp.status_code == 200:
                    # Parse raw bytes to preserve encoding edge cases
                    sr = parse_search_retrieve_response(resp.content)
                    return sr.records
                last_error = Exception(
                    f"Error fetching data from {self.site}: HTTP {resp.status_code} (attempt {attempt}/{self._max_retries})",
                )
            except Exception as e:
                # Covers ReadTimeout/Timeout and any other request failure
                last_error = e

            # Wait before the next attempt to respect rate limit between retries as well
            if attempt < self._max_retries:
                time.sleep(self._rate_limit_seconds)

        # If we exit the loop, all attempts failed
        raise last_error if last_error else Exception("Unknown request failure")

    def getBooks(self, query_args: Iterable[str] | str) -> list[BookData]:
        # Normalize: a bare string would otherwise be iterated char by char below
        if isinstance(query_args, str):
            query_args = [query_args]
        try:
            records: list[Record] = self.get(query_args)
        except requests.exceptions.Timeout:
            # Read timeout or overall timeout exceeded: return a single empty BookData
            return [BookData()]
        # Other errors propagate (could also choose to return an empty list)

        # Avoid printing on hot paths; rely on logger if needed
        books: list[BookData] = []
        # extract title from query_args if present
        title = None
        for arg in query_args:
            if arg.startswith("pica.tit="):
                title = arg.split("=", 1)[1]
                break
        for rec in records:
            book = book_from_marc(rec.recordData, self.library_identifier)
            books.append(book)
        if title:
            books = [
                b
                for b in books
                if b.title and b.title.lower().startswith(title.lower())
            ]
        return books

    def getLinkForBook(self, book: BookData) -> str:
        # Not implemented: depends on the catalog front-end; return empty string for now
        return ""

    # def search(self, query_args: Union[Iterable[str], str]):
    #     arguments =
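
# Illustrative usage (sketch; the endpoint URL, schema enum, and "924$b"
# identifier are placeholders for the values configured elsewhere):
#     api = Api(
#         site="SWB",
#         url="https://sru.example.org/sru?query={}&recordSchema=marcxml",
#         prefix=DemoSchema,
#         library_identifier="924$b",
#     )
#     books = api.getBooks(["title=Algorithms"])
#     api.close()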