dev #6

Merged
WorldTeacher merged 15 commits from dev into main 2025-11-13 09:37:37 +00:00
2 changed files with 0 additions and 439 deletions
Showing only changes of commit 026e39b9b8


@@ -1,80 +0,0 @@
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
BASE_URL = "https://kvk.bibliothek.kit.edu"
SEARCH_ENDPOINT = "/hylib-bin/kvk/nph-kvk2.cgi"
# Search parameters: restrict the search to K10plus (GBV+SWB); disable the digital-only filter and embedded full titles
BASE_PARAMS = {
'digitalOnly': '0',
'embedFulltitle': '0',
'newTab': '0',
'mask': 'kvk-redesign',
'kataloge': 'K10PLUS',
'ACT': 'SRCHA',
}
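# Illustrative only: with TI and AU filled in, requests encodes the final query roughly as
#   https://kvk.bibliothek.kit.edu/hylib-bin/kvk/nph-kvk2.cgi?digitalOnly=0&...&ACT=SRCHA&TI=<title>&AU=<author>
# (the TI/AU/ACT parameter names follow the KVK CGI interface assumed by this module).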
def search_kvk(title: str, author: str, max_results: int = 10) -> list[str]:
"""Perform a title/author search in KVK and return full-record URLs (viewtitel links).
Args:
title: Exact title of the book.
author: Author name.
        max_results: Maximum number of record links to return.
Returns:
A list of absolute URLs to the full records in the K10plus catalogue.
"""
params = BASE_PARAMS.copy()
params.update({'TI': title, 'AU': author})
resp = requests.get(urljoin(BASE_URL, SEARCH_ENDPOINT), params=params, timeout=30)
resp.raise_for_status()
soup = BeautifulSoup(resp.content, "html.parser")
# KVK embeds links to the full records in anchor tags whose href contains 'view-titel'
links = []
for a in soup.find_all('a', href=True):
href = a['href']
if 'view-titel' in href:
links.append(urljoin(BASE_URL, href))
if len(links) >= max_results:
break
return links
def get_holdings(record_url: str) -> list[str]:
"""Extract the names of holding libraries from a K10plus record page."""
r = requests.get(record_url, timeout=30)
r.raise_for_status()
soup = BeautifulSoup(r.content, "html.parser")
holdings = []
# find the heading "Besitzende Bibliotheken" and then get all following anchor tags
heading = soup.find(lambda tag: tag.name in ['h2', 'h3', 'strong'] and 'Besitzende Bibliotheken' in tag.get_text())
if heading:
        # the list of libraries is usually in an unordered list or a series of <a> tags after the heading
        for a in heading.find_all_next('a', href=True):
txt = a.get_text(strip=True)
if txt:
holdings.append(txt)
return holdings
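# Sketch of the markup get_holdings() assumes (illustrative; real K10plus record pages may differ):
#   <h3>Besitzende Bibliotheken</h3>   <!-- "holding libraries" -->
#   <ul>
#     <li><a href="...">Some Library</a></li>
#   </ul>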
def main():
title = "Java ist auch eine Insel"
author = "Ullenboom"
record_links = search_kvk(title, author, max_results=10)
for url in record_links:
print(f"Record: {url}")
libs = get_holdings(url)
if libs:
print(" Holding libraries:")
for lib in libs:
print(f" - {lib}")
else:
print(" No holdings found or unable to parse.")
print()
if __name__ == '__main__':
main()


@@ -1,359 +0,0 @@
import re
import json
from dataclasses import dataclass
from typing import Optional
from bs4 import BeautifulSoup
from playwright.sync_api import (
Browser,
BrowserContext,
Page,
Playwright,
sync_playwright,
)
KVK_BASE_URL = "https://kvk.bibliothek.kit.edu/?kataloge=K10PLUS&kataloge=BVB&kataloge=NRW&kataloge=HEBIS&kataloge=HEBIS_RETRO&kataloge=KOBV_SOLR&kataloge=DDB&kataloge=STABI_BERLIN&kataloge=TIB&kataloge=OEVK_GBV&digitalOnly=0&embedFulltitle=0&newTab=0"
KVK_FORMATABLE_URL = "https://kvk.bibliothek.kit.edu/hylib-bin/kvk/nph-kvk2.cgi?maske=kvk-redesign&lang=de&title=KIT-Bibliothek%3A+Karlsruher+Virtueller+Katalog+KVK+%3A+Ergebnisanzeige&head=asset%2Fhtml%2Fhead.html&header=asset%2Fhtml%2Fheader.html&spacer=asset%2Fhtml%2Fspacer.html&footer=asset%2Fhtml%2Ffooter.html&input-charset=utf-8&ALL={freetext}&TI={title}&AU={person}&CI={ppl_s}&ST={sw}&PY={year}&SB={isbn}&SS={issn}&PU={publisher}&kataloge=K10PLUS&kataloge=BVB&kataloge=NRW&kataloge=HEBIS&kataloge=HEBIS_RETRO&kataloge=KOBV_SOLR&kataloge=DDB&kataloge=STABI_BERLIN&kataloge=TIB&kataloge=OEVK_GBV&ref=direct&client-js=no"
@dataclass
class Result:
title: str
link: str
@dataclass
class KVKResult:
source_katalogue: str
results: list[Result]
@dataclass
class KVKResults:
results: list["KVKResult"]
class KVKParser:
"""Playwright-backed KVK parser.
Usage:
p = KVKParser()
p.start() # starts Playwright and browser
        results = p.search(title="My Title")   # returns KVKResults
p.stop()
The instance exposes the live browser/context and helper methods so tests can reuse the browser.
"""
def __init__(
self,
headless: bool = False,
user_agent: Optional[str] = None,
storage_state: Optional[str] = None,
):
self._playwright: Optional[Playwright] = None
self._browser: Optional[Browser] = None
self._context: Optional[BrowserContext] = None
self._user_agent = user_agent
self._headless = headless
# Optional path to a storage_state file to load (cookies/localStorage)
self._storage_state = storage_state
def start(self) -> None:
"""Start Playwright and launch a browser/context."""
if self._playwright is not None:
return
self._playwright = sync_playwright().start()
# Launch with a few args to reduce automation detection surface
launch_args = [
"--disable-features=IsolateOrigins,site-per-process",
"--disable-blink-features=AutomationControlled",
]
self._browser = self._playwright.chromium.launch(
headless=self._headless, args=launch_args
)
context_options = {}
if self._user_agent:
context_options["user_agent"] = self._user_agent
# set a common locale to match site expectations
context_options.setdefault("locale", "de-DE")
if self._storage_state:
# load storage state (path or dict supported by Playwright)
context_options["storage_state"] = self._storage_state
self._context = self._browser.new_context(**context_options)
# Inject stealth-like script to reduce navigator.webdriver and other signals
stealth_script = """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'plugins', {get: () => [1,2,3,4,5]});
Object.defineProperty(navigator, 'languages', {get: () => ['de-DE','de']});
window.chrome = window.chrome || { runtime: {} };
"""
        try:
            self._context.add_init_script(stealth_script)
        except Exception:
            # Non-fatal: continue without the stealth script
            pass
# --- persistence & debugging helpers ---
def save_storage(self, path: str) -> None:
"""Save the current context storage_state (cookies/localStorage) to `path`.
Use this after solving a challenge manually in headful mode so subsequent runs can reuse the solved session.
"""
if self._context is None:
raise RuntimeError("KVKParser not started; call start() first")
        # Playwright can save the storage state directly to a file
        self._context.storage_state(path=path)
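    # Sketch of the intended challenge-recovery flow (the file name is illustrative):
    #   p = KVKParser(headless=False); p.start()
    #   p.search(title="...")               # solve the challenge manually in the window
    #   p.save_storage("kvk_state.json")    # persist cookies/localStorage
    #   # later runs: KVKParser(storage_state="kvk_state.json")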
def screenshot(self, page: Page, path: str) -> None:
"""Take a screenshot of `page` to `path` (PNG)."""
page.screenshot(path=path)
def evaluate(self, page: Page, expression: str):
"""Evaluate JS `expression` in page context and return result."""
return page.evaluate(expression)
def stop(self) -> None:
"""Close context, browser and stop Playwright."""
if self._context:
try:
self._context.close()
except Exception:
pass
self._context = None
if self._browser:
try:
self._browser.close()
except Exception:
pass
self._browser = None
if self._playwright:
try:
self._playwright.stop()
except Exception:
pass
self._playwright = None
# --- helpers to access browser objects ---
def context(self) -> BrowserContext:
if self._context is None:
raise RuntimeError("KVKParser not started; call start() first")
return self._context
def new_page(self) -> Page:
return self.context().new_page()
def page_content(self, page: Page) -> str:
return page.content()
# --- core search helpers ---
def _build_query_url(
self,
freetext: str = "",
title: str = "",
author: str = "",
koreperschaft: str = "",
schlagwort: str = "",
year: str = "",
isbn: str = "",
issn: str = "",
verlag: str = "",
) -> str:
return KVK_FORMATABLE_URL.format(
freetext=freetext,
title=title,
person=author,
ppl_s=koreperschaft,
sw=schlagwort,
year=year,
isbn=isbn,
issn=issn,
publisher=verlag,
)
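    # Example (illustrative values): _build_query_url(title="Faust", author="Goethe") fills
    # the TI and AU slots of KVK_FORMATABLE_URL; unused slots become empty strings, which
    # the KVK CGI treats as unset. Note that values are interpolated verbatim, so callers
    # may need to URL-encode them first (e.g. with urllib.parse.quote_plus).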
def search(self, wait_for_selector: Optional[str] = None, **kwargs) -> KVKResults:
"""Perform a search and return the page HTML.
Parameters:
wait_for_selector: optional CSS selector to wait for before returning content
kwargs: same as parameters to _build_query_url (title, author, isbn, ...)
"""
if self._context is None:
raise RuntimeError("KVKParser not started; call start() first")
url = self._build_query_url(**kwargs)
page = self._context.new_page()
try:
# Go to the page and let client-side JS run to solve any challenges
page.goto(url, wait_until="networkidle", timeout=30000)
            # If the caller provided a selector, wait for it. Otherwise wait for the
            # loading overlay shown by the fast_challenge page to disappear.
if wait_for_selector:
page.wait_for_selector(wait_for_selector, timeout=20000)
else:
# Many challenge pages show a loading overlay; wait for it to go away.
try:
page.wait_for_selector(
".loading-overlay", state="hidden", timeout=15000
)
except Exception:
try:
page.wait_for_selector(
".loading-overlay", state="detached", timeout=15000
)
                    except Exception:
                        # Overlay still present: pause briefly so the challenge can finish
                        page.wait_for_timeout(1000)
content = page.content()
# Heuristic: if page still looks like the fast_challenge loader, surface helpful message
if "fast_challenge" in content or "loading-overlay" in content:
                # Raise an informative error instead of handing back challenge HTML
raise RuntimeError(
"Page contains fast_challenge overlay — try running with headful browser or adjust stealth options"
)
return self.evaluated(content)
finally:
try:
page.close()
except Exception:
pass
def check_result_libraries(self, results: KVKResults):
"""Check which libraries hold the results in the provided KVKResults.
        Returns a dict mapping source catalogue names to counts of records found.
"""
library_counts = {}
for kvk_result in results.results:
print("Checking katalogue:", kvk_result.source_katalogue)
test_page = self.new_page()
for result in kvk_result.results:
print(" Checking result:", result.title, result.link)
try:
test_page.goto(result.link, wait_until="networkidle", timeout=20000)
# Try to wait for catalog-specific result containers to appear.
try:
test_page.wait_for_selector(
".kvk-result-item, .kvk-result-box, .search-results, .record, table",
timeout=5000,
)
# trigger lazy loading
try:
test_page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
except Exception:
pass
test_page.wait_for_timeout(1000)
html = test_page.content()
# If the page uses the kvk-result-box structure, reuse evaluated().
soup = BeautifulSoup(html, "html.parser")
if soup.select_one(".kvk-result-box"):
parsed = self.evaluated(html)
else:
# Try to parse a table-based result listing
table = soup.find("table")
results_list = []
if table:
for tr in table.find_all("tr"):
# prefer links in the row
a = tr.find("a")
if a and a.get("href"):
title = a.get_text(strip=True)
href = a.get("href")
else:
# fallback: join cell texts
cells = [td.get_text(strip=True) for td in tr.find_all(["td", "th"])]
if not cells:
continue
title = cells[0]
href = ""
results_list.append(Result(title=title, link=href))
parsed = KVKResults(results=[KVKResult(source_katalogue=kvk_result.source_katalogue, results=results_list)])
                        library_counts.setdefault(kvk_result.source_katalogue, 0)
                        # Count individual records rather than result boxes
                        library_counts[kvk_result.source_katalogue] += sum(len(r.results) for r in parsed.results)
                        continue
except Exception:
# selector didn't appear quickly — try other approaches
pass
# Inspect inline scripts for embedded JSON-like payloads
scripts = test_page.query_selector_all("script")
found = False
for s in scripts:
try:
txt = s.text_content() or ""
except Exception:
txt = ""
# look for a window.<NAME> = { ... } or var NAME = { ... } pattern
m = re.search(r"window\.[A-Za-z0-9_]+\s*=\s*(\{[\s\S]*?\})\s*;", txt)
if not m:
m = re.search(r"var\s+[A-Za-z0-9_]+\s*=\s*(\{[\s\S]*?\})\s*;", txt)
if m:
blob = m.group(1)
try:
obj = json.loads(blob)
if isinstance(obj, dict) and "results" in obj and isinstance(obj["results"], list):
count = len(obj["results"])
elif isinstance(obj, list):
count = len(obj)
else:
count = 1
library_counts.setdefault(kvk_result.source_katalogue, 0)
library_counts[kvk_result.source_katalogue] += count
found = True
break
except Exception:
# Not JSON — continue searching
continue
if found:
continue
# Fallback: small wait, then parse the full HTML
test_page.wait_for_timeout(1000)
html = test_page.content()
parsed = self.evaluated(html)
                    library_counts.setdefault(kvk_result.source_katalogue, 0)
                    library_counts[kvk_result.source_katalogue] += sum(len(r.results) for r in parsed.results)
except Exception as exc:
print(f"Error checking {result.link}: {exc}")
return library_counts
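    # evaluated() below assumes result markup of this shape (class names taken from the
    # selectors it uses; the structure itself is illustrative):
    #   <div class="kvk-result-box">
    #     <div class="kvk-result-head"><a href="...">K10plus</a></div>
    #     <div class="kvk-result-list">
    #       <div class="kvk-result-item">
    #         <a class="kvk-result-item-link" href="https://...">Some title</a>
    #       </div>
    #     </div>
    #   </div>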
def evaluated(self, content: str):
resultlist = []
        # Map the HTML onto the KVKResult structure: each catalogue's results live in a
        # div.kvk-result-box, and the catalogue title is the text of its .kvk-result-head link.
soup = BeautifulSoup(content, "html.parser")
for result_box in soup.select(".kvk-result-box"):
katalogue_title = result_box.select_one(".kvk-result-head").text.strip()
results = []
            # Each record is a div.kvk-result-item inside div.kvk-result-list; the link to
            # the full record is the a.kvk-result-item-link anchor.
for record in result_box.find_all("div", class_="kvk-result-item"):
                link_elem = record.find("a", class_="kvk-result-item-link")
                if link_elem is None or not link_elem.get("href"):
                    # Skip records without a usable link
                    continue
                link = link_elem["href"]
                title = link_elem.text.strip().split("\n")[0].strip()
results.append(Result(title=title, link=link))
resultlist.append(
KVKResult(source_katalogue=katalogue_title, results=results)
)
return KVKResults(results=resultlist)
__all__ = ["KVKParser"]
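# A minimal, illustrative demo (assumes network access and an installed Chromium;
# headful mode is used because the challenge page may block headless browsers):
if __name__ == "__main__":
    demo = KVKParser(headless=False)
    demo.start()
    try:
        demo_results = demo.search(title="Java ist auch eine Insel")
        for kvk_result in demo_results.results:
            print(kvk_result.source_katalogue, "->", len(kvk_result.results), "records")
    finally:
        demo.stop()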