import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin

BASE_URL = "https://kvk.bibliothek.kit.edu"
SEARCH_ENDPOINT = "/hylib-bin/kvk/nph-kvk2.cgi"

# Parameters: search only in K10plus (GBV+SWB), do not embed full title or
# restrict to digital-only material.
BASE_PARAMS = {
    'digitalOnly': '0',
    'embedFulltitle': '0',
    'newTab': '0',
    'mask': 'kvk-redesign',
    'kataloge': 'K10PLUS',
    'ACT': 'SRCHA',
}


def search_kvk(title: str, author: str, max_results: int = 10) -> list[str]:
    """Perform a title/author search in KVK and return full-record URLs (view-titel links).

    Args:
        title: Exact title of the book.
        author: Author name.
        max_results: Maximum number of search results to process.

    Returns:
        A list of absolute URLs to the full records in the K10plus catalogue.

    Raises:
        requests.HTTPError: If the KVK server responds with an error status.
    """
    params = BASE_PARAMS.copy()
    params.update({'TI': title, 'AU': author})
    resp = requests.get(urljoin(BASE_URL, SEARCH_ENDPOINT), params=params, timeout=30)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.content, "html.parser")

    # KVK embeds links to the full records in anchor tags whose href
    # contains 'view-titel'.
    links: list[str] = []
    for a in soup.find_all('a', href=True):
        href = a['href']
        if 'view-titel' in href:
            links.append(urljoin(BASE_URL, href))
            if len(links) >= max_results:
                break
    return links


def get_holdings(record_url: str) -> list[str]:
    """Extract the names of holding libraries from a K10plus record page.

    Args:
        record_url: Absolute URL of a K10plus full-record page.

    Returns:
        The visible names of all holding libraries, possibly empty if the
        page layout could not be parsed.

    Raises:
        requests.HTTPError: If the record page responds with an error status.
    """
    r = requests.get(record_url, timeout=30)
    r.raise_for_status()
    soup = BeautifulSoup(r.content, "html.parser")
    holdings: list[str] = []
    # Find the heading "Besitzende Bibliotheken" and then collect all
    # following anchor tags in document order.
    heading = soup.find(
        lambda tag: tag.name in ['h2', 'h3', 'strong']
        and 'Besitzende Bibliotheken' in tag.get_text()
    )
    if heading:
        # BUG FIX: BeautifulSoup has no `find_next_all` method — the original
        # raised AttributeError here. The correct API is `find_all_next`,
        # which yields every matching tag after the heading.
        for a in heading.find_all_next('a', href=True):
            txt = a.get_text(strip=True)
            if txt:
                holdings.append(txt)
    return holdings


def main():
    """Demo entry point: search a known title and print holdings per record."""
    title = "Java ist auch eine Insel"
    author = "Ullenboom"
    record_links = search_kvk(title, author, max_results=10)
    for url in record_links:
        print(f"Record: {url}")
        libs = get_holdings(url)
        if libs:
            print(" Holding libraries:")
            for lib in libs:
                print(f" - {lib}")
        else:
            print(" No holdings found or unable to parse.")
        print()


if __name__ == '__main__':
    main()
import time
import re
import json
from dataclasses import dataclass
from typing import Optional

from bs4 import BeautifulSoup
from playwright.sync_api import (
    Browser,
    BrowserContext,
    Page,
    Playwright,
    sync_playwright,
)

KVK_BASE_URL = "https://kvk.bibliothek.kit.edu/?kataloge=K10PLUS&kataloge=BVB&kataloge=NRW&kataloge=HEBIS&kataloge=HEBIS_RETRO&kataloge=KOBV_SOLR&kataloge=DDB&kataloge=STABI_BERLIN&kataloge=TIB&kataloge=OEVK_GBV&digitalOnly=0&embedFulltitle=0&newTab=0"

KVK_FORMATABLE_URL = "https://kvk.bibliothek.kit.edu/hylib-bin/kvk/nph-kvk2.cgi?maske=kvk-redesign&lang=de&title=KIT-Bibliothek%3A+Karlsruher+Virtueller+Katalog+KVK+%3A+Ergebnisanzeige&head=asset%2Fhtml%2Fhead.html&header=asset%2Fhtml%2Fheader.html&spacer=asset%2Fhtml%2Fspacer.html&footer=asset%2Fhtml%2Ffooter.html&input-charset=utf-8&ALL={freetext}&TI={title}&AU={person}&CI={ppl_s}&ST={sw}&PY={year}&SB={isbn}&SS={issn}&PU={publisher}&kataloge=K10PLUS&kataloge=BVB&kataloge=NRW&kataloge=HEBIS&kataloge=HEBIS_RETRO&kataloge=KOBV_SOLR&kataloge=DDB&kataloge=STABI_BERLIN&kataloge=TIB&kataloge=OEVK_GBV&ref=direct&client-js=no"


@dataclass
class Result:
    # A single hit inside one catalogue's result list.
    title: str
    link: str


@dataclass
class KVKResult:
    # All hits reported by one source catalogue.
    source_katalogue: str
    results: list[Result]


@dataclass
class KVKResults:
    # Top-level container: one KVKResult per catalogue.
    results: list["KVKResult"]


class KVKParser:
    """Playwright-backed KVK parser.

    Usage:
        p = KVKParser()
        p.start()   # starts Playwright and browser
        results = p.search(title="My Title")
        p.stop()

    The instance exposes the live browser/context and helper methods so
    tests can reuse the browser.
    """

    def __init__(
        self,
        headless: bool = False,
        user_agent: Optional[str] = None,
        storage_state: Optional[str] = None,
    ):
        self._playwright: Optional[Playwright] = None
        self._browser: Optional[Browser] = None
        self._context: Optional[BrowserContext] = None
        self._user_agent = user_agent
        self._headless = headless
        # Optional path to a storage_state file to load (cookies/localStorage)
        self._storage_state = storage_state

    def start(self) -> None:
        """Start Playwright and launch a browser/context. Idempotent."""
        if self._playwright is not None:
            return
        self._playwright = sync_playwright().start()
        # Launch with a few args to reduce automation detection surface
        launch_args = [
            "--disable-features=IsolateOrigins,site-per-process",
            "--disable-blink-features=AutomationControlled",
        ]
        self._browser = self._playwright.chromium.launch(
            headless=self._headless, args=launch_args
        )

        context_options = {}
        if self._user_agent:
            context_options["user_agent"] = self._user_agent
        # set a common locale to match site expectations
        context_options.setdefault("locale", "de-DE")
        if self._storage_state:
            # load storage state (path or dict supported by Playwright)
            context_options["storage_state"] = self._storage_state

        self._context = self._browser.new_context(**context_options)

        # Inject stealth-like script to reduce navigator.webdriver and other signals
        stealth_script = """
        Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
        Object.defineProperty(navigator, 'plugins', {get: () => [1,2,3,4,5]});
        Object.defineProperty(navigator, 'languages', {get: () => ['de-DE','de']});
        window.chrome = window.chrome || { runtime: {} };
        """
        try:
            # type: ignore[attr-defined]
            self._context.add_init_script(stealth_script)
        except Exception:
            # Non-fatal: continue without stealth script
            pass

    # --- persistence & debugging helpers ---
    def save_storage(self, path: str) -> None:
        """Save the current context storage_state (cookies/localStorage) to `path`.

        Use this after solving a challenge manually in headful mode so
        subsequent runs can reuse the solved session.

        Raises:
            RuntimeError: If start() has not been called yet.
        """
        if self._context is None:
            raise RuntimeError("KVKParser not started; call start() first")
        # Playwright allows saving directly to a file
        self._context.storage_state(path=path)

    def screenshot(self, page: Page, path: str) -> None:
        """Take a screenshot of `page` to `path` (PNG)."""
        page.screenshot(path=path)

    def evaluate(self, page: Page, expression: str):
        """Evaluate JS `expression` in page context and return result."""
        return page.evaluate(expression)

    def stop(self) -> None:
        """Close context, browser and stop Playwright. Safe to call twice."""
        if self._context:
            try:
                self._context.close()
            except Exception:
                pass
            self._context = None
        if self._browser:
            try:
                self._browser.close()
            except Exception:
                pass
            self._browser = None
        if self._playwright:
            try:
                self._playwright.stop()
            except Exception:
                pass
            self._playwright = None

    # --- helpers to access browser objects ---
    def context(self) -> BrowserContext:
        """Return the live BrowserContext, raising if not started."""
        if self._context is None:
            raise RuntimeError("KVKParser not started; call start() first")
        return self._context

    def new_page(self) -> Page:
        """Open a fresh page in the current context."""
        return self.context().new_page()

    def page_content(self, page: Page) -> str:
        """Return the current HTML of `page`."""
        return page.content()

    # --- core search helpers ---
    def _build_query_url(
        self,
        freetext: str = "",
        title: str = "",
        author: str = "",
        koreperschaft: str = "",
        schlagwort: str = "",
        year: str = "",
        isbn: str = "",
        issn: str = "",
        verlag: str = "",
    ) -> str:
        """Fill the KVK query template with the given search fields."""
        return KVK_FORMATABLE_URL.format(
            freetext=freetext,
            title=title,
            person=author,
            ppl_s=koreperschaft,
            sw=schlagwort,
            year=year,
            isbn=isbn,
            issn=issn,
            publisher=verlag,
        )

    def search(self, wait_for_selector: Optional[str] = None, **kwargs) -> KVKResults:
        """Perform a search and return the parsed results.

        Parameters:
            wait_for_selector: optional CSS selector to wait for before
                reading the page content.
            kwargs: same as parameters to _build_query_url (title, author,
                isbn, ...).

        Returns:
            KVKResults parsed from the result page (see evaluated()).

        Raises:
            RuntimeError: If start() was not called, or if the page still
                shows the fast_challenge overlay after waiting.
        """
        if self._context is None:
            raise RuntimeError("KVKParser not started; call start() first")
        url = self._build_query_url(**kwargs)
        page = self._context.new_page()
        try:
            # Go to the page and let client-side JS run to solve any challenges
            page.goto(url, wait_until="networkidle", timeout=30000)

            # If caller provided a selector, wait for it. Otherwise try to wait for
            # any loading-overlay to disappear which the fast_challenge page shows.
            if wait_for_selector:
                page.wait_for_selector(wait_for_selector, timeout=20000)
            else:
                # Many challenge pages show a loading overlay; wait for it to go away.
                try:
                    page.wait_for_selector(
                        ".loading-overlay", state="hidden", timeout=15000
                    )
                except Exception:
                    try:
                        page.wait_for_selector(
                            ".loading-overlay", state="detached", timeout=15000
                        )
                    except Exception:
                        # BUG FIX: the original waited on a context "page" event
                        # here, which only fires when a NEW page opens and thus
                        # never acted as the intended short delay. Use an
                        # explicit page timeout instead.
                        page.wait_for_timeout(1000)

            content = page.content()

            # Heuristic: if page still looks like the fast_challenge loader, surface helpful message
            if "fast_challenge" in content or "loading-overlay" in content:
                raise RuntimeError(
                    "Page contains fast_challenge overlay — try running with headful browser or adjust stealth options"
                )

            return self.evaluated(content)
        finally:
            try:
                page.close()
            except Exception:
                pass

    def check_result_libraries(self, results: KVKResults):
        """Check which libraries hold the results in the provided KVKResults.

        Returns a dict mapping library names to counts of results held.
        """
        library_counts = {}
        for kvk_result in results.results:
            print("Checking katalogue:", kvk_result.source_katalogue)
            test_page = self.new_page()
            try:
                for result in kvk_result.results:
                    print(" Checking result:", result.title, result.link)
                    try:
                        test_page.goto(result.link, wait_until="networkidle", timeout=20000)

                        # Try to wait for catalog-specific result containers to appear.
                        try:
                            test_page.wait_for_selector(
                                ".kvk-result-item, .kvk-result-box, .search-results, .record, table",
                                timeout=5000,
                            )
                            # trigger lazy loading
                            try:
                                test_page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
                            except Exception:
                                pass
                            test_page.wait_for_timeout(1000)
                            html = test_page.content()
                            # If the page uses the kvk-result-box structure, reuse evaluated().
                            soup = BeautifulSoup(html, "html.parser")
                            if soup.select_one(".kvk-result-box"):
                                parsed = self.evaluated(html)
                            else:
                                # Try to parse a table-based result listing
                                table = soup.find("table")
                                results_list = []
                                if table:
                                    for tr in table.find_all("tr"):
                                        # prefer links in the row
                                        a = tr.find("a")
                                        if a and a.get("href"):
                                            title = a.get_text(strip=True)
                                            href = a.get("href")
                                        else:
                                            # fallback: join cell texts
                                            cells = [td.get_text(strip=True) for td in tr.find_all(["td", "th"])]
                                            if not cells:
                                                continue
                                            title = cells[0]
                                            href = ""
                                        results_list.append(Result(title=title, link=href))
                                parsed = KVKResults(results=[KVKResult(source_katalogue=kvk_result.source_katalogue, results=results_list)])
                            library_counts.setdefault(kvk_result.source_katalogue, 0)
                            library_counts[kvk_result.source_katalogue] += len(parsed.results)
                            continue
                        except Exception:
                            # selector didn't appear quickly — try other approaches
                            pass

                        # Inspect inline scripts for embedded JSON-like payloads,
                        # matching `window.NAME = {...};` or `var NAME = {...};`.
                        scripts = test_page.query_selector_all("script")
                        found = False
                        for s in scripts:
                            try:
                                txt = s.text_content() or ""
                            except Exception:
                                txt = ""
                            m = re.search(r"window\.[A-Za-z0-9_]+\s*=\s*(\{[\s\S]*?\})\s*;", txt)
                            if not m:
                                m = re.search(r"var\s+[A-Za-z0-9_]+\s*=\s*(\{[\s\S]*?\})\s*;", txt)
                            if m:
                                blob = m.group(1)
                                try:
                                    obj = json.loads(blob)
                                    if isinstance(obj, dict) and "results" in obj and isinstance(obj["results"], list):
                                        count = len(obj["results"])
                                    elif isinstance(obj, list):
                                        count = len(obj)
                                    else:
                                        count = 1
                                    library_counts.setdefault(kvk_result.source_katalogue, 0)
                                    library_counts[kvk_result.source_katalogue] += count
                                    found = True
                                    break
                                except Exception:
                                    # Not JSON — continue searching
                                    continue

                        if found:
                            continue

                        # Fallback: small wait, then parse the full HTML
                        test_page.wait_for_timeout(1000)
                        html = test_page.content()
                        parsed = self.evaluated(html)
                        library_counts.setdefault(kvk_result.source_katalogue, 0)
                        library_counts[kvk_result.source_katalogue] += len(parsed.results)
                    except Exception as exc:
                        print(f"Error checking {result.link}: {exc}")
            finally:
                # BUG FIX: the original never closed the per-catalogue page,
                # leaking one browser page per catalogue checked.
                try:
                    test_page.close()
                except Exception:
                    pass

        return library_counts

    def evaluated(self, content: str):
        """Parse a KVK result page into a KVKResults structure.

        Results live in divs with class "kvk-result-box"; the catalogue
        title is in the "kvk-result-head" element, individual hits are
        "kvk-result-item" divs whose anchor carries the link and title.
        """
        resultlist = []
        soup = BeautifulSoup(content, "html.parser")
        for result_box in soup.select(".kvk-result-box"):
            # Guard against malformed boxes missing the head element
            # (the original raised AttributeError here).
            head = result_box.select_one(".kvk-result-head")
            katalogue_title = head.text.strip() if head else ""
            results = []
            for record in result_box.find_all("div", class_="kvk-result-item"):
                # A single anchor carries both the href and the title text.
                anchor = record.find("a", class_="kvk-result-item-link")
                if anchor is None:
                    continue
                link = anchor["href"]
                title = anchor.text.strip().split("\n")[0].strip()
                results.append(Result(title=title, link=link))
            resultlist.append(
                KVKResult(source_katalogue=katalogue_title, results=results)
            )
        return KVKResults(results=resultlist)


__all__ = ["KVKParser"]