Refactor code structure for improved readability and maintainability

This commit is contained in:
2025-11-07 11:28:13 +01:00
parent dab9d08297
commit ac10501131
12 changed files with 1766 additions and 286 deletions

View File

@@ -1,2 +1,71 @@
def hello() -> str:
    return "Hello from bibapi!"
from .sru import Api as _Api
from .sru import DNBSchema, HBZSchema, HebisSchema, KOBVSchema, OEVKSchema, SWBSchema

__all__ = [
    "SWB",
    "DNB",
    "KOBV",
    "HEBIS",
    "OEVK",
    "HBZ",
]


class SWB(_Api):
    def __init__(self):
        self.site = SWBSchema.NAME.value
        self.url = SWBSchema.URL.value
        self.prefix = SWBSchema.ARGSCHEMA.value
        super().__init__(self.site, self.url, self.prefix)


class DNB(_Api):
    def __init__(self):
        self.site = DNBSchema.NAME.value
        self.url = DNBSchema.URL.value
        self.prefix = DNBSchema.ARGSCHEMA.value
        super().__init__(self.site, self.url, self.prefix)


class KOBV(_Api):
    def __init__(self):
        self.site = KOBVSchema.NAME.value
        self.url = KOBVSchema.URL.value
        self.prefix = KOBVSchema.ARGSCHEMA.value
        super().__init__(self.site, self.url, self.prefix)


class HEBIS(_Api):
    def __init__(self):
        self.site = HebisSchema.NAME.value
        self.url = HebisSchema.URL.value
        self.prefix = HebisSchema.ARGSCHEMA.value
        self.replace = HebisSchema.REPLACE.value
        super().__init__(self.site, self.url, self.prefix, self.replace)


class OEVK(_Api):
    def __init__(self):
        self.site = OEVKSchema.NAME.value
        self.url = OEVKSchema.URL.value
        self.prefix = OEVKSchema.ARGSCHEMA.value
        super().__init__(self.site, self.url, self.prefix)


class HBZ(_Api):
"""
Small wrapper of the SRU API used to retrieve data from the HBZ libraries
All fields are available [here](https://eu04.alma.exlibrisgroup.com/view/sru/49HBZ_NETWORK?version=1.2)
Schema
------
HBZSchema: <HBZSchema>
query prefix: alma.
"""
def __init__(self):
self.site = HBZSchema.NAME.value
self.url = HBZSchema.URL.value
self.prefix = HBZSchema.ARGSCHEMA.value
super().__init__(self.site, self.url, self.prefix)
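
# Usage sketch (illustrative, not part of this commit): each wrapper is a thin
# SRU client. Assuming the base Api class exposes getBooks(query_args), as the
# KVK aggregator in sru.py uses it, a lookup might read:
#
#     from bibapi import DNB
#
#     dnb = DNB()
#     books = dnb.getBooks("title=Faust")  # hypothetical query string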

View File

@@ -0,0 +1,80 @@
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin

BASE_URL = "https://kvk.bibliothek.kit.edu"
SEARCH_ENDPOINT = "/hylib-bin/kvk/nph-kvk2.cgi"

# Parameters: search only in K10plus (GBV+SWB); do not embed full titles or
# restrict the search to digital-only material.
BASE_PARAMS = {
    'digitalOnly': '0',
    'embedFulltitle': '0',
    'newTab': '0',
    'mask': 'kvk-redesign',
    'kataloge': 'K10PLUS',
    'ACT': 'SRCHA',
}
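
# Illustrative (assumed) final request URL once search_kvk below adds TI/AU,
# as encoded by requests:
#   https://kvk.bibliothek.kit.edu/hylib-bin/kvk/nph-kvk2.cgi?digitalOnly=0&embedFulltitle=0&newTab=0&mask=kvk-redesign&kataloge=K10PLUS&ACT=SRCHA&TI=Java+ist+auch+eine+Insel&AU=Ullenboom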


def search_kvk(title: str, author: str, max_results: int = 10) -> list[str]:
    """Perform a title/author search in KVK and return full-record URLs (viewtitel links).

    Args:
        title: Exact title of the book.
        author: Author name.
        max_results: Number of search results to process.

    Returns:
        A list of absolute URLs to the full records in the K10plus catalogue.
    """
    params = BASE_PARAMS.copy()
    params.update({'TI': title, 'AU': author})
    resp = requests.get(urljoin(BASE_URL, SEARCH_ENDPOINT), params=params, timeout=30)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.content, "html.parser")
    # KVK embeds links to the full records in anchor tags whose href contains 'view-titel'
    links = []
    for a in soup.find_all('a', href=True):
        href = a['href']
        if 'view-titel' in href:
            links.append(urljoin(BASE_URL, href))
            if len(links) >= max_results:
                break
    return links


def get_holdings(record_url: str) -> list[str]:
    """Extract the names of holding libraries from a K10plus record page."""
    r = requests.get(record_url, timeout=30)
    r.raise_for_status()
    soup = BeautifulSoup(r.content, "html.parser")
    holdings = []
    # Find the heading "Besitzende Bibliotheken" (holding libraries), then
    # collect the anchor tags that follow it.
    heading = soup.find(lambda tag: tag.name in ['h2', 'h3', 'strong'] and 'Besitzende Bibliotheken' in tag.get_text())
    if heading:
        # The list of libraries is usually an unordered list or a series of <a> tags after the heading
        for a in heading.find_all_next('a', href=True):
            txt = a.get_text(strip=True)
            if txt:
                holdings.append(txt)
    return holdings


def main():
    title = "Java ist auch eine Insel"
    author = "Ullenboom"
    record_links = search_kvk(title, author, max_results=10)
    for url in record_links:
        print(f"Record: {url}")
        libs = get_holdings(url)
        if libs:
            print("  Holding libraries:")
            for lib in libs:
                print(f"    - {lib}")
        else:
            print("  No holdings found or unable to parse.")
        print()


if __name__ == '__main__':
    main()

src/bibapi/kvkparser.py · 359 additions · Normal file
View File

@@ -0,0 +1,359 @@
import time
import re
import json
from dataclasses import dataclass
from typing import Optional

from bs4 import BeautifulSoup
from playwright.sync_api import (
    Browser,
    BrowserContext,
    Page,
    Playwright,
    sync_playwright,
)

KVK_BASE_URL = "https://kvk.bibliothek.kit.edu/?kataloge=K10PLUS&kataloge=BVB&kataloge=NRW&kataloge=HEBIS&kataloge=HEBIS_RETRO&kataloge=KOBV_SOLR&kataloge=DDB&kataloge=STABI_BERLIN&kataloge=TIB&kataloge=OEVK_GBV&digitalOnly=0&embedFulltitle=0&newTab=0"
KVK_FORMATABLE_URL = "https://kvk.bibliothek.kit.edu/hylib-bin/kvk/nph-kvk2.cgi?maske=kvk-redesign&lang=de&title=KIT-Bibliothek%3A+Karlsruher+Virtueller+Katalog+KVK+%3A+Ergebnisanzeige&head=asset%2Fhtml%2Fhead.html&header=asset%2Fhtml%2Fheader.html&spacer=asset%2Fhtml%2Fspacer.html&footer=asset%2Fhtml%2Ffooter.html&input-charset=utf-8&ALL={freetext}&TI={title}&AU={person}&CI={ppl_s}&ST={sw}&PY={year}&SB={isbn}&SS={issn}&PU={publisher}&kataloge=K10PLUS&kataloge=BVB&kataloge=NRW&kataloge=HEBIS&kataloge=HEBIS_RETRO&kataloge=KOBV_SOLR&kataloge=DDB&kataloge=STABI_BERLIN&kataloge=TIB&kataloge=OEVK_GBV&ref=direct&client-js=no"


@dataclass
class Result:
    title: str
    link: str


@dataclass
class KVKResult:
    source_katalogue: str
    results: list[Result]


@dataclass
class KVKResults:
    results: list["KVKResult"]


class KVKParser:
    """Playwright-backed KVK parser.

    Usage:
        p = KVKParser()
        p.start()  # starts Playwright and the browser
        results = p.search(title="My Title")
        p.stop()

    The instance exposes the live browser/context and helper methods so tests
    can reuse the browser.
    """

    def __init__(
        self,
        headless: bool = False,
        user_agent: Optional[str] = None,
        storage_state: Optional[str] = None,
    ):
        self._playwright: Optional[Playwright] = None
        self._browser: Optional[Browser] = None
        self._context: Optional[BrowserContext] = None
        self._user_agent = user_agent
        self._headless = headless
        # Optional path to a storage_state file to load (cookies/localStorage)
        self._storage_state = storage_state

    def start(self) -> None:
        """Start Playwright and launch a browser/context."""
        if self._playwright is not None:
            return
        self._playwright = sync_playwright().start()
        # Launch with a few args to reduce the automation-detection surface
        launch_args = [
            "--disable-features=IsolateOrigins,site-per-process",
            "--disable-blink-features=AutomationControlled",
        ]
        self._browser = self._playwright.chromium.launch(
            headless=self._headless, args=launch_args
        )
        context_options = {}
        if self._user_agent:
            context_options["user_agent"] = self._user_agent
        # Set a common locale to match site expectations
        context_options.setdefault("locale", "de-DE")
        if self._storage_state:
            # Load storage state (path or dict, both supported by Playwright)
            context_options["storage_state"] = self._storage_state
        self._context = self._browser.new_context(**context_options)
        # Inject a stealth-like script to hide navigator.webdriver and other signals
        stealth_script = """
        Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
        Object.defineProperty(navigator, 'plugins', {get: () => [1,2,3,4,5]});
        Object.defineProperty(navigator, 'languages', {get: () => ['de-DE','de']});
        window.chrome = window.chrome || { runtime: {} };
        """
        try:
            self._context.add_init_script(stealth_script)  # type: ignore[attr-defined]
        except Exception:
            # Non-fatal: continue without the stealth script
            pass

    # --- persistence & debugging helpers ---

    def save_storage(self, path: str) -> None:
        """Save the current context storage_state (cookies/localStorage) to `path`.

        Use this after solving a challenge manually in headful mode so that
        subsequent runs can reuse the solved session.
        """
        if self._context is None:
            raise RuntimeError("KVKParser not started; call start() first")
        # Playwright can save the storage state directly to a file
        self._context.storage_state(path=path)

    def screenshot(self, page: Page, path: str) -> None:
        """Take a PNG screenshot of `page` and write it to `path`."""
        page.screenshot(path=path)

    def evaluate(self, page: Page, expression: str):
        """Evaluate the JS `expression` in the page context and return the result."""
        return page.evaluate(expression)

    def stop(self) -> None:
        """Close the context and browser, then stop Playwright."""
        if self._context:
            try:
                self._context.close()
            except Exception:
                pass
            self._context = None
        if self._browser:
            try:
                self._browser.close()
            except Exception:
                pass
            self._browser = None
        if self._playwright:
            try:
                self._playwright.stop()
            except Exception:
                pass
            self._playwright = None

    # --- helpers to access browser objects ---

    def context(self) -> BrowserContext:
        if self._context is None:
            raise RuntimeError("KVKParser not started; call start() first")
        return self._context

    def new_page(self) -> Page:
        return self.context().new_page()

    def page_content(self, page: Page) -> str:
        return page.content()

    # --- core search helpers ---

    def _build_query_url(
        self,
        freetext: str = "",
        title: str = "",
        author: str = "",
        koreperschaft: str = "",
        schlagwort: str = "",
        year: str = "",
        isbn: str = "",
        issn: str = "",
        verlag: str = "",
    ) -> str:
        return KVK_FORMATABLE_URL.format(
            freetext=freetext,
            title=title,
            person=author,
            ppl_s=koreperschaft,
            sw=schlagwort,
            year=year,
            isbn=isbn,
            issn=issn,
            publisher=verlag,
        )
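
    # Illustrative example (assumed values): _build_query_url(title="Faust", author="Goethe")
    # fills the TI and AU placeholders of KVK_FORMATABLE_URL and leaves the
    # remaining placeholders empty, so the query string contains
    # ...&TI=Faust&AU=Goethe&... Note that str.format performs no URL encoding,
    # so values containing spaces or '&' must be pre-encoded by the caller.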

    def search(self, wait_for_selector: Optional[str] = None, **kwargs) -> KVKResults:
        """Perform a search and return the parsed results.

        Parameters:
            wait_for_selector: optional CSS selector to wait for before reading the page content
            kwargs: same as the parameters of _build_query_url (title, author, isbn, ...)
        """
        if self._context is None:
            raise RuntimeError("KVKParser not started; call start() first")
        url = self._build_query_url(**kwargs)
        page = self._context.new_page()
        try:
            # Go to the page and let client-side JS run to solve any challenges
            page.goto(url, wait_until="networkidle", timeout=30000)
            # If the caller provided a selector, wait for it. Otherwise wait for
            # the loading overlay shown by the fast_challenge page to disappear.
            if wait_for_selector:
                page.wait_for_selector(wait_for_selector, timeout=20000)
            else:
                try:
                    page.wait_for_selector(
                        ".loading-overlay", state="hidden", timeout=15000
                    )
                except Exception:
                    try:
                        page.wait_for_selector(
                            ".loading-overlay", state="detached", timeout=15000
                        )
                    except Exception:
                        # If the overlay is still present, fall back to a short
                        # wait so the challenge has time to finish
                        try:
                            self._context.wait_for_event("page", timeout=1000)
                        except Exception:
                            pass
            content = page.content()
            # Heuristic: if the page still looks like the fast_challenge loader,
            # surface a helpful message instead of returning loader HTML
            if "fast_challenge" in content or "loading-overlay" in content:
                raise RuntimeError(
                    "Page contains fast_challenge overlay; try running with a headful browser or adjust the stealth options"
                )
            return self.evaluated(content)
        finally:
            try:
                page.close()
            except Exception:
                pass

    def check_result_libraries(self, results: KVKResults):
        """Check which libraries hold the results in the provided KVKResults.

        Returns a dict mapping catalogue names to counts of results held.
        """
        library_counts = {}
        for kvk_result in results.results:
            print("Checking katalogue:", kvk_result.source_katalogue)
            test_page = self.new_page()
            for result in kvk_result.results:
                print("  Checking result:", result.title, result.link)
                try:
                    test_page.goto(result.link, wait_until="networkidle", timeout=20000)
                    # Try to wait for catalog-specific result containers to appear.
                    try:
                        test_page.wait_for_selector(
                            ".kvk-result-item, .kvk-result-box, .search-results, .record, table",
                            timeout=5000,
                        )
                        # Trigger lazy loading
                        try:
                            test_page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
                        except Exception:
                            pass
                        test_page.wait_for_timeout(1000)
                        html = test_page.content()
                        # If the page uses the kvk-result-box structure, reuse evaluated().
                        soup = BeautifulSoup(html, "html.parser")
                        if soup.select_one(".kvk-result-box"):
                            parsed = self.evaluated(html)
                        else:
                            # Try to parse a table-based result listing
                            table = soup.find("table")
                            results_list = []
                            if table:
                                for tr in table.find_all("tr"):
                                    # Prefer links in the row
                                    a = tr.find("a")
                                    if a and a.get("href"):
                                        title = a.get_text(strip=True)
                                        href = a.get("href")
                                    else:
                                        # Fallback: join the cell texts
                                        cells = [td.get_text(strip=True) for td in tr.find_all(["td", "th"])]
                                        if not cells:
                                            continue
                                        title = cells[0]
                                        href = ""
                                    results_list.append(Result(title=title, link=href))
                            parsed = KVKResults(
                                results=[
                                    KVKResult(
                                        source_katalogue=kvk_result.source_katalogue,
                                        results=results_list,
                                    )
                                ]
                            )
                        # Count the individual records, not the per-catalogue groupings
                        library_counts.setdefault(kvk_result.source_katalogue, 0)
                        library_counts[kvk_result.source_katalogue] += sum(
                            len(r.results) for r in parsed.results
                        )
                        continue
                    except Exception:
                        # The selector did not appear quickly; try other approaches
                        pass
                    # Inspect inline scripts for embedded JSON-like payloads
                    scripts = test_page.query_selector_all("script")
                    found = False
                    for s in scripts:
                        try:
                            txt = s.text_content() or ""
                        except Exception:
                            txt = ""
                        # Look for a window.<NAME> = { ... } or var NAME = { ... } pattern
                        m = re.search(r"window\.[A-Za-z0-9_]+\s*=\s*(\{[\s\S]*?\})\s*;", txt)
                        if not m:
                            m = re.search(r"var\s+[A-Za-z0-9_]+\s*=\s*(\{[\s\S]*?\})\s*;", txt)
                        if m:
                            blob = m.group(1)
                            try:
                                obj = json.loads(blob)
                                if isinstance(obj, dict) and "results" in obj and isinstance(obj["results"], list):
                                    count = len(obj["results"])
                                elif isinstance(obj, list):
                                    count = len(obj)
                                else:
                                    count = 1
                                library_counts.setdefault(kvk_result.source_katalogue, 0)
                                library_counts[kvk_result.source_katalogue] += count
                                found = True
                                break
                            except Exception:
                                # Not JSON; continue searching
                                continue
                    if found:
                        continue
                    # Fallback: wait briefly, then parse the full HTML
                    test_page.wait_for_timeout(1000)
                    html = test_page.content()
                    parsed = self.evaluated(html)
                    library_counts.setdefault(kvk_result.source_katalogue, 0)
                    library_counts[kvk_result.source_katalogue] += sum(
                        len(r.results) for r in parsed.results
                    )
                except Exception as exc:
                    print(f"Error checking {result.link}: {exc}")
            # Close the per-catalogue page so open pages do not accumulate
            try:
                test_page.close()
            except Exception:
                pass
        return library_counts

    def evaluated(self, content: str):
        resultlist = []
        # Map the content onto the KVKResult structure: results live in divs with
        # class "kvk-result-box", and the catalogue title is the text of the
        # anchor inside the "kvk-result-head" div.
        soup = BeautifulSoup(content, "html.parser")
        for result_box in soup.select(".kvk-result-box"):
            katalogue_title = result_box.select_one(".kvk-result-head").text.strip()
            results = []
            # Each result is a "kvk-result-item" div inside "kvk-result-list";
            # the record link and title come from its "kvk-result-item-link" anchor.
            for record in result_box.find_all("div", class_="kvk-result-item"):
                link_elem = record.find("a", class_="kvk-result-item-link")
                link = link_elem["href"]
                title = link_elem.text.strip().split("\n")[0].strip()
                results.append(Result(title=title, link=link))
            resultlist.append(
                KVKResult(source_katalogue=katalogue_title, results=results)
            )
        return KVKResults(results=resultlist)


__all__ = ["KVKParser"]
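
# Usage sketch (illustrative, not part of this commit): run headful once so any
# bot challenge can be solved by hand, persist the session, then reuse it later.
#
#     parser = KVKParser(headless=False)
#     parser.start()
#     try:
#         results = parser.search(title="Java ist auch eine Insel")
#         counts = parser.check_result_libraries(results)
#         print(counts)
#     finally:
#         parser.save_storage("kvk_state.json")  # hypothetical path
#         parser.stop()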

View File

@@ -1,7 +1,7 @@
import re
import xml.etree.ElementTree as ET
from enum import Enum
from typing import Dict, Iterable, List, Optional, Tuple, Union
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
import requests
from requests.adapters import HTTPAdapter
@@ -366,43 +366,74 @@ def book_from_marc(rec: MarcRecord) -> BookData:
)
class PicaSchema(Enum):
    TITLE = "pica.tit"
    CALLSIGN = "pica.abr"
    ALL = "pica.all"
    DATE_FIRST_CREATION = "pica.ser"
    DATE_LAST_MODIFIED = "pica.aed"
    ISBN = "pica.isb"
    ISSN = "pica.isn"
    ISMN = "pica.ism"
    PPN = "pica.ppn"
    AUTHOR = "pica.per"


class ALMASchema(Enum):
    pass


class DublinCoreSchema(Enum):
    pass


class CQLSchema(Enum):
    pass


class SWBSchema(Enum):
    URL = "https://sru.k10plus.de/opac-de-627!rec=1?version=1.1&operation=searchRetrieve&query={}&maximumRecords=100&recordSchema=marcxml"
    ARGSCHEMA = "pica."
    ARGSCHEMA = PicaSchema
    NAME = "SWB"


class DNBSchema(Enum):
    URL = "https://services.dnb.de/sru/dnb?version=1.1&operation=searchRetrieve&query={}&maximumRecords=100&recordSchema=MARC21-xml"
    ARGSCHEMA = ""
    ARGSCHEMA = DublinCoreSchema
    NAME = "DNB"


class KOBVSchema(Enum):
    URL = "https://sru.kobv.de/k2?version=1.1&operation=searchRetrieve&query={}&startRecord=1&maximumRecords=100&recordSchema=marcxml"
    ARGSCHEMA = "dc."
    ARGSCHEMA = DublinCoreSchema
    NAME = "KOBV"


class HebisSchema(Enum):
    URL = "http://sru.hebis.de/sru/DB=2.1?query={}&version=1.1&operation=searchRetrieve&stylesheet=http%3A%2F%2Fsru.hebis.de%2Fsru%2F%3Fxsl%3DsearchRetrieveResponse&recordSchema=marc21&maximumRecords=100&startRecord=1&recordPacking=xml&sortKeys=LST_Y%2Cpica%2C0%2C%2C"
    ARGSCHEMA = "pica."
    ARGSCHEMA = PicaSchema
    NAME = "HEBIS"
    REPLACE = {" ": "+", "&": "%26", "=": "+%3D+"}


class OEVKSchema(Enum):
    URL = "https://sru.k10plus.de/opac-de-627-2?version=1.1&operation=searchRetrieve&query={}&maximumRecords=100&recordSchema=marcxml"
    ARGSCHEMA = "pica."
    ARGSCHEMA = PicaSchema
    NAME = "OEVK"


class HBZSchema(Enum):
    URL = "https://eu04.alma.exlibrisgroup.com/view/sru/49HBZ_NETWORK?version=1.2&operation=searchRetrieve&recordSchema=marcxml&query={}&maximumRecords=100&recordSchema=marcxml"
    ARGSCHEMA = "alma."
    ARGSCHEMA = ALMASchema
    NAME = "HBZ"


class ArgumentSchema(Enum):
    TITLE = (
        "title",
        "tit",
    )
RVK_ALLOWED = r"[A-Z0-9.\-\/]" # conservative char set typically seen in RVK notations
@@ -506,7 +537,32 @@ def find_newer_edition(
    return [best] if best else None


class _Api:
class QueryTransformer:
    def __init__(
        self, api_schema: Type[PicaSchema], arguments: Union[Iterable[str], str]
    ):
        self.api_schema = api_schema
        if isinstance(arguments, str):
            self.arguments = [arguments]
        else:
            self.arguments = arguments

    def transform(self) -> List[str]:
        arguments: List[str] = []
        schema = self.api_schema
        for arg in self.arguments:
            if "=" not in arg:
                continue
            key, value = arg.split("=", 1)
            if hasattr(schema, key.upper()):
                api_key = getattr(schema, key.upper()).value
                arguments.append(f"{api_key}={value}")
        return arguments
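
# Illustrative example (assumed): with PicaSchema as the api_schema,
# QueryTransformer(PicaSchema, "title=Faust").transform() resolves the "title"
# key via PicaSchema.TITLE and returns ["pica.tit=Faust"].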


class Api:
    def __init__(
        self, site: str, url: str, prefix: str, replace: Optional[Dict[str, str]] = None
    ):
@@ -585,99 +641,5 @@ class _Api:
        # Not implemented: depends on the catalog front-end; return an empty string for now
        return ""


class SWB(_Api):
    def __init__(self):
        self.site = SWBSchema.NAME.value
        self.url = SWBSchema.URL.value
        self.prefix = SWBSchema.ARGSCHEMA.value
        super().__init__(self.site, self.url, self.prefix)


class DNB(_Api):
    def __init__(self):
        self.site = DNBSchema.NAME.value
        self.url = DNBSchema.URL.value
        self.prefix = DNBSchema.ARGSCHEMA.value
        super().__init__(self.site, self.url, self.prefix)


class KOBV(_Api):
    def __init__(self):
        self.site = KOBVSchema.NAME.value
        self.url = KOBVSchema.URL.value
        self.prefix = KOBVSchema.ARGSCHEMA.value
        super().__init__(self.site, self.url, self.prefix)


class HEBIS(_Api):
    def __init__(self):
        self.site = HebisSchema.NAME.value
        self.url = HebisSchema.URL.value
        self.prefix = HebisSchema.ARGSCHEMA.value
        self.replace = HebisSchema.REPLACE.value
        super().__init__(self.site, self.url, self.prefix, self.replace)


class OEVK(_Api):
    def __init__(self):
        self.site = OEVKSchema.NAME.value
        self.url = OEVKSchema.URL.value
        self.prefix = OEVKSchema.ARGSCHEMA.value
        super().__init__(self.site, self.url, self.prefix)


class HBZ(_Api):
    """
    Small wrapper around the SRU API used to retrieve data from the HBZ libraries.
    All fields are listed [here](https://eu04.alma.exlibrisgroup.com/view/sru/49HBZ_NETWORK?version=1.2).

    Schema
    ------
    HBZSchema: <HBZSchema>
    query prefix: alma.
    """

    def __init__(self):
        self.site = HBZSchema.NAME.value
        self.url = HBZSchema.URL.value
        self.prefix = HBZSchema.ARGSCHEMA.value
        super().__init__(self.site, self.url, self.prefix)

    def search(self, query_args: Union[Iterable[str], str]):
        arguments =


# async KVK class:
class KVK:
    def __init__(self):
        self.k10plus = SWB()
        self.dnb = DNB()
        self.hebis = HEBIS()
        self.oevk = OEVK()
        self.hbz = HBZ()
        self.kobv = KOBV()

    def close(self):
        self.k10plus.close()
        self.dnb.close()
        self.hebis.close()
        self.oevk.close()
        self.hbz.close()
        self.kobv.close()

    def __del__(self):
        self.close()

    # async def get_all(self, query_args: Union[Iterable[str], str]) -> Dict[str, List[BookData]]:
    async def get_all(
        self, query_args: Union[Iterable[str], str]
    ) -> Dict[str, List[BookData]]:
        results = {}
        results["K10Plus"] = self.k10plus.getBooks(query_args)
        results["DNB"] = self.dnb.getBooks(query_args)
        results["HEBIS"] = self.hebis.getBooks(query_args)
        results["OEVK"] = self.oevk.getBooks(query_args)
        results["HBZ"] = self.hbz.getBooks(query_args)
        results["KOBV"] = self.kobv.getBooks(query_args)
        return results

    # def search(self, query_args: Union[Iterable[str], str]):
    #     arguments =