diff --git a/src/bibapi/__init__.py b/src/bibapi/__init__.py
index 7f3f30a..1978e7f 100644
--- a/src/bibapi/__init__.py
+++ b/src/bibapi/__init__.py
@@ -1,18 +1,25 @@
-from .schemas.api_types import *
+from .schemas.api_types import (
+    DNBSchema,
+    HBZSchema,
+    HebisSchema,
+    KOBVSchema,
+    OEVKSchema,
+    SWBSchema,
+)
 from .sru import Api as _Api
 
 __all__ = [
-    "SWB",
     "DNB",
-    "KOBV",
-    "HEBIS",
-    "OEVK",
     "HBZ",
+    "HEBIS",
+    "KOBV",
+    "OEVK",
+    "SWB",
 ]
 
 
 class SWB(_Api):
-    def __init__(self):
+    def __init__(self) -> None:
         self.site = SWBSchema.NAME.value
         self.url = SWBSchema.URL.value
         self.prefix = SWBSchema.ARGSCHEMA.value
@@ -21,7 +28,7 @@ class SWB(_Api):
 
 
 class DNB(_Api):
-    def __init__(self):
+    def __init__(self) -> None:
         self.site = DNBSchema.NAME.value
         self.url = DNBSchema.URL.value
         self.prefix = DNBSchema.ARGSCHEMA.value
@@ -29,7 +36,7 @@ class DNB(_Api):
 
 
 class KOBV(_Api):
-    def __init__(self):
+    def __init__(self) -> None:
         self.site = KOBVSchema.NAME.value
         self.url = KOBVSchema.URL.value
         self.prefix = KOBVSchema.ARGSCHEMA.value
@@ -38,7 +45,7 @@ class KOBV(_Api):
 
 
 class HEBIS(_Api):
-    def __init__(self):
+    def __init__(self) -> None:
         self.site = HebisSchema.NAME.value
         self.url = HebisSchema.URL.value
         self.prefix = HebisSchema.ARGSCHEMA.value
@@ -56,7 +63,7 @@ class HEBIS(_Api):
 
 
 class OEVK(_Api):
-    def __init__(self):
+    def __init__(self) -> None:
         self.site = OEVKSchema.NAME.value
         self.url = OEVKSchema.URL.value
         self.prefix = OEVKSchema.ARGSCHEMA.value
@@ -65,20 +72,22 @@ class OEVK(_Api):
 
 
 class HBZ(_Api):
-    """
-    Small wrapper of the SRU API used to retrieve data from the HBZ libraries
+    """Small wrapper of the SRU API used to retrieve data from the HBZ libraries.
 
     All fields are available [here](https://eu04.alma.exlibrisgroup.com/view/sru/49HBZ_NETWORK?version=1.2)
 
     Schema
    ------
-    HBZSchema:
+    HBZSchema: "HBZSchema"
+    query prefix: alma.
""" - def __init__(self): + def __init__(self) -> None: self.site = HBZSchema.NAME.value self.url = HBZSchema.URL.value self.prefix = HBZSchema.ARGSCHEMA.value self.library_identifier = HBZSchema.LIBRARY_NAME_LOCATION_FIELD.value super().__init__(self.site, self.url, self.prefix, self.library_identifier) + + diff --git a/src/bibapi/_transformers.py b/src/bibapi/_transformers.py new file mode 100644 index 0000000..6dc3558 --- /dev/null +++ b/src/bibapi/_transformers.py @@ -0,0 +1,502 @@ +from __future__ import annotations + +import json +import re +from dataclasses import dataclass +from dataclasses import field as dataclass_field +from typing import Any + +from src.bibapi.schemas.bookdata import BookData + + +@dataclass +class Item: + superlocation: str | None = dataclass_field(default_factory=str) + status: str | None = dataclass_field(default_factory=str) + availability: str | None = dataclass_field(default_factory=str) + notes: str | None = dataclass_field(default_factory=str) + limitation: str | None = dataclass_field(default_factory=str) + duedate: str | None = dataclass_field(default_factory=str) + id: str | None = dataclass_field(default_factory=str) + item_id: str | None = dataclass_field(default_factory=str) + ilslink: str | None = dataclass_field(default_factory=str) + number: int | None = dataclass_field(default_factory=int) + barcode: str | None = dataclass_field(default_factory=str) + reserve: str | None = dataclass_field(default_factory=str) + callnumber: str | None = dataclass_field(default_factory=str) + department: str | None = dataclass_field(default_factory=str) + locationhref: str | None = dataclass_field(default_factory=str) + location: str | None = dataclass_field(default_factory=str) + ktrl_nr: str | None = dataclass_field(default_factory=str) + + def from_dict(self, data: dict[str, Any]) -> Item: + """Import data from dict.""" + data = data["items"] + for entry in data: + for key, value in entry.items(): + setattr(self, key, value) + return self + + +@dataclass +class RDS_AVAIL_DATA: + """Class to store RDS availability data""" + + library_sigil: str = dataclass_field(default_factory=str) + items: List[Item] = dataclass_field(default_factory=list) + + def import_from_dict(self, data: str): + """Import data from dict""" + edata = json.loads(data) + # library sigil is first key + + self.library_sigil = str(list(edata.keys())[0]) + # get data from first key + edata = edata[self.library_sigil] + for location in edata: + item = Item(superlocation=location).from_dict(edata[location]) + + self.items.append(item) + return self + + +@dataclass +class RDS_DATA: + """Class to store RDS data""" + + RDS_SIGNATURE: str = dataclass_field(default_factory=str) + RDS_STATUS: str = dataclass_field(default_factory=str) + RDS_LOCATION: str = dataclass_field(default_factory=str) + RDS_URL: Any = dataclass_field(default_factory=str) + RDS_HINT: Any = dataclass_field(default_factory=str) + RDS_COMMENT: Any = dataclass_field(default_factory=str) + RDS_HOLDING: Any = dataclass_field(default_factory=str) + RDS_HOLDING_LEAK: Any = dataclass_field(default_factory=str) + RDS_INTERN: Any = dataclass_field(default_factory=str) + RDS_PROVENIENCE: Any = dataclass_field(default_factory=str) + RDS_LOCAL_NOTATION: str = dataclass_field(default_factory=str) + RDS_LEA: Any = dataclass_field(default_factory=str) + + def import_from_dict(self, data: dict) -> RDS_DATA: + """Import data from dict""" + for key, value in data.items(): + setattr(self, key, value) + return self + + +@dataclass +class 
+
+
+@dataclass
+class RDS_GENERIC_DATA:
+    LibrarySigil: str = dataclass_field(default_factory=str)
+    RDS_DATA: list[RDS_DATA] = dataclass_field(default_factory=list)
+
+    def import_from_dict(self, data: str) -> RDS_GENERIC_DATA:
+        """Import data from a JSON string keyed by library sigil."""
+        edata = json.loads(data)
+        # The library sigil is the first key.
+        self.LibrarySigil = str(list(edata.keys())[0])
+        # Get the entries stored under that key.
+        edata = edata[self.LibrarySigil]
+        for entry in edata:
+            # Each entry is a dict matching the structure of RDS_DATA.
+            rds_data = RDS_DATA().import_from_dict(entry)
+            self.RDS_DATA.append(rds_data)
+        return self
+
+
+class BaseStruct:
+    def __init__(self, **kwargs):
+        for key, value in kwargs.items():
+            setattr(self, key, value)
+
+
+class ARRAYData:
+    def __init__(self, signature=None) -> None:
+        self.signature = signature
+
+    def transform(self, data: str) -> BookData:
+        def _get_line(source: str, search: str) -> str:
+            try:
+                return (
+                    source.split(search)[1]
+                    .split("\n")[0]
+                    .strip()
+                    .replace("=>", "")
+                    .strip()
+                )
+            except Exception:
+                log.exception(f"ARRAYData.transform failed, no string {search}")
+                return ""
+
+        def _get_list_entry(source: str, search: str, entry: str) -> str:
+            try:
+                source = source.replace("\t", "").replace("\r", "")
+                source = source.split(search)[1].split(")")[0]
+                return _get_line(source, entry).replace("=>", "").strip()
+            except Exception:
+                return ""
+
+        def _get_isbn(source: str) -> list:
+            try:
+                isbn = source.split("[isbn]")[1].split(")")[0].strip()
+                isbn = isbn.split("(")[1]
+                isbns = isbn.split("=>")
+                ret = []
+                for raw in isbns:
+                    isb = raw.split("\n")[0].strip()
+                    if isb == "":
+                        continue
+                    if isb not in ret:
+                        ret.append(isb)
+                return ret
+            except Exception:
+                return []
+
+        def _get_signature(data):
+            try:
+                sig_data = (
+                    data.split("[loksatz]")[1]
+                    .split("[0] => ")[1]
+                    .split("\n")[0]
+                    .strip()
+                )
+                # The loksatz entry is a dict literal embedded in the dump.
+                signature_data = eval(sig_data)
+                return signature_data["signatur"]
+            except Exception:
+                return None
+
+        def _get_author(data):
+            try:
+                array = data.split("[au_display_short]")[1].split(")\n")[0].strip()
+            except Exception:
+                return ""
+            entries = array.split("\n")
+            authors = []
+            hg_present = False
+            verf_present = False
+            lines = []
+            for entry in entries:
+                if "=>" in entry:
+                    line = entry.split("=>")[1].strip()
+                    if "[HerausgeberIn]" in line:
+                        hg_present = True
+                    if "[VerfasserIn]" in line:
+                        verf_present = True
+                    lines.append(line)
+            for line in lines:
+                if hg_present and verf_present:
+                    if "[HerausgeberIn]" in line:
+                        authors.append(line.split("[")[0].strip())
+                elif verf_present:
+                    if "[VerfasserIn]" in line:
+                        authors.append(line.split("[")[0].strip())
+            return ";".join(authors)
+
+        def _get_title(data):
+            titledata = None
+            title = ""
+            if "[ti_long]" in data:
+                titledata = data.split("[ti_long]")[1].split(")\n")[0].strip()
+                title = titledata.split("=>")[1].strip().split("/")[0].strip()
+            if "[ti_long_f]" in data:
+                titledata = data.split("[ti_long_f]")[1].split(")\n")[0].strip()
+                title = titledata.split("=>")[1].strip().split("/")[0].strip()
+            return title
+
+        def _get_adis_idn(data, signature):
+            loksatz_match = re.search(
+                r"\[loksatz\] => Array\s*\((.*?)\)", data, re.DOTALL,
+            )
+            if loksatz_match:
+                loksatz_content = loksatz_match.group(1)
+                # Extract the dict literals within the loksatz section.
+                json_objects = re.findall(r"{.*?}", loksatz_content, re.DOTALL)
+                for obj in json_objects:
+                    entry = eval(obj)
+                    if entry["signatur"] == signature:
+                        return entry["adis_idn"]
+            return None
+
+        def _get_in_apparat(data):
+            loksatz_match = re.search(
+                r"\[loksatz\] => Array\s*\((.*?)\)", data, re.DOTALL,
+            )
+            if loksatz_match:
+                loksatz_content = loksatz_match.group(1)
+                # Extract the dict literals within the loksatz section.
+                json_objects = re.findall(r"{.*?}", loksatz_content, re.DOTALL)
+                for obj in json_objects:
+                    entry = eval(obj)
+                    if entry["ausleihcode"] == "R" and entry["standort"] == "40":
+                        return True
+            return False
+
+        ppn = _get_line(data, "[kid]")
+        title = _get_title(data).strip()
+        author = _get_author(data)
+        edition = _get_list_entry(data, "[ausgabe]", "[0]").replace(",", "")
+        link = f"https://rds.ibs-bw.de/phfreiburg/link?kid={ppn}"
+        isbn = _get_isbn(data)
+        language = _get_list_entry(data, "[la_facet]", "[0]")
+        publisher = _get_list_entry(data, "[pu]", "[0]")
+        year = _get_list_entry(data, "[py_display]", "[0]")
+        pages = _get_list_entry(data, "[umfang]", "[0]").split(":")[0].strip()
+        signature = (
+            self.signature if self.signature is not None else _get_signature(data)
+        )
+        place = _get_list_entry(data, "[pp]", "[0]")
+        adis_idn = _get_adis_idn(data, signature=signature)
+        in_apparat = _get_in_apparat(data)
+        return BookData(
+            ppn=ppn,
+            title=title,
+            author=author,
+            edition=edition,
+            link=link,
+            isbn=isbn,
+            language=language,
+            publisher=publisher,
+            year=year,
+            pages=pages,
+            signature=signature,
+            place=place,
+            adis_idn=adis_idn,
+            in_apparat=in_apparat,
+        )
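+
+
+# The ARRAY export is a PHP print_r()-style dump. A minimal sketch of the
+# input ARRAYData.transform() expects (hypothetical, heavily truncated):
+#
+#     dump = "[kid] => 123456789\n[ti_long] => A Title / Someone\n"
+#     book = ARRAYData().transform(dump)  # BookData(ppn="123456789", ...)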
+
+
+class COinSData:
+    def __init__(self) -> None:
+        pass
+
+    def transform(self, data: str) -> BookData:
+        def _get_line(source: str, search: str) -> str:
+            try:
+                data = source.split(f"{search}=")[1]
+                return data.split("rft")[0].strip() if "rft" in data else data
+            except Exception:
+                return ""
+
+        return BookData(
+            ppn=_get_line(data, "rft_id").split("=")[1],
+            title=_get_line(data, "rft.btitle"),
+            author=f"{_get_line(data, 'rft.aulast')}, {_get_line(data, 'rft.aufirst')}",
+            edition=_get_line(data, "rft.edition"),
+            link=_get_line(data, "rft_id"),
+            isbn=_get_line(data, "rft.isbn"),
+            publisher=_get_line(data, "rft.pub"),
+            year=_get_line(data, "rft.date"),
+            pages=_get_line(data, "rft.tpages").split(":")[0].strip(),
+        )
+
+
+class RISData:
+    def __init__(self) -> None:
+        pass
+
+    def transform(self, data: str) -> BookData:
+        def _get_line(source: str, search: str) -> str:
+            try:
+                data = source.split(f"{search}  - ")[1]
+                return data.split("\n")[0].strip() if "\n" in data else data
+            except Exception:
+                return ""
+
+        return BookData(
+            ppn=_get_line(data, "DP").split("=")[1],
+            title=_get_line(data, "TI"),
+            signature=_get_line(data, "CN"),
+            edition=_get_line(data, "ET").replace(",", ""),
+            link=_get_line(data, "DP"),
+            isbn=_get_line(data, "SN").split(","),
+            author=_get_line(data, "AU").split("[")[0].strip(),
+            language=_get_line(data, "LA"),
+            publisher=_get_line(data, "PB"),
+            year=_get_line(data, "PY"),
+            pages=_get_line(data, "SP"),
+        )
+
+
+class BibTeXData:
+    def __init__(self) -> None:
+        pass
+
+    def transform(self, data: str) -> BookData:
+        def _get_line(source: str, search: str) -> str:
+            try:
+                return (
+                    source.split(search)[1]
+                    .split("\n")[0]
+                    .strip()
+                    .split("=")[1]
+                    .strip()
+                    .replace("{", "")
+                    .replace("}", "")
+                    .replace(",", "")
+                    .replace("[", "")
+                    .replace("];", "")
+                )
+            except Exception:
+                return ""
+
+        return BookData(
+            ppn=None,
+            title=_get_line(data, "title"),
+            signature=_get_line(data, "bestand"),
+            edition=_get_line(data, "edition"),
+            isbn=_get_line(data, "isbn"),
+            author=";".join(_get_line(data, "author").split(" and ")),
+            language=_get_line(data, "language"),
+            publisher=_get_line(data, "publisher"),
+            year=_get_line(data, "year"),
+            pages=_get_line(data, "pages"),
+        )
+
+
+class RDSData:
+    def __init__(self) -> None:
+        self.retlist = []
+
+    def transform(self, data: str):
+        def __get_raw_data(data: str) -> list:
+            # Split the export into the two raw entries that feed the
+            # dataclasses above.
+            data = data.split("RDS ----------------------------------")[1]
+            edata = data.strip()
+            edata = edata.split("\n", 9)[9]
+            edata = edata.split("\n")[1:]
+            entry_1 = edata[0]
+            entry_2 = "".join(edata[1:])
+            return [entry_1, entry_2]
+
+        ret_data = __get_raw_data(data)
+        # ret_data[0] holds the availability JSON, ret_data[1] the RDS fields.
+        self.rds_data = RDS_GENERIC_DATA().import_from_dict(ret_data[1])
+        self.rds_availability = RDS_AVAIL_DATA().import_from_dict(ret_data[0])
+        self.retlist.append(self.rds_availability)
+        self.retlist.append(self.rds_data)
+        return self
+
+    def return_data(self, option=None):
+        if option == "rds_availability":
+            return self.retlist[0]
+        if option == "rds_data":
+            return self.retlist[1]
+        return {"rds_availability": self.retlist[0], "rds_data": self.retlist[1]}
+
+
+class DictToTable:
+    def __init__(self) -> None:
+        self.work_author = None
+        self.section_author = None
+        self.year = None
+        self.edition = None
+        self.work_title = None
+        self.chapter_title = None
+        self.location = None
+        self.publisher = None
+        self.signature = None
+        self.type = None
+        self.pages = None
+        self.issue = None
+        self.isbn = None
+
+    def makeResult(self):
+        data = {
+            "work_author": self.work_author,
+            "section_author": self.section_author,
+            "year": self.year,
+            "edition": self.edition,
+            "work_title": self.work_title,
+            "chapter_title": self.chapter_title,
+            "location": self.location,
+            "publisher": self.publisher,
+            "signature": self.signature,
+            "issue": self.issue,
+            "pages": self.pages,
+            "isbn": self.isbn,
+            "type": self.type,
+        }
+        data = {k: v for k, v in data.items() if v is not None}
+        return data
+
+    def reset(self):
+        for key in self.__dict__:
+            setattr(self, key, None)
+
+    def transform(self, data: dict):
+        mode = data["mode"]
+        self.reset()
+        if mode == "book":
+            return self.book_assign(data)
+        if mode == "hg":
+            return self.hg_assign(data)
+        if mode == "zs":
+            return self.zs_assign(data)
+        return None
+
+    def book_assign(self, data):
+        self.type = "book"
+        self.work_author = data["book_author"]
+        self.signature = data["book_signature"]
+        self.location = data["book_place"]
+        self.year = data["book_year"]
+        self.work_title = data["book_title"]
+        self.edition = data["book_edition"]
+        self.pages = data["book_pages"]
+        self.publisher = data["book_publisher"]
+        self.isbn = data["book_isbn"]
+        return self.makeResult()
+
+    def hg_assign(self, data):
+        self.type = "hg"
+        self.section_author = data["hg_author"]
+        self.work_author = data["hg_editor"]
+        self.year = data["hg_year"]
+        self.work_title = data["hg_title"]
+        self.publisher = data["hg_publisher"]
+        self.location = data["hg_place"]
+        self.edition = data["hg_edition"]
+        self.chapter_title = data["hg_chaptertitle"]
+        self.pages = data["hg_pages"]
+        self.signature = data["hg_signature"]
+        self.isbn = data["hg_isbn"]
+        return self.makeResult()
+
+    def zs_assign(self, data):
+        self.type = "zs"
+        self.section_author = data["zs_author"]
+        self.chapter_title = data["zs_chapter_title"]
+        self.location = data["zs_place"]
+        self.issue = data["zs_issue"]
+        self.pages = data["zs_pages"]
+        self.publisher = data["zs_publisher"]
+        self.isbn = data["zs_isbn"]
+        self.year = data["zs_year"]
+        self.signature = data["zs_signature"]
+        self.work_title = data["zs_title"]
+        return self.makeResult()
diff --git a/src/bibapi/catalogue.py b/src/bibapi/catalogue.py
index c9babe6..b5759d9 100644
--- a/src/bibapi/catalogue.py
+++ b/src/bibapi/catalogue.py
@@ -1,5 +1,3 @@
-from typing import List
-
 import regex
 import requests
 from bs4 import BeautifulSoup
@@ -33,11 +31,11 @@ class Catalogue:
         response = requests.get(link, timeout=self.timeout)
         return response.text
 
-    def get_book_links(self, searchterm: str) -> List[str]:
+    def get_book_links(self, searchterm: str) -> list[str]:
         response = self.search_book(searchterm)
         soup = BeautifulSoup(response, "html.parser")
         links = soup.find_all("a", class_="title getFull")
-        res: List[str] = []
+        res: list[str] = []
         for link in links:
             res.append(BASE + link["href"])  # type: ignore
         return res
@@ -186,7 +184,8 @@ class Catalogue:
             class_="col-xs-12 col-md-7 col-lg-8 rds-dl-panel",
         ).get_text(strip=True)
         book.isbn = isbn
-        # from div col-xs-12 col-md-5 col-lg-4 rds-dl-head RDS_SCOPE get pages (second div in this div)
+        # from div col-xs-12 col-md-5 col-lg-4 rds-dl-head
+        # RDS_SCOPE get pages (second div in this div)
         pages = None
         pages_el = soup.find("div", class_="RDS_SCOPE")
         if pages_el:
@@ -206,14 +205,14 @@ class Catalogue:
         # based on PPN, get title, people, edition, year, language, pages, isbn,
         link = f"https://rds.ibs-bw.de/phfreiburg/opac/RDSIndexrecord/{ppn}"
         result = self.search(link)
-        soup = BeautifulSoup(result, "html.parser")
+        BeautifulSoup(result, "html.parser")
 
     def get_ppn(self, searchterm: str) -> str | None:
         links = self.get_book_links(searchterm)
         ppn = None
         for link in links:
             result = self.search(link)
-            soup = BeautifulSoup(result, "html.parser")
+            BeautifulSoup(result, "html.parser")
             ppn = link.split("/")[-1]
         if ppn and regex.match(r"^\d{8,10}[X\d]?$", ppn):
             return ppn
diff --git a/src/bibapi/schemas/bookdata.py b/src/bibapi/schemas/bookdata.py
index f2d4ba9..ad1df96 100644
--- a/src/bibapi/schemas/bookdata.py
+++ b/src/bibapi/schemas/bookdata.py
@@ -1,6 +1,6 @@
 import json
 from dataclasses import dataclass, field
-from typing import Any, Optional, Union
+from typing import Any
 
 import regex
 
@@ -12,9 +12,9 @@ class BookData:
     signature: str | None = None
     edition: str | None = None
     link: str | None = None
-    isbn: Union[str, list[str], None] = field(default_factory=list[str])
+    isbn: str | list[str] | None = field(default_factory=list[str])
     author: str | None = None
-    language: Union[str, list[str], None] = field(default_factory=list)
+    language: str | list[str] | None = field(default_factory=list)
     publisher: str | None = None
     place: str | None = None
     year: int | None = None
@@ -23,9 +23,10 @@ class BookData:
     in_apparat: bool | None = False
     adis_idn: str | None = None
     old_book: Any | None = None
-    media_type: str | None = None  #
+    media_type: str | None = None
     in_library: bool | None = None  # whether the book is in the library or not
     libraries: list[str] | None = field(default_factory=list)
+    medianr: int | None = None  # media number
 
     def __post_init__(self):
         self.library_location = (
@@ -72,11 +73,10 @@ class BookData:
             key: value for key, value in self.__dict__.items() if value is not None
         }
         # remove old_book from data_dict
-        if "old_book" in data_dict:
-            del data_dict["old_book"]
+        data_dict.pop("old_book", None)
         return json.dumps(data_dict, ensure_ascii=False)
 
-    def from_dataclass(self, dataclass: Optional[Any]) -> None:
+    def from_dataclass(self, dataclass: Any | None) -> None:
         if dataclass is None:
             return
         for key, value in dataclass.__dict__.items():
@@ -86,8 +86,7 @@ class BookData:
         if isinstance(self.media_type, str):
             if "Online" in self.pages:
                 return "eBook"
-            else:
-                return "Druckausgabe"
+            return "Druckausgabe"
         return None
 
     def from_string(self, data: str) -> "BookData":
@@ -114,7 +113,7 @@ class BookData:
         return self
 
     @property
-    def edition_number(self) -> Optional[int]:
+    def edition_number(self) -> int | None:
         if self.edition is None:
             return 0
         match = regex.search(r"(\d+)", self.edition)
diff --git a/src/bibapi/schemas/marcxml.py b/src/bibapi/schemas/marcxml.py
index e966aa2..89cbf15 100644
--- a/src/bibapi/schemas/marcxml.py
+++ b/src/bibapi/schemas/marcxml.py
@@ -1,5 +1,4 @@
 from dataclasses import dataclass, field
-from typing import List, Optional
 
 
 # --- MARC XML structures ---
@@ -20,14 +20,14 @@ class DataField:
     tag: str
     ind1: str = " "
     ind2: str = " "
-    subfields: List[SubField] = field(default_factory=list)
+    subfields: list[SubField] = field(default_factory=list)
 
 
 @dataclass
 class MarcRecord:
     leader: str
-    controlfields: List[ControlField] = field(default_factory=list)
-    datafields: List[DataField] = field(default_factory=list)
+    controlfields: list[ControlField] = field(default_factory=list)
+    datafields: list[DataField] = field(default_factory=list)
 
 
 # --- SRU record wrapper ---
@@ -52,17 +52,17 @@ class EchoedSearchRequest:
 class SearchRetrieveResponse:
     version: str
     numberOfRecords: int
-    records: List[Record] = field(default_factory=list)
-    echoedSearchRetrieveRequest: Optional[EchoedSearchRequest] = None
+    records: list[Record] = field(default_factory=list)
+    echoedSearchRetrieveRequest: EchoedSearchRequest | None = None
 
 
 @dataclass
 class FormattedResponse:
     title: str
-    edition: Optional[str] = None
-    publisher: Optional[str] = None
-    year: Optional[str] = None
-    authors: List[str] = field(default_factory=list)
-    isbn: List[str] = field(default_factory=list)
-    ppn: Optional[str] = None
-    libraries: List[str] = field(default_factory=list)
+    edition: str | None = None
+    publisher: str | None = None
+    year: str | None = None
+    authors: list[str] = field(default_factory=list)
+    isbn: list[str] = field(default_factory=list)
+    ppn: str | None = None
+    libraries: list[str] = field(default_factory=list)
diff --git a/src/bibapi/sru.py b/src/bibapi/sru.py
index d84fffd..9a2d29d 100644
--- a/src/bibapi/sru.py
+++ b/src/bibapi/sru.py
@@ -1,8 +1,9 @@
 import re
 import time
 import xml.etree.ElementTree as ET
+from collections.abc import Iterable
 from enum import Enum
-from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
+from typing import Any
 
 import requests
 from requests.adapters import HTTPAdapter
@@ -24,7 +25,7 @@ MARC = "http://www.loc.gov/MARC21/slim"
 NS = {"zs": ZS, "marc": MARC}
 
 
-def _text(elem: Optional[ET.Element]) -> str:
+def _text(elem: ET.Element | None) -> str:
     return (elem.text or "") if elem is not None else ""
 
 
@@ -36,32 +37,31 @@ def _req_text(parent: ET.Element, path: str) -> str:
 
 
 def parse_marc_record(record_el: ET.Element) -> MarcRecord:
""" - record_el is the element (default ns MARC in your sample) + """record_el is the element (default ns MARC in your sample) """ # leader leader_text = _req_text(record_el, "marc:leader") # controlfields - controlfields: List[ControlField] = [] + controlfields: list[ControlField] = [] for cf in record_el.findall("marc:controlfield", NS): tag = cf.get("tag", "").strip() controlfields.append(ControlField(tag=tag, value=_text(cf))) # datafields - datafields: List[DataField] = [] + datafields: list[DataField] = [] for df in record_el.findall("marc:datafield", NS): tag = df.get("tag", "").strip() ind1 = df.get("ind1") or " " ind2 = df.get("ind2") or " " - subfields: List[SubField] = [] + subfields: list[SubField] = [] for sf in df.findall("marc:subfield", NS): code = sf.get("code", "") subfields.append(SubField(code=code, value=_text(sf))) datafields.append(DataField(tag=tag, ind1=ind1, ind2=ind2, subfields=subfields)) return MarcRecord( - leader=leader_text, controlfields=controlfields, datafields=datafields + leader=leader_text, controlfields=controlfields, datafields=datafields, ) @@ -92,7 +92,7 @@ def parse_record(zs_record_el: ET.Element) -> Record: ) -def parse_echoed_request(root: ET.Element) -> Optional[EchoedSearchRequest]: +def parse_echoed_request(root: ET.Element) -> EchoedSearchRequest | None: el = root.find("zs:echoedSearchRetrieveRequest", NS) if el is None: return None @@ -119,7 +119,7 @@ def parse_echoed_request(root: ET.Element) -> Optional[EchoedSearchRequest]: def parse_search_retrieve_response( - xml_str: Union[str, bytes], + xml_str: str | bytes, ) -> SearchRetrieveResponse: root = ET.fromstring(xml_str) @@ -128,7 +128,7 @@ def parse_search_retrieve_response( numberOfRecords = int(_req_text(root, "zs:numberOfRecords") or "0") records_parent = root.find("zs:records", NS) - records: List[Record] = [] + records: list[Record] = [] if records_parent is not None: for r in records_parent.findall("zs:record", NS): record = parse_record(r) @@ -150,9 +150,9 @@ def parse_search_retrieve_response( def iter_datafields( rec: MarcRecord, - tag: Optional[str] = None, - ind1: Optional[str] = None, - ind2: Optional[str] = None, + tag: str | None = None, + ind1: str | None = None, + ind2: str | None = None, ) -> Iterable[DataField]: """Yield datafields, optionally filtered by tag/indicators.""" for df in rec.datafields: @@ -170,11 +170,11 @@ def subfield_values( tag: str, code: str, *, - ind1: Optional[str] = None, - ind2: Optional[str] = None, -) -> List[str]: + ind1: str | None = None, + ind2: str | None = None, +) -> list[str]: """All values for subfield `code` in every `tag` field (respecting indicators).""" - out: List[str] = [] + out: list[str] = [] for df in iter_datafields(rec, tag, ind1, ind2): out.extend(sf.value for sf in df.subfields if sf.code == code) return out @@ -185,10 +185,10 @@ def first_subfield_value( tag: str, code: str, *, - ind1: Optional[str] = None, - ind2: Optional[str] = None, - default: Optional[str] = None, -) -> Optional[str]: + ind1: str | None = None, + ind2: str | None = None, + default: str | None = None, +) -> str | None: """First value for subfield `code` in `tag` (respecting indicators).""" for df in iter_datafields(rec, tag, ind1, ind2): for sf in df.subfields: @@ -201,25 +201,24 @@ def find_datafields_with_subfields( rec: MarcRecord, tag: str, *, - where_all: Optional[Dict[str, str]] = None, - where_any: Optional[Dict[str, str]] = None, + where_all: dict[str, str] | None = None, + where_any: dict[str, str] | None = None, casefold: bool = False, - 
-    ind1: Optional[str] = None,
-    ind2: Optional[str] = None,
-) -> List[DataField]:
-    """
-    Return datafields of `tag` whose subfields match constraints:
+    ind1: str | None = None,
+    ind2: str | None = None,
+) -> list[DataField]:
+    """Return datafields of `tag` whose subfields match constraints:
       - where_all: every (code -> exact value) must be present
      - where_any: at least one (code -> exact value) present
 
     Set `casefold=True` for case-insensitive comparison.
     """
     where_all = where_all or {}
     where_any = where_any or {}
-    matched: List[DataField] = []
+    matched: list[DataField] = []
     for df in iter_datafields(rec, tag, ind1, ind2):
         # Map code -> list of values (with optional casefold applied)
-        vals: Dict[str, List[str]] = {}
+        vals: dict[str, list[str]] = {}
         for sf in df.subfields:
             v = sf.value.casefold() if casefold else sf.value
             vals.setdefault(sf.code, []).append(v)
@@ -246,8 +245,8 @@ def find_datafields_with_subfields(
 
 
 def controlfield_value(
-    rec: MarcRecord, tag: str, default: Optional[str] = None
-) -> Optional[str]:
+    rec: MarcRecord, tag: str, default: str | None = None,
+) -> str | None:
     """Get the first controlfield value by tag (e.g., '001', '005')."""
     for cf in rec.controlfields:
         if cf.tag == tag:
@@ -256,8 +255,8 @@ def controlfield_value(
 
 
 def datafields_value(
-    data: List[DataField], code: str, default: Optional[str] = None
-) -> Optional[str]:
+    data: list[DataField], code: str, default: str | None = None,
+) -> str | None:
     """Get the first value for a specific subfield code in a list of datafields."""
     for df in data:
         for sf in df.subfields:
@@ -267,8 +266,8 @@ def datafields_value(
 
 
 def datafield_value(
-    df: DataField, code: str, default: Optional[str] = None
-) -> Optional[str]:
+    df: DataField, code: str, default: str | None = None,
+) -> str | None:
     """Get the first value for a specific subfield code in a datafield."""
     for sf in df.subfields:
         if sf.code == code:
@@ -276,9 +275,8 @@ def datafield_value(
     return default
 
 
-def _smart_join_title(a: str, b: Optional[str]) -> str:
-    """
-    Join 245 $a and $b with MARC-style punctuation.
+def _smart_join_title(a: str, b: str | None) -> str:
+    """Join 245 $a and $b with MARC-style punctuation.
     If $b is present, join with ' : ' unless either side already supplies punctuation.
     """
     a = a.strip()
@@ -293,7 +291,7 @@ def _smart_join_title(a: str, b: str | None) -> str:
 def subfield_values_from_fields(
     fields: Iterable[DataField],
     code: str,
-) -> List[str]:
+) -> list[str]:
     """All subfield values with given `code` across a list of DataField."""
     return [sf.value for df in fields for sf in df.subfields if sf.code == code]
 
@@ -301,8 +299,8 @@ def subfield_values_from_fields(
 def first_subfield_value_from_fields(
     fields: Iterable[DataField],
     code: str,
-    default: Optional[str] = None,
-) -> Optional[str]:
+    default: str | None = None,
+) -> str | None:
     """First subfield value with given `code` across a list of DataField."""
     for df in fields:
         for sf in df.subfields:
@@ -314,12 +312,11 @@ def first_subfield_value_from_fields(
 def subfield_value_pairs_from_fields(
     fields: Iterable[DataField],
     code: str,
-) -> List[Tuple[DataField, str]]:
-    """
-    Return (DataField, value) pairs for all subfields with `code`.
+) -> list[tuple[DataField, str]]:
+    """Return (DataField, value) pairs for all subfields with `code`.
     Useful if you need to know which field a value came from.
""" - out: List[Tuple[DataField, str]] = [] + out: list[tuple[DataField, str]] = [] for df in fields: for sf in df.subfields: if sf.code == code: @@ -340,13 +337,13 @@ def book_from_marc(rec: MarcRecord, library_identifier: str) -> BookData: # Signature = 924 where $9 == "Frei 129" → take that field's $g frei_fields = find_datafields_with_subfields( - rec, "924", where_all={"9": "Frei 129"} + rec, "924", where_all={"9": "Frei 129"}, ) signature = first_subfield_value_from_fields(frei_fields, "g") # Year = 264 $c (prefer ind2="1" publication; fallback to any 264) year = first_subfield_value(rec, "264", "c", ind2="1") or first_subfield_value( - rec, "264", "c" + rec, "264", "c", ) isbn = subfield_values(rec, "020", "a") mediatype = first_subfield_value(rec, "338", "a") @@ -378,10 +375,9 @@ RVK_ALLOWED = r"[A-Z0-9.\-\/]" # conservative char set typically seen in RVK no def find_newer_edition( - swb_result: BookData, dnb_result: List[BookData] -) -> Optional[List[BookData]]: - """ - New edition if: + swb_result: BookData, dnb_result: list[BookData], +) -> list[BookData] | None: + """New edition if: - year > swb.year OR - edition_number > swb.edition_number @@ -393,7 +389,7 @@ def find_newer_edition( edition_number desc, best-signature-match desc, has-signature desc). """ - def norm_sig(s: Optional[str]) -> str: + def norm_sig(s: str | None) -> str: if not s: return "" # normalize: lowercase, collapse whitespace, keep alnum + a few separators @@ -427,7 +423,7 @@ def find_newer_edition( swb_sig_norm = norm_sig(getattr(swb_result, "signature", None)) # 1) Filter to same-work AND newer - candidates: List[BookData] = [] + candidates: list[BookData] = [] for b in dnb_result: # Skip if both signatures exist and don't match (different work) b_sig = getattr(b, "signature", None) @@ -443,7 +439,7 @@ def find_newer_edition( return None # 2) Dedupe by PPN, preferring signature (and matching signature if possible) - by_ppn: dict[Optional[str], BookData] = {} + by_ppn: dict[str | None, BookData] = {} for b in candidates: key = getattr(b, "ppn", None) prev = by_ppn.get(key) @@ -477,7 +473,7 @@ def find_newer_edition( class QueryTransformer: - def __init__(self, api_schema: Type[Enum], arguments: Union[Iterable[str], str]): + def __init__(self, api_schema: type[Enum], arguments: Iterable[str] | str): self.api_schema = api_schema if isinstance(arguments, str): self.arguments = [arguments] @@ -485,8 +481,8 @@ class QueryTransformer: self.arguments = arguments self.drop_empty = True - def transform(self) -> Dict[str, Any]: - arguments: List[str] = [] + def transform(self) -> dict[str, Any]: + arguments: list[str] = [] schema = self.api_schema for arg in self.arguments: if "=" not in arg: @@ -497,16 +493,16 @@ class QueryTransformer: if hasattr(schema, key.upper()): api_key = getattr(schema, key.upper()).value if key.upper() == "AUTHOR" and hasattr(schema, "AUTHOR_SCHEMA"): - author_schema = getattr(schema, "AUTHOR_SCHEMA").value + author_schema = schema.AUTHOR_SCHEMA.value if author_schema == "SpaceAfterComma": value = value.replace(",", ", ") elif author_schema == "NoSpaceAfterComma": value = value.replace(", ", ",") value = value.replace(" ", " ") if key.upper() == "TITLE" and hasattr( - schema, "ENCLOSE_TITLE_IN_QUOTES" + schema, "ENCLOSE_TITLE_IN_QUOTES", ): - if getattr(schema, "ENCLOSE_TITLE_IN_QUOTES"): + if schema.ENCLOSE_TITLE_IN_QUOTES: value = f'"{value}"' arguments.append(f"{api_key}={value}") @@ -519,10 +515,10 @@ class Api: self, site: str, url: str, - prefix: Type[Enum], + prefix: type[Enum], 
         library_identifier: str,
-        notsupported_args: Optional[List[str]] = None,
-        replace: Optional[Dict[str, str]] = None,
+        notsupported_args: list[str] | None = None,
+        replace: dict[str, str] | None = None,
     ):
         self.site = site
         self.url = url
@@ -554,7 +550,7 @@ class Api:
         # Best-effort cleanup
         self.close()
 
-    def get(self, query_args: Union[Iterable[str], str]) -> List[Record]:
+    def get(self, query_args: Iterable[str] | str) -> list[Record]:
         start_time = time.monotonic()
         # if any query_arg ends with =, remove it
         if isinstance(query_args, str):
@@ -566,7 +562,7 @@ class Api:
             if not any(qa.startswith(na + "=") for na in self.notsupported_args)
         ]
         query_args = QueryTransformer(
-            api_schema=self.prefix, arguments=query_args
+            api_schema=self.prefix, arguments=query_args,
         ).transform()
         query = "+and+".join(query_args)
         for old, new in self.replace.items():
@@ -579,12 +575,12 @@ class Api:
             "Accept-Charset": "latin1,utf-8;q=0.7,*;q=0.3",
         }
         # Use persistent session, enforce 1 req/sec, and retry up to 5 times
-        last_error: Optional[Exception] = None
+        last_error: Exception | None = None
         for attempt in range(1, self._max_retries + 1):
             # Abort if overall timeout exceeded before starting attempt
             if time.monotonic() - start_time > self._overall_timeout_seconds:
                 last_error = requests.exceptions.Timeout(
-                    f"Overall timeout {self._overall_timeout_seconds}s exceeded before attempt {attempt}"
+                    f"Overall timeout {self._overall_timeout_seconds}s exceeded before attempt {attempt}",
                 )
                 break
             # Enforce rate limit relative to last request end
@@ -596,21 +592,20 @@ class Api:
             try:
                 # Per-attempt read timeout capped at remaining overall budget (but at most 30s)
                 remaining = max(
-                    0.0, self._overall_timeout_seconds - (time.monotonic() - start_time)
+                    0.0, self._overall_timeout_seconds - (time.monotonic() - start_time),
                 )
                 read_timeout = min(30.0, remaining if remaining > 0 else 0.001)
                 resp = self._session.get(
-                    url, headers=headers, timeout=(3.05, read_timeout)
+                    url, headers=headers, timeout=(3.05, read_timeout),
                 )
                 self._last_request_time = time.monotonic()
                 if resp.status_code == 200:
                     # Parse using raw bytes (original behavior) to preserve encoding edge cases
                     sr = parse_search_retrieve_response(resp.content)
                     return sr.records
-                else:
-                    last_error = Exception(
-                        f"Error fetching data from {self.site}: HTTP {resp.status_code} (attempt {attempt}/{self._max_retries})"
-                    )
+                last_error = Exception(
+                    f"Error fetching data from {self.site}: HTTP {resp.status_code} (attempt {attempt}/{self._max_retries})",
+                )
             except requests.exceptions.ReadTimeout as e:
                 last_error = e
             except requests.exceptions.Timeout as e:
                 last_error = e
@@ -625,9 +620,9 @@ class Api:
         # If we exit the loop, all attempts failed
         raise last_error if last_error else Exception("Unknown request failure")
 
-    def getBooks(self, query_args: Union[Iterable[str], str]) -> List[BookData]:
+    def getBooks(self, query_args: Iterable[str] | str) -> list[BookData]:
         try:
-            records: List[Record] = self.get(query_args)
+            records: list[Record] = self.get(query_args)
         except requests.exceptions.ReadTimeout:
             # Return a list with a single empty BookData object on read timeout
             return [BookData()]
@@ -638,7 +633,7 @@ class Api:
             # Propagate other errors (could also choose to return empty list)
             raise
         # Avoid printing on hot paths; rely on logger if needed
-        books: List[BookData] = []
+        books: list[BookData] = []
         # extract title from query_args if present
         title = None
         for arg in query_args:
diff --git a/src/bibapi/webrequest.py b/src/bibapi/webrequest.py
new file mode 100644
index 0000000..867cc6e
--- /dev/null
+++ b/src/bibapi/webrequest.py
@@ -0,0 +1,296 @@
+import logging
+from enum import Enum
+from typing import Any
+
+import requests
+from bs4 import BeautifulSoup
+
+# import sleep_and_retry decorator to retry requests
+from ratelimit import limits, sleep_and_retry
+
+from src.bibapi._transformers import (
+    RDS_AVAIL_DATA,
+    RDS_GENERIC_DATA,
+    ARRAYData,
+    BibTeXData,
+    COinSData,
+    RDSData,
+    RISData,
+)
+from src.bibapi.schemas.bookdata import BookData
+
+# Module logger used by WebRequest.get_data below.
+log = logging.getLogger(__name__)
+
+API_URL = "https://rds.ibs-bw.de/phfreiburg/opac/RDSIndexrecord/{}/"
+PPN_URL = "https://rds.ibs-bw.de/phfreiburg/opac/RDSIndex/Search?type0%5B%5D=allfields&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=au&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ti&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ct&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=isn&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ta&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=co&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=py&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=pp&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=pu&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=si&lookfor0%5B%5D={}&join=AND&bool0%5B%5D=AND&type0%5B%5D=zr&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=cc&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND"
+BASE = "https://rds.ibs-bw.de"
+TITLE = "RDS_TITLE"
+SIGNATURE = "RDS_SIGNATURE"
+EDITION = "RDS_EDITION"
+ISBN = "RDS_ISBN"
+AUTHOR = "RDS_PERSON"
+
+HEADERS = {
+    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 \
+        (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36",
+    "Accept-Language": "en-US, en;q=0.5",
+}
+RATE_LIMIT = 20
+RATE_PERIOD = 30
+
+
+class TransformerType(Enum):
+    """Enum for possible Transformer types."""
+
+    ARRAY = "ARRAY"
+    COinS = "COinS"
+    BibTeX = "BibTeX"
+    RIS = "RIS"
+    RDS = "RDS"
+
+
+class WebRequest:
+    def __init__(self) -> None:
+        """Request data from the web, and format it depending on the mode."""
+        self.apparat = None
+        self.use_any = False  # use any book that matches the search term
+        self.signature = None
+        self.ppn = None
+        self.data = None
+        self.timeout = 5
+        self.public_ip = None
+
+        self.canrun()
+
+    def canrun(self) -> None:
+        """Check if requests can be made."""
+        try:
+            # Check the public IP to see if the requested data can be accessed.
+            ip_response = requests.get("https://api.ipify.org", timeout=self.timeout)
+            ip_response.raise_for_status()
+            self.public_ip = ip_response.text
+        except requests.exceptions.RequestException as e:
+            raise ConnectionError("No internet connection") from e
+
+        if self.public_ip is None:
+            raise ConnectionError("No internet connection")
+
+    @property
+    def use_any_book(self):
+        """Use any book that matches the search term."""
+        self.use_any = True
+        return self
+
+    def set_apparat(self, apparat: int) -> "WebRequest":
+        self.apparat = apparat
+        if int(self.apparat) < 10:
+            self.apparat = f"0{self.apparat}"
+        return self
+
+    def get_ppn(self, signature: str) -> "WebRequest":
+        self.signature = signature
+        if "+" in signature:
+            signature = signature.replace("+", "%2B")
+        if "doi.org" in signature:
+            signature = signature.split("/")[-1]
+        self.ppn = signature
+        return self
+
+    @sleep_and_retry
+    @limits(calls=RATE_LIMIT, period=RATE_PERIOD)
+    def search_book(self, searchterm: str) -> str:
+        response = requests.get(PPN_URL.format(searchterm), timeout=self.timeout)
+        return response.text
+
+    @sleep_and_retry
+    @limits(calls=RATE_LIMIT, period=RATE_PERIOD)
+    def search_ppn(self, ppn: str) -> str:
+        response = requests.get(API_URL.format(ppn), timeout=self.timeout)
+        return response.text
+
+    def get_book_links(self, searchterm: str) -> list[str]:
+        response: str = self.search_book(searchterm)  # type:ignore
+        soup = BeautifulSoup(response, "html.parser")
+        links = soup.find_all("a", class_="title getFull")
+        res: list[str] = []
+        for link in links:
+            res.append(BASE + link["href"])
+        return res
+
+    @sleep_and_retry
+    @limits(calls=RATE_LIMIT, period=RATE_PERIOD)
+    def search(self, link: str) -> str | None:
+        try:
+            response = requests.get(link, timeout=self.timeout)
+            return response.text
+        except requests.exceptions.RequestException:
+            return None
+
+    def get_data(self) -> list[str] | None:
+        links = self.get_book_links(self.ppn)
+        log.debug(f"Links: {links}")
+        return_data: list[str] = []
+        for link in links:
+            result: str = self.search(link)  # type:ignore
+            # In the result, search for the class "col-xs-12 rds-dl RDS_LOCATION";
+            # if found, return the text of the <pre> export blocks.
+            soup = BeautifulSoup(result, "html.parser")
+            locations = soup.find_all("div", class_="col-xs-12 rds-dl RDS_LOCATION")
+            if locations:
+                for location in locations:
+                    if "1. OG Semesterapparat" in location.text:
+                        pre_tag = soup.find_all("pre")
+                        return_data = []
+                        if pre_tag:
+                            for tag in pre_tag:
+                                data = tag.text.strip()
+                                return_data.append(data)
+                            return return_data
+                        return return_data
+                    item_location = location.find(
+                        "div", class_="col-xs-12 col-md-7 col-lg-8 rds-dl-panel",
+                    ).text.strip()
+                    if self.use_any:
+                        pre_tag = soup.find_all("pre")
+                        if pre_tag:
+                            for tag in pre_tag:
+                                data = tag.text.strip()
+                                return_data.append(data)
+                            return return_data
+                        raise ValueError("No <pre> tag found")
+                    if f"Semesterapparat-{self.apparat}" in item_location:
+                        pre_tag = soup.find_all("pre")
+                        return_data = []
+                        if pre_tag:
+                            for tag in pre_tag:
+                                data = tag.text.strip()
+                                return_data.append(data)
+                            return return_data
+                        return return_data
+
+        return return_data
+
+    def get_data_elsa(self) -> list[str] | None:
+        links = self.get_book_links(self.ppn)
+        for link in links:
+            result = self.search(link)
+            # in result search for class col-xs-12 rds-dl RDS_LOCATION
+            # if found, return text of href
+            soup = BeautifulSoup(result, "html.parser")
+            locations = soup.find_all("div", class_="col-xs-12 rds-dl RDS_LOCATION")
+            if locations:
+                for _ in locations:
+                    pre_tag = soup.find_all("pre")
+                    return_data = []
+                    if pre_tag:
+                        for tag in pre_tag:
+                            data = tag.text.strip()
+                            return_data.append(data)
+                        return return_data
+        return None
+
+
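+# Usage sketch (hypothetical shelf mark; WebRequest() takes no constructor
+# arguments, so the Semesterapparat number is set with set_apparat() and the
+# search term with get_ppn()):
+#
+#     raw = WebRequest().set_apparat(7).get_ppn("CU 8500 K64").get_data()
+#     # raw holds the text of the <pre> export blocks from the RDS detail page
+
+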
+class BibTextTransformer:
+    """Transform data exported from the web into structured book data.
+
+    Valid modes are ARRAY, COinS, BibTeX, RIS and RDS.
+
+    Raises:
+        ValueError: Raised if mode is not in valid_modes.
+    """
+
+    valid_modes = [
+        TransformerType.ARRAY,
+        TransformerType.COinS,
+        TransformerType.BibTeX,
+        TransformerType.RIS,
+        TransformerType.RDS,
+    ]
+
+    def __init__(self, mode: TransformerType = TransformerType.ARRAY) -> None:
+        # Validate the mode before using it.
+        if mode not in self.valid_modes:
+            raise ValueError(f"Mode {mode} not valid")
+        self.mode = mode.value
+        self.field = None
+        self.signature = None
+        self.data = None
+
+    def use_signature(self, signature: str) -> "BibTextTransformer":
+        """Use the exact signature to search for the book"""
+        self.signature = signature
+        return self
+
+    def get_data(self, data: list[str] | None = None) -> "BibTextTransformer":
+        # Identifying substring for each supported export format.
+        idents = {
+            "RIS": "TY  -",
+            "ARRAY": "[kid]",
+            "COinS": "ctx_ver",
+            "BibTeX": "@book",
+            "RDS": "RDS ---------------------------------- ",
+        }
+
+        if data is None:
+            self.data = None
+            return self
+
+        # Keep the last block that matches the identifier for the current mode.
+        for line in data:
+            if idents[self.mode] in line:
+                self.data = line
+        return self
+
+    def return_data(
+        self, option: Any = None,
+    ) -> BookData | RDS_GENERIC_DATA | RDS_AVAIL_DATA | dict[str, RDS_AVAIL_DATA | RDS_GENERIC_DATA] | None:
+        """Return data to the caller.
+
+        Args:
+            option (str, optional): Option for RDS, as there are two payload
+                types. Use "rds_availability" or "rds_data"; anything else
+                returns a dict with both responses. Defaults to None.
+
+        Returns:
+            BookData: a dataclass containing data about the book.
+
+        """
+        if self.data is None:
+            return None
+        match self.mode:
+            case "ARRAY":
+                return ARRAYData(self.signature).transform(self.data)
+            case "COinS":
+                return COinSData().transform(self.data)
+            case "BibTeX":
+                return BibTeXData().transform(self.data)
+            case "RIS":
+                return RISData().transform(self.data)
+            case "RDS":
+                return RDSData().transform(self.data).return_data(option)
+            case _:
+                return None
+
+
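+# Example flow (assumed; the `option` argument only matters for the RDS mode,
+# which carries two payloads):
+#
+#     blocks = WebRequest().get_ppn("CU 8500 K64").get_data()
+#     book = BibTextTransformer(TransformerType.RIS).get_data(blocks).return_data()
+#     avail = (
+#         BibTextTransformer(TransformerType.RDS)
+#         .get_data(blocks)
+#         .return_data(option="rds_availability")
+#     )
+
+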
+def cover(isbn: str) -> bytes:
+    test_url = f"https://www.buchhandel.de/cover/{isbn}/{isbn}-cover-m.jpg"
+    # A timeout keeps a stalled connection from hanging the caller.
+    data = requests.get(test_url, stream=True, timeout=10)
+    return data.content
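+
+
+# Example (hypothetical ISBN):
+#
+#     jpg_bytes = cover("9783161484100")  # raw JPEG bytes from buchhandel.de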
+
+
+def get_content(soup: BeautifulSoup, css_class: str) -> str:
+    return soup.find("div", class_=css_class).text.strip()
+
+
+if __name__ == "__main__":
+    link = "CU 8500 K64"
+    data = WebRequest(71).get_ppn(link).get_data()
+    bib = BibTextTransformer("ARRAY").get_data().return_data()
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..005c965
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1 @@
+"""Tests for the package."""
diff --git a/tests/conftest.py b/tests/conftest.py
deleted file mode 100644
index aed3b38..0000000
--- a/tests/conftest.py
+++ /dev/null
@@ -1,108 +0,0 @@
-from typing import Callable, Optional
-
-import pytest
-
-from bibapi import sru
-
-
-@pytest.fixture
-def sample_sru_xml() -> bytes:
-    """Return a small SRU searchRetrieveResponse (MARCXML) as bytes.
-
-    Tests can use this raw bytes payload to simulate SRU responses.
-    """
-    xml = b"""
-    <zs:searchRetrieveResponse xmlns:zs="http://www.loc.gov/zing/srw/">
-      <zs:version>1.1</zs:version>
-      <zs:numberOfRecords>1</zs:numberOfRecords>
-      <zs:records>
-        <zs:record>
-          <zs:recordSchema>marcxml</zs:recordSchema>
-          <zs:recordPacking>xml</zs:recordPacking>
-          <zs:recordData>
-            <record xmlns="http://www.loc.gov/MARC21/slim">
-              <leader>-----nam a22</leader>
-              <controlfield tag="001">PPN123</controlfield>
-              <datafield tag="245" ind1=" " ind2=" ">
-                <subfield code="a">Example Title</subfield>
-                <subfield code="b">Subtitle</subfield>
-              </datafield>
-              <datafield tag="264" ind1=" " ind2="1">
-                <subfield code="c">2001</subfield>
-                <subfield code="b">Example Publisher</subfield>
-              </datafield>
-            </record>
-          </zs:recordData>
-          <zs:recordPosition>1</zs:recordPosition>
-        </zs:record>
-      </zs:records>
-      <zs:echoedSearchRetrieveRequest>
-        <zs:version>1.1</zs:version>
-        <zs:query>pica.tit=Example</zs:query>
-        <zs:maximumRecords>10</zs:maximumRecords>
-        <zs:recordPacking>xml</zs:recordPacking>
-        <zs:recordSchema>marcxml</zs:recordSchema>
-      </zs:echoedSearchRetrieveRequest>
-    </zs:searchRetrieveResponse>
-    """
-    return xml
-
-
-@pytest.fixture
-def sru_api_factory(monkeypatch) -> Callable[[str, Optional[bytes]], sru.Api]:
-    """Factory to create an `sru.Api` (or subclass) with network calls mocked.
-
-    Usage:
-      def test_x(sru_api_factory, sample_sru_xml):
-          api = sru_api_factory('SWB', sample_sru_xml)
-          books = api.getBooks(['pica.tit=Example'])
-
-    The fixture monkeypatches requests.Session.get on the Api instance to return
-    a fake Response with the provided bytes payload. If `response_bytes` is
-    None the real network call will be performed (not recommended in unit tests).
-    """
-
-    def _make(site: str, response_bytes: Optional[bytes] = None) -> sru.Api:
-        mapping = {"SWB": sru.SWB, "DNB": sru.Api}
-        if site == "SWB":
-            api = sru.SWB()
-        elif site == "DNB":
-            # DNB Api class is the base Api configured differently in sru module
-            api = sru.Api(
-                sru.DNBData.NAME.value,
-                sru.DNBData.URL.value,
-                sru.DNBData.ARGSCHEMA.value,
-            )
-        else:
-            # allow custom site/url/prefix via tuple passed as site: (site, url, prefix)
-            if isinstance(site, tuple) and len(site) == 3:
-                api = sru.Api(site[0], site[1], site[2])
-            else:
-                raise ValueError("Unknown site for factory: %r" % (site,))
-
-        if response_bytes is not None:
-
-            class FakeResp:
-                status_code = 200
-
-                def __init__(self, content: bytes):
-                    self.content = content
-
-            def fake_get(url, headers=None, timeout=None):
-                return FakeResp(response_bytes)
-
-            # Patch only this instance's session.get
-            monkeypatch.setattr(api._session, "get", fake_get)
-
-        return api
-
-    return _make
-
-
-import pytest
-
-
-@pytest.fixture
-def sru_data():
-    return {"bib_id": 20735, "sigil": "Frei129"}
diff --git a/tests/test_sru.py b/tests/test_sru.py
index 682441f..e966636 100644
--- a/tests/test_sru.py
+++ b/tests/test_sru.py
@@ -1,8 +1,7 @@
-from src.bibapi.sru import SWB
+from src.bibapi import SWB
 
 
-def test_swb_schema():
+def test_swb_schema() -> None:
     result = SWB().getBooks(["pica.tit=Java ist auch eine Insel", "pica.bib=20735"])
     assert len(result) == 1
     assert result[0].title == "Java ist auch eine Insel"
-    assert