chore(all): run formatting on repo, start work on porting webrequest over to api library

2025-11-27 14:29:33 +01:00
parent 04010815a9
commit 539e1331a0
10 changed files with 925 additions and 233 deletions


@@ -1,18 +1,25 @@
from .schemas.api_types import *
from .schemas.api_types import (
DNBSchema,
HBZSchema,
HebisSchema,
KOBVSchema,
OEVKSchema,
SWBSchema,
)
from .sru import Api as _Api
__all__ = [
"SWB",
"DNB",
"KOBV",
"HEBIS",
"OEVK",
"HBZ",
"HEBIS",
"KOBV",
"OEVK",
"SWB",
]
class SWB(_Api):
def __init__(self):
def __init__(self) -> None:
self.site = SWBSchema.NAME.value
self.url = SWBSchema.URL.value
self.prefix = SWBSchema.ARGSCHEMA.value
@@ -21,7 +28,7 @@ class SWB(_Api):
class DNB(_Api):
def __init__(self):
def __init__(self) -> None:
self.site = DNBSchema.NAME.value
self.url = DNBSchema.URL.value
self.prefix = DNBSchema.ARGSCHEMA.value
@@ -29,7 +36,7 @@ class DNB(_Api):
class KOBV(_Api):
def __init__(self):
def __init__(self) -> None:
self.site = KOBVSchema.NAME.value
self.url = KOBVSchema.URL.value
self.prefix = KOBVSchema.ARGSCHEMA.value
@@ -38,7 +45,7 @@ class KOBV(_Api):
class HEBIS(_Api):
def __init__(self):
def __init__(self) -> None:
self.site = HebisSchema.NAME.value
self.url = HebisSchema.URL.value
self.prefix = HebisSchema.ARGSCHEMA.value
@@ -56,7 +63,7 @@ class HEBIS(_Api):
class OEVK(_Api):
def __init__(self):
def __init__(self) -> None:
self.site = OEVKSchema.NAME.value
self.url = OEVKSchema.URL.value
self.prefix = OEVKSchema.ARGSCHEMA.value
@@ -65,20 +72,22 @@ class OEVK(_Api):
class HBZ(_Api):
"""
Small wrapper of the SRU API used to retrieve data from the HBZ libraries
"""Small wrapper of the SRU API used to retrieve data from the HBZ libraries.
All fields are available [here](https://eu04.alma.exlibrisgroup.com/view/sru/49HBZ_NETWORK?version=1.2)
Schema
------
HBZSchema: <HBZSchema>
HBZSchema: "HBZSchema"
query prefix: alma.
"""
def __init__(self):
def __init__(self) -> None:
self.site = HBZSchema.NAME.value
self.url = HBZSchema.URL.value
self.prefix = HBZSchema.ARGSCHEMA.value
self.library_identifier = HBZSchema.LIBRARY_NAME_LOCATION_FIELD.value
super().__init__(self.site, self.url, self.prefix, self.library_identifier)
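A minimal usage sketch of these wrappers, taken from the test suite below (the pica.* keys are SWB's query schema; other wrappers take their own schema prefixes):

from src.bibapi import SWB

books = SWB().getBooks(["pica.tit=Java ist auch eine Insel", "pica.bib=20735"])
print(books[0].title)  # "Java ist auch eine Insel"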

src/bibapi/_transformers.py (Normal file, 502 lines added)

@@ -0,0 +1,502 @@
from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from typing import Any
from src.bibapi.schemas.bookdata import BookData
log = logging.getLogger(__name__)
@dataclass
class Item:
superlocation: str | None = None
status: str | None = None
availability: str | None = None
notes: str | None = None
limitation: str | None = None
duedate: str | None = None
id: str | None = None
item_id: str | None = None
ilslink: str | None = None
number: int | None = None
barcode: str | None = None
reserve: str | None = None
callnumber: str | None = None
department: str | None = None
locationhref: str | None = None
location: str | None = None
ktrl_nr: str | None = None
def from_dict(self, data: dict[str, Any]) -> Item:
"""Import data from dict."""
data = data["items"]
for entry in data:
for key, value in entry.items():
setattr(self, key, value)
return self
@dataclass
class RDS_AVAIL_DATA:
"""Class to store RDS availability data"""
library_sigil: str = dataclass_field(default_factory=str)
items: list[Item] = dataclass_field(default_factory=list)
def import_from_dict(self, data: str) -> RDS_AVAIL_DATA:
"""Import data from a JSON string keyed by library sigil."""
edata = json.loads(data)
# library sigil is first key
self.library_sigil = str(list(edata.keys())[0])
# get data from first key
edata = edata[self.library_sigil]
for location in edata:
item = Item(superlocation=location).from_dict(edata[location])
self.items.append(item)
return self
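# Illustrative payload (shape inferred from the parser above, values assumed):
#   {"Frei 129": {"Lesesaal": {"items": [{"status": "available", "callnumber": "CU 8500 K64"}]}}}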
@dataclass
class RDS_DATA:
"""Class to store RDS data"""
RDS_SIGNATURE: str | None = None
RDS_STATUS: str | None = None
RDS_LOCATION: str | None = None
RDS_URL: Any = None
RDS_HINT: Any = None
RDS_COMMENT: Any = None
RDS_HOLDING: Any = None
RDS_HOLDING_LEAK: Any = None
RDS_INTERN: Any = None
RDS_PROVENIENCE: Any = None
RDS_LOCAL_NOTATION: str | None = None
RDS_LEA: Any = None
def import_from_dict(self, data: dict) -> RDS_DATA:
"""Import data from dict"""
for key, value in data.items():
setattr(self, key, value)
return self
@dataclass
class RDS_GENERIC_DATA:
LibrarySigil: str = dataclass_field(default_factory=str)
RDS_DATA: list[RDS_DATA] = dataclass_field(default_factory=list)
def import_from_dict(self, data: str) -> RDS_GENERIC_DATA:
"""Import data from dict"""
edata = json.loads(data)
# library sigil is first key
self.LibrarySigil = str(list(edata.keys())[0])
# get data from first key
edata = edata[self.LibrarySigil]
for entry in edata:
    # each entry is a dict matching the RDS_DATA fields
    rds_data = RDS_DATA().import_from_dict(entry)
    self.RDS_DATA.append(rds_data)
return self
class BaseStruct:
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
class ARRAYData:
def __init__(self, signature=None) -> None:
    self.signature = signature
def transform(self, data: str) -> BookData:
def _get_line(source: str, search: str) -> str:
try:
data = (
source.split(search)[1]
.split("\n")[0]
.strip()
.replace("=>", "")
.strip()
)
return data
except Exception:
# # log.debug(f"ARRAYData.transform failed, {source}, {search}")
log.exception(f"ARRAYData.transform failed, no string {search}")
return ""
def _get_list_entry(source: str, search: str, entry: str) -> str:
try:
source = source.replace("\t", "").replace("\r", "")
source = source.split(search)[1].split(")")[0]
return _get_line(source, entry).replace("=>", "").strip()
except Exception:
    return ""
def _get_isbn(source: str) -> list:
try:
isbn = source.split("[isbn]")[1].split(")")[0].strip()
isbn = isbn.split("(")[1]
isbns = isbn.split("=>")
ret = []
for part in isbns:
    isb = part.split("\n")[0].strip()
    if isb and isb not in ret:
        ret.append(isb)
return ret
except Exception:
    return []
def _get_signature(data):
try:
sig_data = (
data.split("[loksatz]")[1]
.split("[0] => ")[1]
.split("\n")[0]
.strip()
)
signature_data = json.loads(sig_data)  # loksatz entries are JSON objects
return signature_data["signatur"]
except Exception:
return None
def _get_author(data):
try:
array = data.split("[au_display_short]")[1].split(")\n")[0].strip()
except Exception:
return ""
entries = array.split("\n")
authors = []
hg_present = False
verf_present = False
lines = []
for entry in entries:
if "=>" in entry:
line = entry.split("=>")[1].strip()
if "[HerausgeberIn]" in line:
hg_present = True
if "[VerfasserIn]" in line:
verf_present = True
lines.append(line)
for line in lines:
if hg_present and verf_present:
if "[HerausgeberIn]" in line:
authors.append(line.split("[")[0].strip())
elif verf_present:
if "[VerfasserIn]" in line:
authors.append(line.split("[")[0].strip())
else:
pass
return ";".join(authors)
def _get_title(data):
titledata = None
title = ""
if "[ti_long]" in data:
titledata = data.split("[ti_long]")[1].split(")\n")[0].strip()
title = titledata.split("=>")[1].strip().split("/")[0].strip()
if "[ti_long_f]" in data:
titledata = data.split("[ti_long_f]")[1].split(")\n")[0].strip()
title = titledata.split("=>")[1].strip().split("/")[0].strip()
return title
def _get_adis_idn(data, signature):
loksatz_match = re.search(
r"\[loksatz\] => Array\s*\((.*?)\)", data, re.DOTALL,
)
if loksatz_match:
loksatz_content = loksatz_match.group(1)
# Extract the JSON objects within the loksatz section
json_objects = re.findall(r"{.*?}", loksatz_content, re.DOTALL)
for obj in json_objects:
    entry = json.loads(obj)  # loksatz entries are JSON objects
    if entry["signatur"] == signature:
        return entry["adis_idn"]
def _get_in_apparat(data):
loksatz_match = re.search(
r"\[loksatz\] => Array\s*\((.*?)\)", data, re.DOTALL,
)
if loksatz_match:
loksatz_content = loksatz_match.group(1)
# Extract the JSON objects within the loksatz section
json_objects = re.findall(r"{.*?}", loksatz_content, re.DOTALL)
for obj in json_objects:
    entry = json.loads(obj)  # loksatz entries are JSON objects
    if entry["ausleihcode"] == "R" and entry["standort"] == "40":
        return True
return False
ppn = _get_line(data, "[kid]")
title = _get_title(data).strip()
author = _get_author(data)
edition = _get_list_entry(data, "[ausgabe]", "[0]").replace(",", "")
link = f"https://rds.ibs-bw.de/phfreiburg/link?kid={_get_line(data, '[kid]')}"
isbn = _get_isbn(data)
# [self._get_list_entry(data,"[isbn]","[0]"),self._get_list_entry(data,"[is]","[1]")],
language = _get_list_entry(data, "[la_facet]", "[0]")
publisher = _get_list_entry(data, "[pu]", "[0]")
year = _get_list_entry(data, "[py_display]", "[0]")
pages = _get_list_entry(data, "[umfang]", "[0]").split(":")[0].strip()
signature = (
self.signature if self.signature is not None else _get_signature(data)
)
place = _get_list_entry(data, "[pp]", "[0]")
adis_idn = _get_adis_idn(data, signature=signature)
in_apparat = _get_in_apparat(data)
return BookData(
ppn=ppn,
title=title,
author=author,
edition=edition,
link=link,
isbn=isbn,
language=language,
publisher=publisher,
year=year,
pages=pages,
signature=signature,
place=place,
adis_idn=adis_idn,
in_apparat=in_apparat,
)
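# The ARRAY export is PHP print_r-style text; an assumed, illustrative fragment
# of the markers the helpers above split on:
#   [kid] => 1234567890
#   [ti_long] => Array ( [0] => Java ist auch eine Insel / Christian Ullenboom )
#   [ausgabe] => Array ( [0] => 15. Auflage )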
class COinSData:
def __init__(self) -> None:
pass
def transform(self, data: str) -> BookData:
def _get_line(source: str, search: str) -> str:
try:
data = source.split(f"{search}=")[1] # .split("")[0].strip()
return data.split("rft")[0].strip() if "rft" in data else data
except:
return ""
return BookData(
ppn=_get_line(data, "rft_id").split("=")[1],
title=_get_line(data, "rft.btitle"),
author=f"{_get_line(data, 'rft.aulast')}, {_get_line(data, 'rft.aufirst')}",
edition=_get_line(data, "rft.edition"),
link=_get_line(data, "rft_id"),
isbn=_get_line(data, "rft.isbn"),
publisher=_get_line(data, "rft.pub"),
year=_get_line(data, "rft.date"),
pages=_get_line(data, "rft.tpages").split(":")[0].strip(),
)
class RISData:
def __init__(self) -> None:
pass
def transform(self, data: str) -> BookData:
def _get_line(source: str, search: str) -> str:
try:
data = source.split(f"{search} - ")[1] # .split("")[0].strip()
return data.split("\n")[0].strip() if "\n" in data else data
except:
return ""
return BookData(
ppn=_get_line(data, "DP").split("=")[1],
title=_get_line(data, "TI"),
signature=_get_line(data, "CN"),
edition=_get_line(data, "ET").replace(",", ""),
link=_get_line(data, "DP"),
isbn=_get_line(data, "SN").split(","),
author=_get_line(data, "AU").split("[")[0].strip(),
language=_get_line(data, "LA"),
publisher=_get_line(data, "PB"),
year=_get_line(data, "PY"),
pages=_get_line(data, "SP"),
)
class BibTeXData:
def __init__(self) -> None:
pass
def transform(self, data: str) -> BookData:
def _get_line(source: str, search: str) -> str:
try:
return (
source.split(search)[1]
.split("\n")[0]
.strip()
.split("=")[1]
.strip()
.replace("{", "")
.replace("}", "")
.replace(",", "")
.replace("[", "")
.replace("];", "")
)
except Exception:
    return ""
return BookData(
ppn=None,
title=_get_line(data, "title"),
signature=_get_line(data, "bestand"),
edition=_get_line(data, "edition"),
isbn=_get_line(data, "isbn"),
author=";".join(_get_line(data, "author").split(" and ")),
language=_get_line(data, "language"),
publisher=_get_line(data, "publisher"),
year=_get_line(data, "year"),
pages=_get_line(data, "pages"),
)
class RDSData:
def __init__(self) -> None:
self.retlist = []
def transform(self, data: str):
log.debug(data)
def __get_raw_data(data: str) -> list:
# create base data to be turned into pydantic classes
data = data.split("RDS ----------------------------------")[1]
edata = data.strip()
edata = edata.split("\n", 9)[9]
edata = edata.split("\n")[1:]
entry_1 = edata[0]
entry_2 = "".join(edata[1:])
return [entry_1, entry_2]
ret_data = __get_raw_data(data)
# assign data[1] to RDS_AVAIL_DATA
# assign data[0] to RDS_DATA
self.rds_data = RDS_GENERIC_DATA().import_from_dict(ret_data[1])
self.rds_availability = RDS_AVAIL_DATA().import_from_dict(ret_data[0])
self.retlist.append(self.rds_availability)
self.retlist.append(self.rds_data)
return self
def return_data(self, option=None):
if option == "rds_availability":
return self.retlist[0]
if option == "rds_data":
return self.retlist[1]
return {"rds_availability": self.retlist[0], "rds_data": self.retlist[1]}
class DictToTable:
def __init__(self):
self.work_author = None
self.section_author = None
self.year = None
self.edition = None
self.work_title = None
self.chapter_title = None
self.location = None
self.publisher = None
self.signature = None
self.type = None
self.pages = None
self.issue = None
self.isbn = None
def makeResult(self):
data = {
"work_author": self.work_author,
"section_author": self.section_author,
"year": self.year,
"edition": self.edition,
"work_title": self.work_title,
"chapter_title": self.chapter_title,
"location": self.location,
"publisher": self.publisher,
"signature": self.signature,
"issue": self.issue,
"pages": self.pages,
"isbn": self.isbn,
"type": self.type,
}
data = {k: v for k, v in data.items() if v is not None}
return data
def reset(self):
for key in self.__dict__:
setattr(self, key, None)
def transform(self, data: dict):
mode = data["mode"]
self.reset()
if mode == "book":
return self.book_assign(data)
if mode == "hg":
return self.hg_assign(data)
if mode == "zs":
return self.zs_assign(data)
return None
def book_assign(self, data):
self.type = "book"
self.work_author = data["book_author"]
self.signature = data["book_signature"]
self.location = data["book_place"]
self.year = data["book_year"]
self.work_title = data["book_title"]
self.edition = data["book_edition"]
self.pages = data["book_pages"]
self.publisher = data["book_publisher"]
self.isbn = data["book_isbn"]
return self.makeResult()
def hg_assign(self, data):
self.type = "hg"
self.section_author = data["hg_author"]
self.work_author = data["hg_editor"]
self.year = data["hg_year"]
self.work_title = data["hg_title"]
self.publisher = data["hg_publisher"]
self.location = data["hg_place"]
self.edition = data["hg_edition"]
self.chapter_title = data["hg_chaptertitle"]
self.pages = data["hg_pages"]
self.signature = data["hg_signature"]
self.isbn = data["hg_isbn"]
return self.makeResult()
def zs_assign(self, data):
self.type = "zs"
self.section_author = data["zs_author"]
self.chapter_title = data["zs_chapter_title"]
self.location = data["zs_place"]
self.issue = data["zs_issue"]
self.pages = data["zs_pages"]
self.publisher = data["zs_publisher"]
self.isbn = data["zs_isbn"]
self.year = data["zs_year"]
self.signature = data["zs_signature"]
self.work_title = data["zs_title"]
return self.makeResult()
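For reference, a sketch of how DictToTable is driven (the "book_*" keys come from book_assign above; the values here are placeholders):

table = DictToTable()
row = table.transform({
    "mode": "book",
    "book_author": "Ullenboom, Christian",
    "book_signature": "CU 8500 K64",
    "book_place": "Bonn",
    "book_year": "2021",
    "book_title": "Java ist auch eine Insel",
    "book_edition": "15. Auflage",
    "book_pages": "1258",
    "book_publisher": "Rheinwerk",
    "book_isbn": "978-0-000-00000-0",
})
# row contains only the keys whose values are not None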


@@ -1,5 +1,3 @@
from typing import List
import regex
import requests
from bs4 import BeautifulSoup
@@ -33,11 +31,11 @@ class Catalogue:
response = requests.get(link, timeout=self.timeout)
return response.text
def get_book_links(self, searchterm: str) -> List[str]:
def get_book_links(self, searchterm: str) -> list[str]:
response = self.search_book(searchterm)
soup = BeautifulSoup(response, "html.parser")
links = soup.find_all("a", class_="title getFull")
res: List[str] = []
res: list[str] = []
for link in links:
res.append(BASE + link["href"]) # type: ignore
return res
@@ -186,7 +184,8 @@ class Catalogue:
class_="col-xs-12 col-md-7 col-lg-8 rds-dl-panel",
).get_text(strip=True)
book.isbn = isbn
# from div col-xs-12 col-md-5 col-lg-4 rds-dl-head RDS_SCOPE get pages (second div in this div)
# from div col-xs-12 col-md-5 col-lg-4 rds-dl-head
# RDS_SCOPE get pages (second div in this div)
pages = None
pages_el = soup.find("div", class_="RDS_SCOPE")
if pages_el:
@@ -206,14 +205,14 @@ class Catalogue:
# based on PPN, get title, people, edition, year, language, pages, isbn,
link = f"https://rds.ibs-bw.de/phfreiburg/opac/RDSIndexrecord/{ppn}"
result = self.search(link)
soup = BeautifulSoup(result, "html.parser")
BeautifulSoup(result, "html.parser")
def get_ppn(self, searchterm: str) -> str | None:
links = self.get_book_links(searchterm)
ppn = None
for link in links:
result = self.search(link)
soup = BeautifulSoup(result, "html.parser")
BeautifulSoup(result, "html.parser")
ppn = link.split("/")[-1]
if ppn and regex.match(r"^\d{8,10}[X\d]?$", ppn):
return ppn


@@ -1,6 +1,6 @@
import json
from dataclasses import dataclass, field
from typing import Any, Optional, Union
from typing import Any
import regex
@@ -12,9 +12,9 @@ class BookData:
signature: str | None = None
edition: str | None = None
link: str | None = None
isbn: Union[str, list[str], None] = field(default_factory=list[str])
isbn: str | list[str] | None = field(default_factory=list)
author: str | None = None
language: Union[str, list[str], None] = field(default_factory=list)
language: str | list[str] | None = field(default_factory=list)
publisher: str | None = None
place: str | None = None
year: int | None = None
@@ -23,9 +23,10 @@ class BookData:
in_apparat: bool | None = False
adis_idn: str | None = None
old_book: Any | None = None
media_type: str | None = None #
media_type: str | None = None
in_library: bool | None = None # whether the book is in the library or not
libraries: list[str] | None = field(default_factory=list)
medianr: int | None = None # media number
def __post_init__(self):
self.library_location = (
@@ -72,11 +73,10 @@ class BookData:
key: value for key, value in self.__dict__.items() if value is not None
}
# remove old_book from data_dict
if "old_book" in data_dict:
del data_dict["old_book"]
data_dict.pop("old_book", None)
return json.dumps(data_dict, ensure_ascii=False)
def from_dataclass(self, dataclass: Optional[Any]) -> None:
def from_dataclass(self, dataclass: Any | None) -> None:
if dataclass is None:
return
for key, value in dataclass.__dict__.items():
@@ -86,8 +86,7 @@ class BookData:
if isinstance(self.media_type, str):
if "Online" in self.pages:
return "eBook"
else:
return "Druckausgabe"
return "Druckausgabe"
return None
def from_string(self, data: str) -> "BookData":
@@ -114,7 +113,7 @@ class BookData:
return self
@property
def edition_number(self) -> Optional[int]:
def edition_number(self) -> int | None:
if self.edition is None:
return 0
match = regex.search(r"(\d+)", self.edition)


@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from typing import List, Optional
# --- MARC XML structures ---
@@ -20,14 +20,14 @@ class DataField:
tag: str
ind1: str = " "
ind2: str = " "
subfields: List[SubField] = field(default_factory=list)
subfields: list[SubField] = field(default_factory=list)
@dataclass
class MarcRecord:
leader: str
controlfields: List[ControlField] = field(default_factory=list)
datafields: List[DataField] = field(default_factory=list)
controlfields: list[ControlField] = field(default_factory=list)
datafields: list[DataField] = field(default_factory=list)
# --- SRU record wrapper ---
@@ -52,17 +52,17 @@ class EchoedSearchRequest:
class SearchRetrieveResponse:
version: str
numberOfRecords: int
records: List[Record] = field(default_factory=list)
echoedSearchRetrieveRequest: Optional[EchoedSearchRequest] = None
records: list[Record] = field(default_factory=list)
echoedSearchRetrieveRequest: EchoedSearchRequest | None = None
@dataclass
class FormattedResponse:
title: str
edition: Optional[str] = None
publisher: Optional[str] = None
year: Optional[str] = None
authors: List[str] = field(default_factory=list)
isbn: List[str] = field(default_factory=list)
ppn: Optional[str] = None
libraries: List[str] = field(default_factory=list)
edition: str | None = None
publisher: str | None = None
year: str | None = None
authors: list[str] = field(default_factory=list)
isbn: list[str] = field(default_factory=list)
ppn: str | None = None
libraries: list[str] = field(default_factory=list)


@@ -1,8 +1,9 @@
import re
import time
import xml.etree.ElementTree as ET
from collections.abc import Iterable
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
from typing import Any
import requests
from requests.adapters import HTTPAdapter
@@ -24,7 +25,7 @@ MARC = "http://www.loc.gov/MARC21/slim"
NS = {"zs": ZS, "marc": MARC}
def _text(elem: Optional[ET.Element]) -> str:
def _text(elem: ET.Element | None) -> str:
return (elem.text or "") if elem is not None else ""
@@ -36,32 +37,31 @@ def _req_text(parent: ET.Element, path: str) -> str:
def parse_marc_record(record_el: ET.Element) -> MarcRecord:
"""
record_el is the <marc:record> element (default ns MARC in your sample)
"""record_el is the <marc:record> element (default ns MARC in your sample)
"""
# leader
leader_text = _req_text(record_el, "marc:leader")
# controlfields
controlfields: List[ControlField] = []
controlfields: list[ControlField] = []
for cf in record_el.findall("marc:controlfield", NS):
tag = cf.get("tag", "").strip()
controlfields.append(ControlField(tag=tag, value=_text(cf)))
# datafields
datafields: List[DataField] = []
datafields: list[DataField] = []
for df in record_el.findall("marc:datafield", NS):
tag = df.get("tag", "").strip()
ind1 = df.get("ind1") or " "
ind2 = df.get("ind2") or " "
subfields: List[SubField] = []
subfields: list[SubField] = []
for sf in df.findall("marc:subfield", NS):
code = sf.get("code", "")
subfields.append(SubField(code=code, value=_text(sf)))
datafields.append(DataField(tag=tag, ind1=ind1, ind2=ind2, subfields=subfields))
return MarcRecord(
leader=leader_text, controlfields=controlfields, datafields=datafields
leader=leader_text, controlfields=controlfields, datafields=datafields,
)
@@ -92,7 +92,7 @@ def parse_record(zs_record_el: ET.Element) -> Record:
)
def parse_echoed_request(root: ET.Element) -> Optional[EchoedSearchRequest]:
def parse_echoed_request(root: ET.Element) -> EchoedSearchRequest | None:
el = root.find("zs:echoedSearchRetrieveRequest", NS)
if el is None:
return None
@@ -119,7 +119,7 @@ def parse_echoed_request(root: ET.Element) -> Optional[EchoedSearchRequest]:
def parse_search_retrieve_response(
xml_str: Union[str, bytes],
xml_str: str | bytes,
) -> SearchRetrieveResponse:
root = ET.fromstring(xml_str)
@@ -128,7 +128,7 @@ def parse_search_retrieve_response(
numberOfRecords = int(_req_text(root, "zs:numberOfRecords") or "0")
records_parent = root.find("zs:records", NS)
records: List[Record] = []
records: list[Record] = []
if records_parent is not None:
for r in records_parent.findall("zs:record", NS):
record = parse_record(r)
@@ -150,9 +150,9 @@ def parse_search_retrieve_response(
def iter_datafields(
rec: MarcRecord,
tag: Optional[str] = None,
ind1: Optional[str] = None,
ind2: Optional[str] = None,
tag: str | None = None,
ind1: str | None = None,
ind2: str | None = None,
) -> Iterable[DataField]:
"""Yield datafields, optionally filtered by tag/indicators."""
for df in rec.datafields:
@@ -170,11 +170,11 @@ def subfield_values(
tag: str,
code: str,
*,
ind1: Optional[str] = None,
ind2: Optional[str] = None,
) -> List[str]:
ind1: str | None = None,
ind2: str | None = None,
) -> list[str]:
"""All values for subfield `code` in every `tag` field (respecting indicators)."""
out: List[str] = []
out: list[str] = []
for df in iter_datafields(rec, tag, ind1, ind2):
out.extend(sf.value for sf in df.subfields if sf.code == code)
return out
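# e.g. subfield_values(rec, "020", "a") -> every ISBN in the record,
# as used by book_from_marc below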
@@ -185,10 +185,10 @@ def first_subfield_value(
tag: str,
code: str,
*,
ind1: Optional[str] = None,
ind2: Optional[str] = None,
default: Optional[str] = None,
) -> Optional[str]:
ind1: str | None = None,
ind2: str | None = None,
default: str | None = None,
) -> str | None:
"""First value for subfield `code` in `tag` (respecting indicators)."""
for df in iter_datafields(rec, tag, ind1, ind2):
for sf in df.subfields:
@@ -201,25 +201,24 @@ def find_datafields_with_subfields(
rec: MarcRecord,
tag: str,
*,
where_all: Optional[Dict[str, str]] = None,
where_any: Optional[Dict[str, str]] = None,
where_all: dict[str, str] | None = None,
where_any: dict[str, str] | None = None,
casefold: bool = False,
ind1: Optional[str] = None,
ind2: Optional[str] = None,
) -> List[DataField]:
"""
Return datafields of `tag` whose subfields match constraints:
ind1: str | None = None,
ind2: str | None = None,
) -> list[DataField]:
"""Return datafields of `tag` whose subfields match constraints:
- where_all: every (code -> exact value) must be present
- where_any: at least one (code -> exact value) present
Set `casefold=True` for case-insensitive comparison.
"""
where_all = where_all or {}
where_any = where_any or {}
matched: List[DataField] = []
matched: list[DataField] = []
for df in iter_datafields(rec, tag, ind1, ind2):
# Map code -> list of values (with optional casefold applied)
vals: Dict[str, List[str]] = {}
vals: dict[str, list[str]] = {}
for sf in df.subfields:
v = sf.value.casefold() if casefold else sf.value
vals.setdefault(sf.code, []).append(v)
@@ -246,8 +245,8 @@ def find_datafields_with_subfields(
def controlfield_value(
rec: MarcRecord, tag: str, default: Optional[str] = None
) -> Optional[str]:
rec: MarcRecord, tag: str, default: str | None = None,
) -> str | None:
"""Get the first controlfield value by tag (e.g., '001', '005')."""
for cf in rec.controlfields:
if cf.tag == tag:
@@ -256,8 +255,8 @@ def controlfield_value(
def datafields_value(
data: List[DataField], code: str, default: Optional[str] = None
) -> Optional[str]:
data: list[DataField], code: str, default: str | None = None,
) -> str | None:
"""Get the first value for a specific subfield code in a list of datafields."""
for df in data:
for sf in df.subfields:
@@ -267,8 +266,8 @@ def datafields_value(
def datafield_value(
df: DataField, code: str, default: Optional[str] = None
) -> Optional[str]:
df: DataField, code: str, default: str | None = None,
) -> str | None:
"""Get the first value for a specific subfield code in a datafield."""
for sf in df.subfields:
if sf.code == code:
@@ -276,9 +275,8 @@ def datafield_value(
return default
def _smart_join_title(a: str, b: Optional[str]) -> str:
"""
Join 245 $a and $b with MARC-style punctuation.
def _smart_join_title(a: str, b: str | None) -> str:
"""Join 245 $a and $b with MARC-style punctuation.
If $b is present, join with ' : ' unless either side already supplies punctuation.
"""
a = a.strip()
@@ -293,7 +291,7 @@ def _smart_join_title(a: str, b: Optional[str]) -> str:
def subfield_values_from_fields(
fields: Iterable[DataField],
code: str,
) -> List[str]:
) -> list[str]:
"""All subfield values with given `code` across a list of DataField."""
return [sf.value for df in fields for sf in df.subfields if sf.code == code]
@@ -301,8 +299,8 @@ def subfield_values_from_fields(
def first_subfield_value_from_fields(
fields: Iterable[DataField],
code: str,
default: Optional[str] = None,
) -> Optional[str]:
default: str | None = None,
) -> str | None:
"""First subfield value with given `code` across a list of DataField."""
for df in fields:
for sf in df.subfields:
@@ -314,12 +312,11 @@ def first_subfield_value_from_fields(
def subfield_value_pairs_from_fields(
fields: Iterable[DataField],
code: str,
) -> List[Tuple[DataField, str]]:
"""
Return (DataField, value) pairs for all subfields with `code`.
) -> list[tuple[DataField, str]]:
"""Return (DataField, value) pairs for all subfields with `code`.
Useful if you need to know which field a value came from.
"""
out: List[Tuple[DataField, str]] = []
out: list[tuple[DataField, str]] = []
for df in fields:
for sf in df.subfields:
if sf.code == code:
@@ -340,13 +337,13 @@ def book_from_marc(rec: MarcRecord, library_identifier: str) -> BookData:
# Signature = 924 where $9 == "Frei 129" → take that field's $g
frei_fields = find_datafields_with_subfields(
rec, "924", where_all={"9": "Frei 129"}
rec, "924", where_all={"9": "Frei 129"},
)
signature = first_subfield_value_from_fields(frei_fields, "g")
# Year = 264 $c (prefer ind2="1" publication; fallback to any 264)
year = first_subfield_value(rec, "264", "c", ind2="1") or first_subfield_value(
rec, "264", "c"
rec, "264", "c",
)
isbn = subfield_values(rec, "020", "a")
mediatype = first_subfield_value(rec, "338", "a")
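# Field mapping used here: 245 $a/$b -> title, 924 $g (where $9 == "Frei 129")
# -> signature, 264 $c (prefer ind2="1") -> year, 020 $a -> isbn, 338 $a -> media type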
@@ -378,10 +375,9 @@ RVK_ALLOWED = r"[A-Z0-9.\-\/]" # conservative char set typically seen in RVK no
def find_newer_edition(
swb_result: BookData, dnb_result: List[BookData]
) -> Optional[List[BookData]]:
"""
New edition if:
swb_result: BookData, dnb_result: list[BookData],
) -> list[BookData] | None:
"""New edition if:
- year > swb.year OR
- edition_number > swb.edition_number
@@ -393,7 +389,7 @@ def find_newer_edition(
edition_number desc, best-signature-match desc, has-signature desc).
"""
def norm_sig(s: Optional[str]) -> str:
def norm_sig(s: str | None) -> str:
if not s:
return ""
# normalize: lowercase, collapse whitespace, keep alnum + a few separators
@@ -427,7 +423,7 @@ def find_newer_edition(
swb_sig_norm = norm_sig(getattr(swb_result, "signature", None))
# 1) Filter to same-work AND newer
candidates: List[BookData] = []
candidates: list[BookData] = []
for b in dnb_result:
# Skip if both signatures exist and don't match (different work)
b_sig = getattr(b, "signature", None)
@@ -443,7 +439,7 @@ def find_newer_edition(
return None
# 2) Dedupe by PPN, preferring signature (and matching signature if possible)
by_ppn: dict[Optional[str], BookData] = {}
by_ppn: dict[str | None, BookData] = {}
for b in candidates:
key = getattr(b, "ppn", None)
prev = by_ppn.get(key)
@@ -477,7 +473,7 @@ def find_newer_edition(
class QueryTransformer:
def __init__(self, api_schema: Type[Enum], arguments: Union[Iterable[str], str]):
def __init__(self, api_schema: type[Enum], arguments: Iterable[str] | str):
self.api_schema = api_schema
if isinstance(arguments, str):
self.arguments = [arguments]
@@ -485,8 +481,8 @@ class QueryTransformer:
self.arguments = arguments
self.drop_empty = True
def transform(self) -> Dict[str, Any]:
arguments: List[str] = []
def transform(self) -> dict[str, Any]:
arguments: list[str] = []
schema = self.api_schema
for arg in self.arguments:
if "=" not in arg:
@@ -497,16 +493,16 @@ class QueryTransformer:
if hasattr(schema, key.upper()):
api_key = getattr(schema, key.upper()).value
if key.upper() == "AUTHOR" and hasattr(schema, "AUTHOR_SCHEMA"):
author_schema = getattr(schema, "AUTHOR_SCHEMA").value
author_schema = schema.AUTHOR_SCHEMA.value
if author_schema == "SpaceAfterComma":
value = value.replace(",", ", ")
elif author_schema == "NoSpaceAfterComma":
value = value.replace(", ", ",")
value = value.replace(" ", " ")
if key.upper() == "TITLE" and hasattr(
schema, "ENCLOSE_TITLE_IN_QUOTES"
schema, "ENCLOSE_TITLE_IN_QUOTES",
):
if getattr(schema, "ENCLOSE_TITLE_IN_QUOTES"):
if schema.ENCLOSE_TITLE_IN_QUOTES:
value = f'"{value}"'
arguments.append(f"{api_key}={value}")
@@ -519,10 +515,10 @@ class Api:
self,
site: str,
url: str,
prefix: Type[Enum],
prefix: type[Enum],
library_identifier: str,
notsupported_args: Optional[List[str]] = None,
replace: Optional[Dict[str, str]] = None,
notsupported_args: list[str] | None = None,
replace: dict[str, str] | None = None,
):
self.site = site
self.url = url
@@ -554,7 +550,7 @@ class Api:
# Best-effort cleanup
self.close()
def get(self, query_args: Union[Iterable[str], str]) -> List[Record]:
def get(self, query_args: Iterable[str] | str) -> list[Record]:
start_time = time.monotonic()
# if any query_arg ends with =, remove it
if isinstance(query_args, str):
@@ -566,7 +562,7 @@ class Api:
if not any(qa.startswith(na + "=") for na in self.notsupported_args)
]
query_args = QueryTransformer(
api_schema=self.prefix, arguments=query_args
api_schema=self.prefix, arguments=query_args,
).transform()
query = "+and+".join(query_args)
for old, new in self.replace.items():
@@ -579,12 +575,12 @@ class Api:
"Accept-Charset": "latin1,utf-8;q=0.7,*;q=0.3",
}
# Use persistent session, enforce 1 req/sec, and retry up to 5 times
last_error: Optional[Exception] = None
last_error: Exception | None = None
for attempt in range(1, self._max_retries + 1):
# Abort if overall timeout exceeded before starting attempt
if time.monotonic() - start_time > self._overall_timeout_seconds:
last_error = requests.exceptions.Timeout(
f"Overall timeout {self._overall_timeout_seconds}s exceeded before attempt {attempt}"
f"Overall timeout {self._overall_timeout_seconds}s exceeded before attempt {attempt}",
)
break
# Enforce rate limit relative to last request end
@@ -596,21 +592,20 @@ class Api:
try:
# Per-attempt read timeout capped at remaining overall budget (but at most 30s)
remaining = max(
0.0, self._overall_timeout_seconds - (time.monotonic() - start_time)
0.0, self._overall_timeout_seconds - (time.monotonic() - start_time),
)
read_timeout = min(30.0, remaining if remaining > 0 else 0.001)
resp = self._session.get(
url, headers=headers, timeout=(3.05, read_timeout)
url, headers=headers, timeout=(3.05, read_timeout),
)
self._last_request_time = time.monotonic()
if resp.status_code == 200:
# Parse using raw bytes (original behavior) to preserve encoding edge cases
sr = parse_search_retrieve_response(resp.content)
return sr.records
else:
last_error = Exception(
f"Error fetching data from {self.site}: HTTP {resp.status_code} (attempt {attempt}/{self._max_retries})"
)
last_error = Exception(
f"Error fetching data from {self.site}: HTTP {resp.status_code} (attempt {attempt}/{self._max_retries})",
)
except requests.exceptions.ReadTimeout as e:
last_error = e
except requests.exceptions.Timeout as e:
@@ -625,9 +620,9 @@ class Api:
# If we exit the loop, all attempts failed
raise last_error if last_error else Exception("Unknown request failure")
def getBooks(self, query_args: Union[Iterable[str], str]) -> List[BookData]:
def getBooks(self, query_args: Iterable[str] | str) -> list[BookData]:
try:
records: List[Record] = self.get(query_args)
records: list[Record] = self.get(query_args)
except requests.exceptions.ReadTimeout:
# Return a list with a single empty BookData object on read timeout
return [BookData()]
@@ -638,7 +633,7 @@ class Api:
# Propagate other errors (could also choose to return empty list)
raise
# Avoid printing on hot paths; rely on logger if needed
books: List[BookData] = []
books: list[BookData] = []
# extract title from query_args if present
title = None
for arg in query_args:

src/bibapi/webrequest.py (Normal file, 296 lines added)

@@ -0,0 +1,296 @@
import logging
from enum import Enum
from typing import Any
import requests
from bs4 import BeautifulSoup
# sleep_and_retry makes rate-limited calls wait for the window instead of raising
from ratelimit import limits, sleep_and_retry
from src.bibapi._transformers import (
RDS_AVAIL_DATA,
RDS_GENERIC_DATA,
ARRAYData,
BibTeXData,
COinSData,
RDSData,
RISData,
)
from src.bibapi.schemas.bookdata import BookData
log = logging.getLogger(__name__)
API_URL = "https://rds.ibs-bw.de/phfreiburg/opac/RDSIndexrecord/{}/"
PPN_URL = "https://rds.ibs-bw.de/phfreiburg/opac/RDSIndex/Search?type0%5B%5D=allfields&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=au&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ti&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ct&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=isn&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ta&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=co&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=py&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=pp&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=pu&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=si&lookfor0%5B%5D={}&join=AND&bool0%5B%5D=AND&type0%5B%5D=zr&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=cc&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND"
BASE = "https://rds.ibs-bw.de"
TITLE = "RDS_TITLE"
SIGNATURE = "RDS_SIGNATURE"
EDITION = "RDS_EDITION"
ISBN = "RDS_ISBN"
AUTHOR = "RDS_PERSON"
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 \
(HTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36",
"Accept-Language": "en-US, en;q=0.5",
}
RATE_LIMIT = 20
RATE_PERIOD = 30
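# sleep_and_retry + limits(calls=RATE_LIMIT, period=RATE_PERIOD) make each
# decorated method sleep once 20 calls have been made within a 30-second
# window, instead of raising ratelimit.RateLimitException.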
class TransformerType(Enum):
"""Enum for possible Transformer types."""
ARRAY = "ARRAY"
COinS = "COinS"
BibTeX = "BibTeX"
RIS = "RIS"
RDS = "RDS"
class WebRequest:
def __init__(self) -> None:
"""Request data from the web, and format it depending on the mode."""
self.apparat = None
self.use_any = False # use any book that matches the search term
self.signature = None
self.ppn = None
self.data = None
self.timeout = 5
self.public_ip = None
self.canrun()
def canrun(self) -> None:
"""Check if requests can be made."""
try:
# check the public IP to see if the requested data can be accessed
ip_response = requests.get("https://api.ipify.org", timeout=self.timeout)
ip_response.raise_for_status()
self.public_ip = ip_response.text
except requests.exceptions.RequestException as e:
raise ConnectionError("No internet connection") from e
if self.public_ip is None:
raise ConnectionError("No internet connection")
@property
def use_any_book(self):
"""Use any book that matches the search term"""
self.use_any = True
return self
def set_apparat(self, apparat: int) -> "WebRequest":
    # zero-pad to two digits to match the "Semesterapparat-NN" location labels
    self.apparat = f"{int(apparat):02d}"
    return self
def get_ppn(self, signature: str) -> "WebRequest":
self.signature = signature
if "+" in signature:
signature = signature.replace("+", "%2B")
if "doi.org" in signature:
signature = signature.split("/")[-1]
self.ppn = signature
return self
@sleep_and_retry
@limits(calls=RATE_LIMIT, period=RATE_PERIOD)
def search_book(self, searchterm: str) -> str:
response = requests.get(PPN_URL.format(searchterm), timeout=self.timeout)
return response.text
@sleep_and_retry
@limits(calls=RATE_LIMIT, period=RATE_PERIOD)
def search_ppn(self, ppn: str) -> str:
response = requests.get(API_URL.format(ppn), timeout=self.timeout)
return response.text
def get_book_links(self, searchterm: str) -> list[str]:
response: str = self.search_book(searchterm) # type:ignore
soup = BeautifulSoup(response, "html.parser")
links = soup.find_all("a", class_="title getFull")
res: list[str] = []
for link in links:
res.append(BASE + link["href"])
return res
@sleep_and_retry
@limits(calls=RATE_LIMIT, period=RATE_PERIOD)
def search(self, link: str) -> str | None:
try:
response = requests.get(link, timeout=self.timeout)
return response.text
except requests.exceptions.RequestException:
return None
def _collect_pre(self, soup: BeautifulSoup) -> list[str]:
    """Collect the text of every <pre> tag (the record export blocks)."""
    return [tag.text.strip() for tag in soup.find_all("pre")]
def get_data(self) -> list[str] | None:
    links = self.get_book_links(self.ppn)
    log.debug(f"Links: {links}")
    return_data: list[str] = []
    for link in links:
        result: str = self.search(link)  # type:ignore
        # search the result for divs with class col-xs-12 rds-dl RDS_LOCATION
        soup = BeautifulSoup(result, "html.parser")
        locations = soup.find_all("div", class_="col-xs-12 rds-dl RDS_LOCATION")
        for location in locations:
            if "1. OG Semesterapparat" in location.text:
                return self._collect_pre(soup)
            item_location = location.find(
                "div", class_="col-xs-12 col-md-7 col-lg-8 rds-dl-panel",
            ).text.strip()
            if self.use_any:
                pre_blocks = self._collect_pre(soup)
                if pre_blocks:
                    return pre_blocks
                raise ValueError("No <pre> tag found")
            if f"Semesterapparat-{self.apparat}" in item_location:
                return self._collect_pre(soup)
    return return_data
def get_data_elsa(self) -> list[str] | None:
    links = self.get_book_links(self.ppn)
    for link in links:
        result = self.search(link)
        # search the result for divs with class col-xs-12 rds-dl RDS_LOCATION
        soup = BeautifulSoup(result, "html.parser")
        locations = soup.find_all("div", class_="col-xs-12 rds-dl RDS_LOCATION")
        if locations:
            return self._collect_pre(soup)
    return None
class BibTextTransformer:
"""Transforms data from the web into a BibText format.
Valid Modes are ARRAY, COinS, BibTeX, RIS, RDS
Raises:
ValueError: Raised if mode is not in valid_modes
"""
valid_modes = [
TransformerType.ARRAY,
TransformerType.COinS,
TransformerType.BibTeX,
TransformerType.RIS,
TransformerType.RDS,
]
def __init__(self, mode: TransformerType = TransformerType.ARRAY) -> None:
    # validate before touching mode.value, so a bad argument raises ValueError
    if mode not in self.valid_modes:
        raise ValueError(f"Mode {mode} not valid")
    self.mode = mode.value
    self.field = None
    self.signature = None
    self.data = None
def use_signature(self, signature: str) -> "BibTextTransformer":
"""Use the exact signature to search for the book"""
self.signature = signature
return self
def get_data(self, data: list[str] | None = None) -> "BibTextTransformer":
    # identifying substrings for each export format
    idents = {
        "RIS": "TY -",
        "ARRAY": "[kid]",
        "COinS": "ctx_ver",
        "BibTeX": "@book",
        "RDS": "RDS ----------------------------------",
    }
    if data is None:
        self.data = None
        return self
    ident = idents[self.mode]
    for line in data:
        if ident in line:
            self.data = line
    return self
def return_data(
self, option: Any = None,
) -> BookData | None | RDS_GENERIC_DATA | RDS_AVAIL_DATA | dict[str, RDS_AVAIL_DATA | RDS_GENERIC_DATA]:
"""Return Data to caller.
Args:
option (str, optional): Option for RDS, as there are two payload types.
    Use "rds_availability" or "rds_data"; anything else returns a dict
    of both. Defaults to None.
Returns:
BookData: a dataclass containing data about the book
"""
if self.data is None:
return None
match self.mode:
case "ARRAY":
return ARRAYData(self.signature).transform(self.data)
case "COinS":
return COinSData().transform(self.data)
case "BibTeX":
return BibTeXData().transform(self.data)
case "RIS":
return RISData().transform(self.data)
case "RDS":
return RDSData().transform(self.data).return_data(option)
case _:
return None
def cover(isbn: str) -> bytes:
    test_url = f"https://www.buchhandel.de/cover/{isbn}/{isbn}-cover-m.jpg"
    data = requests.get(test_url, timeout=5, stream=True)
    return data.content
def get_content(soup, css_class):
return soup.find("div", class_=css_class).text.strip()
if __name__ == "__main__":
link = "CU 8500 K64"
data = WebRequest(71).get_ppn(link).get_data()
bib = BibTextTransformer("ARRAY").get_data().return_data()

tests/__init__.py (Normal file, 1 line added)

@@ -0,0 +1 @@
"""Tests for the package."""


@@ -1,108 +0,0 @@
from typing import Callable, Optional
import pytest
from bibapi import sru
@pytest.fixture
def sample_sru_xml() -> bytes:
"""Return a small SRU searchRetrieveResponse (MARCXML) as bytes.
Tests can use this raw bytes payload to simulate SRU responses.
"""
xml = b"""<?xml version="1.0" encoding="UTF-8"?>
<zs:searchRetrieveResponse xmlns:zs="http://www.loc.gov/zing/srw/"
xmlns:marc="http://www.loc.gov/MARC21/slim">
<zs:version>1.1</zs:version>
<zs:numberOfRecords>1</zs:numberOfRecords>
<zs:records>
<zs:record>
<zs:recordSchema>marcxml</zs:recordSchema>
<zs:recordPacking>xml</zs:recordPacking>
<zs:recordData>
<marc:record>
<marc:leader>-----nam a22</marc:leader>
<marc:controlfield tag="001">PPN123</marc:controlfield>
<marc:datafield tag="245" ind1=" " ind2=" ">
<marc:subfield code="a">Example Title</marc:subfield>
<marc:subfield code="b">Subtitle</marc:subfield>
</marc:datafield>
<marc:datafield tag="264" ind1=" " ind2="1">
<marc:subfield code="c">2001</marc:subfield>
<marc:subfield code="b">Example Publisher</marc:subfield>
</marc:datafield>
</marc:record>
</zs:recordData>
<zs:recordPosition>1</zs:recordPosition>
</zs:record>
</zs:records>
<zs:echoedSearchRetrieveRequest>
<zs:version>1.1</zs:version>
<zs:query>pica.tit=Example</zs:query>
<zs:maximumRecords>10</zs:maximumRecords>
<zs:recordPacking>xml</zs:recordPacking>
<zs:recordSchema>marcxml</zs:recordSchema>
</zs:echoedSearchRetrieveRequest>
</zs:searchRetrieveResponse>
"""
return xml
@pytest.fixture
def sru_api_factory(monkeypatch) -> Callable[[str, Optional[bytes]], sru.Api]:
"""Factory to create an `sru.Api` (or subclass) with network calls mocked.
Usage:
def test_x(sru_api_factory, sample_sru_xml):
api = sru_api_factory('SWB', sample_sru_xml)
books = api.getBooks(['pica.tit=Example'])
The fixture monkeypatches requests.Session.get on the Api instance to return
a fake Response with the provided bytes payload. If `response_bytes` is
None the real network call will be performed (not recommended in unit tests).
"""
def _make(site: str, response_bytes: Optional[bytes] = None) -> sru.Api:
mapping = {"SWB": sru.SWB, "DNB": sru.Api}
if site == "SWB":
api = sru.SWB()
elif site == "DNB":
# DNB Api class is the base Api configured differently in sru module
api = sru.Api(
sru.DNBData.NAME.value,
sru.DNBData.URL.value,
sru.DNBData.ARGSCHEMA.value,
)
else:
# allow custom site/url/prefix via tuple passed as site: (site, url, prefix)
if isinstance(site, tuple) and len(site) == 3:
api = sru.Api(site[0], site[1], site[2])
else:
raise ValueError("Unknown site for factory: %r" % (site,))
if response_bytes is not None:
class FakeResp:
status_code = 200
def __init__(self, content: bytes):
self.content = content
def fake_get(url, headers=None, timeout=None):
return FakeResp(response_bytes)
# Patch only this instance's session.get
monkeypatch.setattr(api._session, "get", fake_get)
return api
return _make
import pytest
@pytest.fixture
def sru_data():
return {"bib_id": 20735, "sigil": "Frei129"}


@@ -1,8 +1,7 @@
from src.bibapi.sru import SWB
from src.bibapi import SWB
def test_swb_schema():
def test_swb_schema() -> None:
result = SWB().getBooks(["pica.tit=Java ist auch eine Insel", "pica.bib=20735"])
assert len(result) == 1
assert result[0].title == "Java ist auch eine Insel"
assert