chore: restructured project, updated readme
16
src/services/__init__.py
Normal file
@@ -0,0 +1,16 @@
"""External service integrations and API clients."""

from .catalogue import Catalogue
from .sru import SWB
from .lehmanns import LehmannsClient
from .zotero import ZoteroController
from .webrequest import BibTextTransformer, WebRequest

__all__ = [
    "Catalogue",
    "SWB",
    "LehmannsClient",
    "ZoteroController",
    "BibTextTransformer",
    "WebRequest",
]
292
src/services/catalogue.py
Normal file
@@ -0,0 +1,292 @@
from typing import List

import regex
import requests
from bs4 import BeautifulSoup

from src.core.models import BookData as Book
from src.shared.logging import log

URL = "https://rds.ibs-bw.de/phfreiburg/opac/RDSIndex/Search?type0%5B%5D=allfields&lookfor0%5B%5D={}&join=AND&bool0%5B%5D=AND&type0%5B%5D=au&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ti&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ct&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=isn&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ta&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=co&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=py&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=pp&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=pu&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=si&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=zr&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=cc&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND"
BASE = "https://rds.ibs-bw.de"
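
# The URL template above drives the RDS quick search: the single {} slot
# receives the search term for the "allfields" field, while all other
# lookfor0[] slots stay empty; BASE is prepended to relative result links.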


class Catalogue:
    def __init__(self, timeout=15):
        self.timeout = timeout
        reachable = self.check_connection()
        if not reachable:
            log.error("No internet connection available.")
            raise ConnectionError("No internet connection available.")

    def check_connection(self):
        try:
            response = requests.get("https://www.google.com", timeout=self.timeout)
            if response.status_code == 200:
                return True
        except requests.exceptions.RequestException as e:
            log.error(f"Could not connect to google.com: {e}")
        return False  # explicit fallback; previously fell through returning None

    def search_book(self, searchterm: str):
        response = requests.get(URL.format(searchterm), timeout=self.timeout)
        return response.text

    def search(self, link: str):
        response = requests.get(link, timeout=self.timeout)
        return response.text

    def get_book_links(self, searchterm: str) -> List[str]:
        response = self.search_book(searchterm)
        soup = BeautifulSoup(response, "html.parser")
        links = soup.find_all("a", class_="title getFull")
        res: List[str] = []
        for link in links:
            res.append(BASE + link["href"])  # type: ignore
        return res
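
    # get_book below walks each result page and scrapes the RDS_* blocks;
    # the CSS class names are tied to the current OPAC (Bootstrap) markup.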

    def get_book(self, searchterm: str):
        log.info(f"Searching for term: {searchterm}")

        links = self.get_book_links(searchterm)
        log.debug(links)
        for elink in links:
            result = self.search(elink)
            # in result search for class col-xs-12 rds-dl RDS_LOCATION
            # if found, return text of href
            soup = BeautifulSoup(result, "html.parser")

            # Optional (unchanged): title and ppn if you need them
            title_el = soup.find("div", class_="headline text")
            title = title_el.get_text(strip=True) if title_el else None

            ppn_el = soup.find(
                "div", class_="col-xs-12 col-md-5 col-lg-4 rds-dl-head RDS_PPN"
            )
            # in ppn_el, get text of div col-xs-12 col-md-7 col-lg-8 rds-dl-panel
            ppn = (
                ppn_el.find_next_sibling(
                    "div", class_="col-xs-12 col-md-7 col-lg-8 rds-dl-panel"
                ).get_text(strip=True)
                if ppn_el
                else None
            )

            # get edition text at div class col-xs-12 col-md-5 col-lg-4 rds-dl-head RDS_EDITION
            edition_el = soup.find(
                "div", class_="col-xs-12 col-md-5 col-lg-4 rds-dl-head RDS_EDITION"
            )
            edition = (
                edition_el.find_next_sibling(
                    "div", class_="col-xs-12 col-md-7 col-lg-8 rds-dl-panel"
                ).get_text(strip=True)
                if edition_el
                else None
            )

            authors = soup.find_all(
                "div", class_="col-xs-12 col-md-5 col-lg-4 rds-dl-head RDS_PERSON"
            )
            author = None
            if authors:
                # get the names of the a href links in the div col-xs-12 col-md-7 col-lg-8 rds-dl-panel
                author_names = []
                for author_el in authors:
                    panel = author_el.find_next_sibling(
                        "div", class_="col-xs-12 col-md-7 col-lg-8 rds-dl-panel"
                    )
                    if panel:
                        links = panel.find_all("a")
                        for link in links:
                            author_names.append(link.text.strip())
                # join defensively: also covers the single-name case and an
                # empty list (previously author_names[0] raised IndexError)
                author = ";".join(author_names) if author_names else None
            signature = None

            panel = soup.select_one("div.panel-body")
            if panel:
                # Collect the RDS_* blocks in order, using the 'space' divs as separators
                groups = []
                cur = {}
                for node in panel.select(
                    "div.rds-dl.RDS_SIGNATURE, div.rds-dl.RDS_STATUS, div.rds-dl.RDS_LOCATION, div.col-xs-12.space"
                ):
                    classes = node.get("class", [])
                    # Separator between entries
                    if "space" in classes:
                        if cur:
                            groups.append(cur)
                            cur = {}
                        continue

                    # Read the value from the corresponding panel cell
                    val_el = node.select_one(".rds-dl-panel")
                    val = (
                        val_el.get_text(" ", strip=True)
                        if val_el
                        else node.get_text(" ", strip=True)
                    )

                    if "RDS_SIGNATURE" in classes:
                        cur["signature"] = val
                    elif "RDS_STATUS" in classes:
                        cur["status"] = val
                    elif "RDS_LOCATION" in classes:
                        cur["location"] = val

                if cur:  # append the last group if not followed by a space
                    groups.append(cur)

                # Find the signature for the entry whose location mentions
                # "Semesterapparat"; note this returns on the first group either way
                for g in groups:
                    loc = g.get("location", "").lower()
                    if "semesterapparat" in loc:
                        signature = g.get("signature")
                        return Book(
                            title=title,
                            ppn=ppn,
                            signature=signature,
                            library_location=loc.split("-")[-1],
                            link=elink,
                            author=author,
                            edition=edition,
                        )
                    else:
                        return Book(
                            title=title,
                            ppn=ppn,
                            signature=signature,
                            library_location=loc.split("\n\n")[-1],
                            link=elink,
                            author=author,
                            edition=edition,
                        )

    def get(self, ppn: str) -> Book | None:
        # based on PPN, get title, people, edition, year, language, pages, isbn,
        link = f"https://rds.ibs-bw.de/phfreiburg/opac/RDSIndexrecord/{ppn}"
        result = self.search(link)
        soup = BeautifulSoup(result, "html.parser")
        # NOTE: field extraction from `soup` is not implemented yet; the
        # method currently falls through and returns None implicitly.

    def get_ppn(self, searchterm: str) -> str | None:
        links = self.get_book_links(searchterm)
        ppn = None
        for link in links:
            result = self.search(link)
            soup = BeautifulSoup(result, "html.parser")
            log.debug(link)
            ppn = link.split("/")[-1]
            if ppn and regex.match(r"^\d{8,10}[X\d]?$", ppn):
                return ppn
        return ppn

    def get_semesterapparat_number(self, searchterm: str) -> int | str:
        # NOTE: despite the name this may also return a location string
        # (e.g. for "Handbibliothek-" shelves), hence the union return type.
        links = self.get_book_links(searchterm)
        for link in links:
            result = self.search(link)
            # in result search for class col-xs-12 rds-dl RDS_LOCATION
            # if found, return text of href
            soup = BeautifulSoup(result, "html.parser")

            locations = soup.find_all("div", class_="col-xs-12 rds-dl RDS_LOCATION")
            for location_el in locations:
                if "Semesterapparat-" in location_el.text:
                    match = regex.search(r"Semesterapparat-(\d+)", location_el.text)
                    if match:
                        return int(match.group(1))
                if "Handbibliothek-" in location_el.text:
                    return location_el.text.strip().split("\n\n")[-1].strip()
                return location_el.text.strip().split("\n\n")[-1].strip()
        return 0

    def get_author(self, link: str) -> str | None:
        links = self.get_book_links(f"kid:{link}")
        author = None
        for link in links:
            # print(link)
            result = self.search(link)
            soup = BeautifulSoup(result, "html.parser")
            # get all authors, return them as a string separated by ;
            authors = soup.find_all(
                "div", class_="col-xs-12 col-md-5 col-lg-4 rds-dl-head RDS_PERSON"
            )
            if authors:
                # get the names of the a href links in the div col-xs-12 col-md-7 col-lg-8 rds-dl-panel
                author_names = []
                for author_el in authors:
                    panel = author_el.find_next_sibling(
                        "div", class_="col-xs-12 col-md-7 col-lg-8 rds-dl-panel"
                    )
                    if panel:
                        links = panel.find_all("a")
                        for link in links:
                            author_names.append(link.text.strip())
                author = "; ".join(author_names)
        return author

    def get_signature(self, isbn: str):
        links = self.get_book_links(f"{isbn}")
        signature = None
        for link in links:
            result = self.search(link)
            soup = BeautifulSoup(result, "html.parser")
            panel = soup.select_one("div.panel-body")
            if panel:
                # Collect the RDS_* blocks in order, using the 'space' divs as separators
                groups = []
                cur = {}
                for node in panel.select(
                    "div.rds-dl.RDS_SIGNATURE, div.rds-dl.RDS_STATUS, div.rds-dl.RDS_LOCATION, div.col-xs-12.space"
                ):
                    classes = node.get("class", [])
                    # Separator between entries
                    if "space" in classes:
                        if cur:
                            groups.append(cur)
                            cur = {}
                        continue

                    # Read the value from the corresponding panel cell
                    val_el = node.select_one(".rds-dl-panel")
                    val = (
                        val_el.get_text(" ", strip=True)
                        if val_el
                        else node.get_text(" ", strip=True)
                    )

                    if "RDS_SIGNATURE" in classes:
                        cur["signature"] = val
                    elif "RDS_STATUS" in classes:
                        cur["status"] = val
                    elif "RDS_LOCATION" in classes:
                        cur["location"] = val

                if cur:  # append the last group if not followed by a space
                    groups.append(cur)

                # Find the signature for the entry whose location mentions
                # "Semesterapparat" (otherwise the first group's signature is returned)
                for g in groups:
                    log.debug(g)
                    loc = g.get("location", "").lower()
                    if "semesterapparat" in loc:
                        signature = g.get("signature")
                        return signature
                    else:
                        signature = g.get("signature")
                        return signature
        log.warning("No signature found")
        return signature

    def in_library(self, ppn: str) -> bool:
        if ppn is None:
            return False
        links = self.get_book_links(f"kid:{ppn}")
        return len(links) > 0

    def get_location(self, ppn: str) -> str | None:
        if ppn is None:
            return None
        link = self.get_book(f"{ppn}")
        if link is None:
            return None
        return link.library_location
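

# Usage sketch (illustrative; requires network access to the OPAC):
#   cat = Catalogue(timeout=10)
#   book = cat.get_book("Einführung in die Statistik")
#   if book:
#       print(book.signature, book.library_location)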
312
src/services/lehmanns.py
Normal file
@@ -0,0 +1,312 @@
from __future__ import annotations

import re
from dataclasses import asdict, dataclass, field
from typing import Iterable, List, Optional
from urllib.parse import quote_plus, urljoin

import httpx
from bs4 import BeautifulSoup

from src.core.models import BookData

BASE = "https://www.lehmanns.de"
SEARCH_URL = "https://www.lehmanns.de/search/quick?mediatype_id=&q="


@dataclass
class LehmannsSearchResult:
    title: str
    url: str

    # Core fields from the listing card
    year: Optional[int] = None
    edition: Optional[int] = None
    publisher: Optional[str] = None
    isbn13: Optional[str] = None

    # Extras from the listing card
    description: Optional[str] = None
    authors: list[str] = field(default_factory=list)
    media_type: Optional[str] = None
    book_format: Optional[str] = None
    price_eur: Optional[float] = None
    currency: str = "EUR"
    image: Optional[str] = None

    # From detail page:
    pages: Optional[str] = None  # "<N> Seiten"
    buyable: bool = True  # set in enrich_pages (detail page)
    unavailable_hint: Optional[str] = (
        None  # e.g. "Titel ist leider vergriffen; keine Neuauflage"
    )

    def to_dict(self) -> dict:
        return asdict(self)


class LehmannsClient:
    """Scrapes quick-search results, then enriches (and filters) via product pages."""

    def __init__(self, timeout: float = 20.0):
        self.client = httpx.Client(
            headers={
                "User-Agent": (
                    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                    "(KHTML, like Gecko) Chrome/124.0 Safari/537.36"
                ),
                "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            },
            timeout=timeout,
            follow_redirects=True,
        )

    def close(self):
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()

    # ------------------- Search (listing) -------------------

    def build_search_url(self, title: str) -> str:
        # spaces -> '+'
        return SEARCH_URL + quote_plus(title)

    def search_by_title(
        self,
        title: str,
        limit: Optional[int] = None,
        strict: bool = False,
        only_latest: bool = True,
    ) -> List[BookData]:
        """
        Parse the listing page only (no availability check here).
        Use enrich_pages(...) afterwards to fetch detail pages, add 'pages',
        and drop unbuyable items.
        """
        url = self.build_search_url(title=title)
        html = self._get(url)
        if not html:
            return []
        results = self._parse_results(html)
        # NOTE: enrich_pages mutates the result objects in place; its filtered
        # return value is discarded here, so unbuyable titles are kept.
        self.enrich_pages(results)

        results = [BookData().from_LehmannsSearchResult(r) for r in results]
        if strict:
            # filter results to only those with exact title match (case-insensitive)
            title_lower = title.lower()
            results = [r for r in results if r.title and r.title.lower() == title_lower]
            # results = [r for r in results if r.buyable]
            return results
        if limit is not None:
            results = results[: max(0, limit)]
        if only_latest and len(results) > 1:
            # keep only the latest edition (highest edition number)
            results.sort(key=lambda r: (r.edition_number or 0), reverse=True)
            results = [results[0]]
        return results

    # ------------------- Detail enrichment & filtering -------------------

    def enrich_pages(
        self, results: Iterable[LehmannsSearchResult], drop_unbuyable: bool = True
    ) -> List[LehmannsSearchResult]:
        """
        Fetch each result.url, extract:
          - pages: from <span class="book-meta meta-seiten" itemprop="numberOfPages">...</span>
          - availability: from <li class="availability-3">...</li>
            * if it contains "Titel ist leider vergriffen", mark buyable=False
            * if it also contains "keine Neuauflage", set unavailable_hint accordingly
        If drop_unbuyable=True, exclude non-buyable results from the returned list.
        """
        enriched: List[LehmannsSearchResult] = []
        for r in results:
            try:
                html = self._get(r.url)
                if not html:
                    # Can't verify; keep as-is when not dropping, else skip
                    if not drop_unbuyable:
                        enriched.append(r)
                    continue

                soup = BeautifulSoup(html, "html.parser")  # type: ignore

                # Pages
                pages_node = soup.select_one(  # type: ignore
                    "span.book-meta.meta-seiten[itemprop='numberOfPages'], "
                    "span.book-meta.meta-seiten[itemprop='numberofpages'], "
                    ".meta-seiten [itemprop='numberOfPages'], "
                    ".meta-seiten[itemprop='numberOfPages'], "
                    ".book-meta.meta-seiten"
                )
                if pages_node:
                    text = pages_node.get_text(" ", strip=True)
                    m = re.search(r"\d+", text)
                    if m:
                        r.pages = f"{m.group(0)} Seiten"

                # Availability via li.availability-3
                avail_li = soup.select_one("li.availability-3")  # type: ignore
                if avail_li:
                    avail_text = " ".join(
                        avail_li.get_text(" ", strip=True).split()
                    ).lower()
                    if "titel ist leider vergriffen" in avail_text:
                        r.buyable = False
                        if "keine neuauflage" in avail_text:
                            r.unavailable_hint = (
                                "Titel ist leider vergriffen; keine Neuauflage"
                            )
                        else:
                            r.unavailable_hint = "Titel ist leider vergriffen"

                # Append or drop
                if (not drop_unbuyable) or r.buyable:
                    enriched.append(r)

            except Exception:
                # On any per-item error, keep the record if not dropping; else skip
                if not drop_unbuyable:
                    enriched.append(r)
                continue

        return enriched

    # ------------------- Internals -------------------

    def _get(self, url: str) -> Optional[str]:
        try:
            r = self.client.get(url)
            r.encoding = "utf-8"
            if r.status_code == 200 and "text/html" in (
                r.headers.get("content-type") or ""
            ):
                return r.text
        except httpx.HTTPError:
            pass
        return None

    def _parse_results(self, html: str) -> List[LehmannsSearchResult]:
        soup = BeautifulSoup(html, "html.parser")
        results: list[LehmannsSearchResult] = []

        for block in soup.select("div.info-block"):
            a = block.select_one(".title a[href]")
            if not a:
                continue
            url = urljoin(BASE, a["href"].strip())
            base_title = (block.select_one(".title [itemprop='name']") or a).get_text(  # type: ignore
                strip=True
            )

            # Alternative headline => extend title
            alt_tag = block.select_one(".description[itemprop='alternativeHeadline']")  # type: ignore
            alternative_headline = alt_tag.get_text(strip=True) if alt_tag else None
            title = (
                f"{base_title} : {alternative_headline}"
                if alternative_headline
                else base_title
            )
            description = alternative_headline

            # Authors from .author
            authors: list[str] = []
            author_div = block.select_one("div.author")  # type: ignore
            if author_div:
                t = author_div.get_text(" ", strip=True)
                t = re.sub(r"^\s*von\s+", "", t, flags=re.I)
                for part in re.split(r"\s*;\s*|\s*&\s*|\s+und\s+", t):
                    name = " ".join(part.split())
                    if name:
                        authors.append(name)

            # Media + format
            media_type = None
            book_format = None
            type_text = block.select_one(".type")  # type: ignore
            if type_text:
                t = type_text.get_text(" ", strip=True)
                m = re.search(r"\b(Buch|eBook|Hörbuch)\b", t)
                if m:
                    media_type = m.group(1)
                fm = re.search(r"\(([^)]+)\)", t)
                if fm:
                    book_format = fm.group(1).strip().upper()

            # Year
            year = None
            y = block.select_one("[itemprop='copyrightYear']")  # type: ignore
            if y:
                try:
                    year = int(y.get_text(strip=True))
                except ValueError:
                    pass

            # Edition
            edition = None
            ed = block.select_one("[itemprop='bookEdition']")  # type: ignore
            if ed:
                m = re.search(r"\d+", ed.get_text(strip=True))
                if m:
                    edition = int(m.group())

            # Publisher
            publisher = None
            pub = block.select_one(  # type: ignore
                ".publisherprop [itemprop='name']"
            ) or block.select_one(".publisher [itemprop='name']")  # type: ignore
            if pub:
                publisher = pub.get_text(strip=True)

            # ISBN-13
            isbn13 = None
            isbn_tag = block.select_one(".isbn [itemprop='isbn'], [itemprop='isbn']")  # type: ignore
            if isbn_tag:
                digits = re.sub(r"[^0-9Xx]", "", isbn_tag.get_text(strip=True))
                m = re.search(r"(97[89]\d{10})", digits)
                if m:
                    isbn13 = m.group(1)

            # Price (best effort)
            price_eur = None
            txt = block.get_text(" ", strip=True)
            mprice = re.search(r"(\d{1,3}(?:\.\d{3})*,\d{2})\s*€", txt)
            if not mprice and block.parent:
                sib = block.parent.get_text(" ", strip=True)
                mprice = re.search(r"(\d{1,3}(?:\.\d{3})*,\d{2})\s*€", sib)
            if mprice:
                num = mprice.group(1).replace(".", "").replace(",", ".")
                try:
                    price_eur = float(num)
                except ValueError:
                    pass

            # Image (best-effort)
            image = None
            left_img = block.find_previous("img")  # type: ignore
            if left_img and left_img.get("src"):
                image = urljoin(BASE, left_img["src"])

            results.append(
                LehmannsSearchResult(
                    title=title,
                    url=url,
                    description=description,
                    authors=authors,
                    media_type=media_type,
                    book_format=book_format,
                    year=year,
                    edition=edition,
                    publisher=publisher,
                    isbn13=isbn13,
                    price_eur=price_eur,
                    image=image,
                )
            )

        return results
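

# Usage sketch (illustrative; scrapes lehmanns.de, so network access and the
# current page markup are assumed):
#   with LehmannsClient() as lc:
#       for hit in lc.search_by_title("Lineare Algebra", limit=3):
#           print(hit.title, hit.year)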
58
src/services/openai.py
Normal file
@@ -0,0 +1,58 @@
import json
from typing import Any

from openai import OpenAI

from src import settings


def init_client() -> OpenAI:
    """Initialize the OpenAI client with the API key and model from settings."""
    global client, model, api_key
    if not settings.openAI.api_key:
        raise ValueError("OpenAI API key is not set in the configuration.")
    if not settings.openAI.model:
        raise ValueError("OpenAI model is not set in the configuration.")

    model = settings.openAI.model
    api_key = settings.openAI.api_key
    client = OpenAI(api_key=api_key)
    return client


def run_shortener(title: str, length: int) -> list[dict[str, Any]]:
    client = init_client()
    response = client.responses.create(  # type: ignore
        model=model,
        instructions="""You are a sentence shortener. The next message will contain the string to shorten and the length limit.
You need to shorten the string to be under the length limit, while keeping as much detail as possible. The result may NOT be longer than the length limit.
Based on that, please reply with only the shortened strings. Give me 5 choices. If a result is too long, discard it and try another one. Return the data as a JSON list containing each result as {"shortened_string": shortened_string, "length": lengthAsInt}. Do not return the answer in a codeblock, use a pure string. Before answering, check the results and if ANY is longer than the needed_length, discard all and try again""",
        input=f'{{"string":"{title}", "needed_length":{length}}}',
    )
    answers = response.output_text
    # answers are a JSON-formatted string, so parse them into a list of dicts
    # (json.loads rather than eval: model output should never be executed)
    return json.loads(answers)  # type: ignore


def name_tester(name: str) -> dict:
    client = init_client()
    response = client.responses.create(  # type: ignore
        model=model,
        instructions="""You are a name tester. You are given a name and will have to split the name into first name, last name, and if present the title. Return the name in a json format with the keys "title", "first_name", "last_name". If no title is present, set title to none. Do not return the answer in a codeblock, use a pure json string. Assume the names are in the usual german naming scheme""",
        input=f'{{"name":"{name}"}}',
    )
    answers = response.output_text

    return json.loads(answers)


def semester_converter(semester: str) -> str:
    client = init_client()
    response = client.responses.create(  # type: ignore
        model=model,
        instructions="""you are a semester converter. You will be given a string. Convert this into a string like this: SoSe YY or WiSe YY/YY+1. Do not return the answer in a codeblock, use a pure string.""",
        input=semester,
    )
    answers = response.output_text

    return answers
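

# Usage sketch (illustrative; requires the OpenAI key/model in settings):
#   parts = name_tester("Prof. Dr. Max Mustermann")
#   # expected shape: {"title": ..., "first_name": ..., "last_name": ...}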
631
src/services/sru.py
Normal file
@@ -0,0 +1,631 @@
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Iterable, List, Optional, Tuple, Union

import requests
from requests.adapters import HTTPAdapter

# centralized logging used via src.shared.logging
from src.core.models import BookData
from src.shared.logging import log

log  # ensure imported logger is referenced


# -----------------------
# Dataclasses
# -----------------------


# --- MARC XML structures ---
@dataclass
class ControlField:
    tag: str
    value: str


@dataclass
class SubField:
    code: str
    value: str


@dataclass
class DataField:
    tag: str
    ind1: str = " "
    ind2: str = " "
    subfields: List[SubField] = field(default_factory=list)


@dataclass
class MarcRecord:
    leader: str
    controlfields: List[ControlField] = field(default_factory=list)
    datafields: List[DataField] = field(default_factory=list)


# --- SRU record wrapper ---
@dataclass
class Record:
    recordSchema: str
    recordPacking: str
    recordData: MarcRecord
    recordPosition: int


@dataclass
class EchoedSearchRequest:
    version: str
    query: str
    maximumRecords: int
    recordPacking: str
    recordSchema: str


@dataclass
class SearchRetrieveResponse:
    version: str
    numberOfRecords: int
    records: List[Record] = field(default_factory=list)
    echoedSearchRetrieveRequest: Optional[EchoedSearchRequest] = None


# -----------------------
# Parser
# -----------------------

ZS = "http://www.loc.gov/zing/srw/"
MARC = "http://www.loc.gov/MARC21/slim"
NS = {"zs": ZS, "marc": MARC}


def _text(elem: Optional[ET.Element]) -> str:
    return (elem.text or "") if elem is not None else ""


def _req_text(parent: ET.Element, path: str) -> Optional[str]:
    el = parent.find(path, NS)
    if el is None or el.text is None:
        return None
    return el.text
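

# Expected input shape (abridged, illustrative SRU/MARCXML response):
#   <zs:searchRetrieveResponse xmlns:zs="http://www.loc.gov/zing/srw/">
#     <zs:records><zs:record><zs:recordData>
#       <record xmlns="http://www.loc.gov/MARC21/slim">
#         <leader>...</leader>
#         <controlfield tag="001">1234567890</controlfield>
#         <datafield tag="245" ind1=" " ind2=" ">
#           <subfield code="a">Title</subfield>
#         </datafield>
#       </record>
#     </zs:recordData></zs:record></zs:records>
#   </zs:searchRetrieveResponse>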


def parse_marc_record(record_el: ET.Element) -> MarcRecord:
    """
    record_el is the <marc:record> element (default ns MARC in your sample)
    """
    # leader
    leader_text = _req_text(record_el, "marc:leader") or ""

    # controlfields
    controlfields: List[ControlField] = []
    for cf in record_el.findall("marc:controlfield", NS):
        tag = cf.get("tag", "").strip()
        controlfields.append(ControlField(tag=tag, value=_text(cf)))

    # datafields
    datafields: List[DataField] = []
    for df in record_el.findall("marc:datafield", NS):
        tag = df.get("tag", "").strip()
        ind1 = df.get("ind1") or " "
        ind2 = df.get("ind2") or " "
        subfields: List[SubField] = []
        for sf in df.findall("marc:subfield", NS):
            code = sf.get("code", "")
            subfields.append(SubField(code=code, value=_text(sf)))
        datafields.append(DataField(tag=tag, ind1=ind1, ind2=ind2, subfields=subfields))

    return MarcRecord(
        leader=leader_text, controlfields=controlfields, datafields=datafields
    )


def parse_record(zs_record_el: ET.Element) -> Record:
    recordSchema = _req_text(zs_record_el, "zs:recordSchema") or ""
    recordPacking = _req_text(zs_record_el, "zs:recordPacking") or ""

    # recordData contains a MARC <record> with default MARC namespace in your sample
    recordData_el = zs_record_el.find("zs:recordData", NS)
    if recordData_el is None:
        raise ValueError("Missing zs:recordData")

    marc_record_el = recordData_el.find("marc:record", NS)
    if marc_record_el is None:
        # If the MARC record uses default ns (xmlns="...") ElementTree still needs the ns-qualified name.
        # We already searched with prefix; this covers both default and prefixed cases.
        raise ValueError("Missing MARC21 record inside zs:recordData")

    marc_record = parse_marc_record(marc_record_el)

    recordPosition = int(_req_text(zs_record_el, "zs:recordPosition") or "0")
    return Record(
        recordSchema=recordSchema,
        recordPacking=recordPacking,
        recordData=marc_record,
        recordPosition=recordPosition,
    )


def parse_echoed_request(root: ET.Element) -> Optional[EchoedSearchRequest]:
    el = root.find("zs:echoedSearchRetrieveRequest", NS)
    if el is None:
        return None

    # Be permissive with missing fields
    version = _text(el.find("zs:version", NS))
    query = _text(el.find("zs:query", NS))
    maximumRecords_text = _text(el.find("zs:maximumRecords", NS)) or "0"
    recordPacking = _text(el.find("zs:recordPacking", NS))
    recordSchema = _text(el.find("zs:recordSchema", NS))

    try:
        maximumRecords = int(maximumRecords_text)
    except ValueError:
        maximumRecords = 0

    return EchoedSearchRequest(
        version=version,
        query=query,
        maximumRecords=maximumRecords,
        recordPacking=recordPacking,
        recordSchema=recordSchema,
    )


def parse_search_retrieve_response(
    xml_str: Union[str, bytes],
) -> SearchRetrieveResponse:
    root = ET.fromstring(xml_str)

    # Root is zs:searchRetrieveResponse
    version = _req_text(root, "zs:version") or ""
    numberOfRecords = int(_req_text(root, "zs:numberOfRecords") or "0")

    records_parent = root.find("zs:records", NS)
    records: List[Record] = []
    if records_parent is not None:
        for r in records_parent.findall("zs:record", NS):
            records.append(parse_record(r))

    echoed = parse_echoed_request(root)

    return SearchRetrieveResponse(
        version=version,
        numberOfRecords=numberOfRecords,
        records=records,
        echoedSearchRetrieveRequest=echoed,
    )


# --- Query helpers over MarcRecord ---


def iter_datafields(
    rec: MarcRecord,
    tag: Optional[str] = None,
    ind1: Optional[str] = None,
    ind2: Optional[str] = None,
) -> Iterable[DataField]:
    """Yield datafields, optionally filtered by tag/indicators."""
    for df in rec.datafields:
        if tag is not None and df.tag != tag:
            continue
        if ind1 is not None and df.ind1 != ind1:
            continue
        if ind2 is not None and df.ind2 != ind2:
            continue
        yield df


def subfield_values(
    rec: MarcRecord,
    tag: str,
    code: str,
    *,
    ind1: Optional[str] = None,
    ind2: Optional[str] = None,
) -> List[str]:
    """All values for subfield `code` in every `tag` field (respecting indicators)."""
    out: List[str] = []
    for df in iter_datafields(rec, tag, ind1, ind2):
        out.extend(sf.value for sf in df.subfields if sf.code == code)
    return out


def first_subfield_value(
    rec: MarcRecord,
    tag: str,
    code: str,
    *,
    ind1: Optional[str] = None,
    ind2: Optional[str] = None,
    default: Optional[str] = None,
) -> Optional[str]:
    """First value for subfield `code` in `tag` (respecting indicators)."""
    for df in iter_datafields(rec, tag, ind1, ind2):
        for sf in df.subfields:
            if sf.code == code:
                return sf.value
    return default
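

# Example (illustrative), given a parsed MarcRecord `rec`:
#   first_subfield_value(rec, "245", "a")  -> main title or None
#   subfield_values(rec, "020", "a")       -> list of ISBN strings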


def find_datafields_with_subfields(
    rec: MarcRecord,
    tag: str,
    *,
    where_all: Optional[Dict[str, str]] = None,
    where_any: Optional[Dict[str, str]] = None,
    casefold: bool = False,
    ind1: Optional[str] = None,
    ind2: Optional[str] = None,
) -> List[DataField]:
    """
    Return datafields of `tag` whose subfields match constraints:
      - where_all: every (code -> exact value) must be present
      - where_any: at least one (code -> exact value) present
    Set `casefold=True` for case-insensitive comparison.
    """
    where_all = where_all or {}
    where_any = where_any or {}
    matched: List[DataField] = []

    for df in iter_datafields(rec, tag, ind1, ind2):
        # Map code -> list of values (with optional casefold applied)
        vals: Dict[str, List[str]] = {}
        for sf in df.subfields:
            v = sf.value.casefold() if casefold else sf.value
            vals.setdefault(sf.code, []).append(v)

        ok = True
        for c, v in where_all.items():
            vv = v.casefold() if casefold else v
            if c not in vals or vv not in vals[c]:
                ok = False
                break

        if ok and where_any:
            any_ok = any(
                (c in vals) and ((v.casefold() if casefold else v) in vals[c])
                for c, v in where_any.items()
            )
            if not any_ok:
                ok = False

        if ok:
            matched.append(df)

    return matched


def controlfield_value(
    rec: MarcRecord, tag: str, default: Optional[str] = None
) -> Optional[str]:
    """Get the first controlfield value by tag (e.g., '001', '005')."""
    for cf in rec.controlfields:
        if cf.tag == tag:
            return cf.value
    return default


def datafields_value(
    data: List[DataField], code: str, default: Optional[str] = None
) -> Optional[str]:
    """Get the first value for a specific subfield code in a list of datafields."""
    for df in data:
        for sf in df.subfields:
            if sf.code == code:
                return sf.value
    return default


def datafield_value(
    df: DataField, code: str, default: Optional[str] = None
) -> Optional[str]:
    """Get the first value for a specific subfield code in a datafield."""
    for sf in df.subfields:
        if sf.code == code:
            return sf.value
    return default


def _smart_join_title(a: str, b: Optional[str]) -> str:
    """
    Join 245 $a and $b with MARC-style punctuation.
    If $b is present, join with ' : ' unless either side already supplies punctuation.
    """
    a = a.strip()
    if not b:
        return a
    b = b.strip()
    if a.endswith((":", ";", "/")) or b.startswith((":", ";", "/")):
        return f"{a} {b}"
    return f"{a} : {b}"


def subfield_values_from_fields(
    fields: Iterable[DataField],
    code: str,
) -> List[str]:
    """All subfield values with given `code` across a list of DataField."""
    return [sf.value for df in fields for sf in df.subfields if sf.code == code]


def first_subfield_value_from_fields(
    fields: Iterable[DataField],
    code: str,
    default: Optional[str] = None,
) -> Optional[str]:
    """First subfield value with given `code` across a list of DataField."""
    for df in fields:
        for sf in df.subfields:
            if sf.code == code:
                return sf.value
    return default


def subfield_value_pairs_from_fields(
    fields: Iterable[DataField],
    code: str,
) -> List[Tuple[DataField, str]]:
    """
    Return (DataField, value) pairs for all subfields with `code`.
    Useful if you need to know which field a value came from.
    """
    out: List[Tuple[DataField, str]] = []
    for df in fields:
        for sf in df.subfields:
            if sf.code == code:
                out.append((df, sf.value))
    return out


def book_from_marc(rec: MarcRecord) -> BookData:
    # PPN from controlfield 001
    ppn = controlfield_value(rec, "001")

    # Title = 245 $a + 245 $b (if present)
    t_a = first_subfield_value(rec, "245", "a")
    t_b = first_subfield_value(rec, "245", "b")
    title = _smart_join_title(t_a, t_b) if t_a else None

    # Signature = 924 where $9 == "Frei 129" → take that field's $g
    frei_fields = find_datafields_with_subfields(
        rec, "924", where_all={"9": "Frei 129"}
    )
    signature = first_subfield_value_from_fields(frei_fields, "g")

    # Year = 264 $c (prefer ind2="1" publication; fallback to any 264)
    year = first_subfield_value(rec, "264", "c", ind2="1") or first_subfield_value(
        rec, "264", "c"
    )
    isbn = subfield_values(rec, "020", "a")
    mediatype = first_subfield_value(rec, "338", "a")
    lang = subfield_values(rec, "041", "a")
    authors = subfield_values(rec, "700", "a")
    author = None
    if authors:
        author = "; ".join(authors)

    return BookData(
        ppn=ppn,
        title=title,
        signature=signature,
        edition=first_subfield_value(rec, "250", "a") or "",
        year=year,
        pages=first_subfield_value(rec, "300", "a") or "",
        publisher=first_subfield_value(rec, "264", "b") or "",
        isbn=isbn,
        language=lang,
        link="",
        author=author,
        media_type=mediatype,
    )


class SWBData(Enum):
    URL = "https://sru.k10plus.de/opac-de-627!rec=1?version=1.1&operation=searchRetrieve&query={}&maximumRecords=100&recordSchema=marcxml"
    ARGSCHEMA = "pica."
    NAME = "SWB"


class DNBData(Enum):
    URL = "https://services.dnb.de/sru/dnb?version=1.1&operation=searchRetrieve&query={}&maximumRecords=100&recordSchema=MARC21-xml"
    ARGSCHEMA = ""
    NAME = "DNB"


class SRUSite(Enum):
    SWB = SWBData
    DNB = DNBData


RVK_ALLOWED = r"[A-Z0-9.\-\/]"  # conservative char set typically seen in RVK notations


def find_newer_edition(
    swb_result: BookData, dnb_result: List[BookData]
) -> Optional[List[BookData]]:
    """
    New edition if:
      - year > swb.year OR
      - edition_number > swb.edition_number

    Additional guards & preferences:
      - If both have signatures and they differ, skip (not the same work).
      - For duplicates (same ppn): keep the one that has a signature, and
        prefer a signature that matches swb_result.signature.
      - If multiple remain: keep the single 'latest' by (year desc,
        edition_number desc, best-signature-match desc, has-signature desc).
    """

    def norm_sig(s: Optional[str]) -> str:
        if not s:
            return ""
        # normalize: lowercase, collapse whitespace, keep alnum + a few separators
        s = s.lower()
        s = re.sub(r"\s+", " ", s).strip()
        # remove obvious noise; adjust if your signature format differs
        s = re.sub(r"[^a-z0-9\-_/\. ]+", "", s)
        return s

    def has_sig(b: BookData) -> bool:
        return bool(getattr(b, "signature", None))

    def sig_matches_swb(b: BookData) -> bool:
        if not has_sig(b) or not has_sig(swb_result):
            return False
        return norm_sig(b.signature) == norm_sig(swb_result.signature)

    def strictly_newer(b: BookData) -> bool:
        by_year = (
            b.year is not None
            and swb_result.year is not None
            and b.year > swb_result.year
        )
        by_edition = (
            b.edition_number is not None
            and swb_result.edition_number is not None
            and b.edition_number > swb_result.edition_number
        )
        return by_year or by_edition

    swb_sig_norm = norm_sig(getattr(swb_result, "signature", None))

    # 1) Filter to same-work AND newer
    candidates: List[BookData] = []
    for b in dnb_result:
        # Skip if both signatures exist and don't match (different work)
        b_sig = getattr(b, "signature", None)
        if b_sig and swb_result.signature:
            if norm_sig(b_sig) != swb_sig_norm:
                continue  # not the same work

        # Keep only if newer by rules
        if strictly_newer(b):
            candidates.append(b)

    if not candidates:
        return None

    # 2) Dedupe by PPN, preferring signature (and matching signature if possible)
    by_ppn: dict[Optional[str], BookData] = {}
    for b in candidates:
        key = getattr(b, "ppn", None)
        prev = by_ppn.get(key)
        if prev is None:
            by_ppn[key] = b
            continue

        # Compute preference score for both
        def ppn_pref_score(x: BookData) -> tuple[int, int]:
            # (signature matches swb, has signature)
            return (1 if sig_matches_swb(x) else 0, 1 if has_sig(x) else 0)

        if ppn_pref_score(b) > ppn_pref_score(prev):
            by_ppn[key] = b

    deduped = list(by_ppn.values())
    if not deduped:
        return None

    # 3) If multiple remain, keep only the latest one.
    #    Order: year desc, edition_number desc, signature-match desc, has-signature desc
    def sort_key(b: BookData):
        year = b.year if b.year is not None else -1
        ed = b.edition_number if b.edition_number is not None else -1
        sig_match = 1 if sig_matches_swb(b) else 0
        sig_present = 1 if has_sig(b) else 0
        return (year, ed, sig_match, sig_present)

    best = max(deduped, key=sort_key)
    return [best] if best else None


class Api:
    def __init__(self, site: str, url: str, prefix: str):
        self.site = site
        self.url = url
        self.prefix = prefix
        # Reuse TCP connections across requests for better performance
        self._session = requests.Session()
        # Slightly larger connection pool for concurrent calls
        adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
        self._session.mount("http://", adapter)
        self._session.mount("https://", adapter)

    def close(self):
        try:
            self._session.close()
        except Exception:
            pass

    def __del__(self):
        # Best-effort cleanup
        self.close()

    def get(self, query_args: Iterable[str]) -> List[Record]:
        # if any query_arg ends with =, remove it
        if self.site == "DNB":
            args = [arg for arg in query_args if not arg.startswith("pica.")]
            if args == []:
                raise ValueError("DNB queries must include at least one search term")
            query_args = args
        # query_args = [f"{self.prefix}{arg}" for arg in query_args]
        query = "+and+".join(query_args)
        query = query.replace(" ", "%20").replace("&", "%26")
        # query_args = [arg for arg in query_args if not arg.endswith("=")]
        # query = "+and+".join(query_args)
        # query = query.replace(" ", "%20").replace("&", "%26")
        # insert the query into the URL template
        url = self.url.format(query)

        log.debug(url)
        headers = {
            "User-Agent": f"{self.site} SRU Client, <alexander.kirchner@ph-freiburg.de>",
            "Accept": "application/xml",
            "Accept-Charset": "latin1,utf-8;q=0.7,*;q=0.3",
        }
        # Use persistent session and set timeouts to avoid hanging
        resp = self._session.get(url, headers=headers, timeout=(3.05, 60))
        if resp.status_code != 200:
            raise Exception(f"Error fetching data from {self.site}: {resp.status_code}")
        # Parse using raw bytes (original behavior) to preserve encoding edge cases
        sr = parse_search_retrieve_response(resp.content)
        return sr.records

    def getBooks(self, query_args: Iterable[str]) -> List[BookData]:
        records: List[Record] = self.get(query_args)
        # Avoid printing on hot paths; rely on logger if needed
        log.debug(f"{self.site} found {len(records)} records for args={query_args}")
        books: List[BookData] = []
        # extract title from query_args if present
        title = None
        for arg in query_args:
            if arg.startswith("pica.tit="):
                title = arg.split("=")[1]
                break
        for rec in records:
            book = book_from_marc(rec.recordData)
            books.append(book)
        if title:
            books = [
                b
                for b in books
                if b.title and b.title.lower().startswith(title.lower())
            ]
        return books

    def getLinkForBook(self, book: BookData) -> str:
        # Not implemented: depends on catalog front-end; return empty string for now
        return ""


class SWB(Api):
    def __init__(self):
        self.site = SWBData.NAME.value
        self.url = SWBData.URL.value
        self.prefix = SWBData.ARGSCHEMA.value
        super().__init__(self.site, self.url, self.prefix)
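

# Usage sketch (illustrative; needs network access to the k10plus SRU API):
#   swb = SWB()
#   books = swb.getBooks(["pica.tit=Statistik"])   # pica.* args per ARGSCHEMA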
35
src/services/webadis.py
Normal file
@@ -0,0 +1,35 @@
from playwright.sync_api import sync_playwright


def get_book_medianr(signature: str, semesterapparat_nr: int, auth: tuple) -> str | None:
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=True)
        context = browser.new_context()
        page = context.new_page()
        page.goto(
            "https://bsz.ibs-bw.de:22998/aDISWeb/app?service=direct/0/Home/$DirectLink&sp=SDAP42"
        )
        page.get_by_role("textbox", name="Benutzer").fill(auth[0])
        page.get_by_role("textbox", name="Benutzer").press("Tab")
        page.get_by_role("textbox", name="Kennwort").fill(auth[1])
        page.get_by_role("textbox", name="Kennwort").press("Enter")
        page.get_by_role("button", name="Katalog").click()
        page.get_by_role("textbox", name="Signatur").click()
        page.get_by_role("textbox", name="Signatur").fill(signature)
        page.get_by_role("textbox", name="Signatur").press("Enter")
        book_list = page.locator("iframe").content_frame.get_by_role(
            "cell", name="Bibliothek der Pädagogischen"
        )
        # this will always find one result; we need to split the resulting text based on the entries that start with "* "
        book_entries = book_list.inner_text().split("\n")
        books = []
        for entry in book_entries:
            if entry.startswith("* "):
                books.append(entry)

        # find the entry for the requested Semesterapparat, then close the
        # browser before returning (previously the early return leaked it)
        medianr = None
        for book in books:
            if f"Semesterapparat: {semesterapparat_nr}" in book:
                medianr = book.split("* ")[1].split(":")[0]
                break

        # ---------------------
        context.close()
        browser.close()
        return medianr
314
src/services/webrequest.py
Normal file
@@ -0,0 +1,314 @@
from enum import Enum
from typing import Any, Optional, Union

import requests
from bs4 import BeautifulSoup

# import sleep_and_retry decorator to retry requests
from ratelimit import limits, sleep_and_retry

from src.core.models import BookData
from src.shared.logging import log
from src.transformers import ARRAYData, BibTeXData, COinSData, RDSData, RISData
from src.transformers.transformers import RDS_AVAIL_DATA, RDS_GENERIC_DATA

# logger.add(sys.stderr, format="{time} {level} {message}", level="INFO")


API_URL = "https://rds.ibs-bw.de/phfreiburg/opac/RDSIndexrecord/{}/"
PPN_URL = "https://rds.ibs-bw.de/phfreiburg/opac/RDSIndex/Search?type0%5B%5D=allfields&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=au&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ti&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ct&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=isn&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ta&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=co&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=py&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=pp&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=pu&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=si&lookfor0%5B%5D={}&join=AND&bool0%5B%5D=AND&type0%5B%5D=zr&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=cc&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND"
BASE = "https://rds.ibs-bw.de"

TITLE = "RDS_TITLE"
SIGNATURE = "RDS_SIGNATURE"
EDITION = "RDS_EDITION"
ISBN = "RDS_ISBN"
AUTHOR = "RDS_PERSON"

HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 \
        (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36",
    "Accept-Language": "en-US, en;q=0.5",
}
RATE_LIMIT = 20
RATE_PERIOD = 30


class TransformerType(Enum):
    ARRAY = "ARRAY"
    COinS = "COinS"
    BibTeX = "BibTeX"
    RIS = "RIS"
    RDS = "RDS"


class WebRequest:
    def __init__(self) -> None:
        """Request data from the web, and format it depending on the mode."""
        self.apparat = None
        self.use_any = False  # use any book that matches the search term
        self.signature = None
        self.ppn = None
        self.data = None
        self.timeout = 5
        log.info("Initialized WebRequest")

    @property
    def use_any_book(self):
        """use any book that matches the search term"""
        self.use_any = True
        log.info("Using any book")
        return self

    def set_apparat(self, apparat: int) -> "WebRequest":
        self.apparat = apparat
        if int(self.apparat) < 10:
            self.apparat = f"0{self.apparat}"
        log.info(f"Set apparat to {self.apparat}")
        return self

    def get_ppn(self, signature: str) -> "WebRequest":
        self.signature = signature
        if "+" in signature:
            signature = signature.replace("+", "%2B")
        if "doi.org" in signature:
            signature = signature.split("/")[-1]
        self.ppn = signature
        return self
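
    # The decorators below throttle scraping: @limits allows RATE_LIMIT calls
    # per RATE_PERIOD seconds, and @sleep_and_retry blocks until a slot frees
    # up instead of raising, so bursts are smoothed out automatically.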

    @sleep_and_retry
    @limits(calls=RATE_LIMIT, period=RATE_PERIOD)
    def search_book(self, searchterm: str) -> str:
        response = requests.get(PPN_URL.format(searchterm), timeout=self.timeout)
        return response.text

    @sleep_and_retry
    @limits(calls=RATE_LIMIT, period=RATE_PERIOD)
    def search_ppn(self, ppn: str) -> str:
        response = requests.get(API_URL.format(ppn), timeout=self.timeout)
        return response.text

    def get_book_links(self, searchterm: str) -> list[str]:
        response: str = self.search_book(searchterm)  # type:ignore
        soup = BeautifulSoup(response, "html.parser")
        links = soup.find_all("a", class_="title getFull")
        res: list[str] = []
        for link in links:
            res.append(BASE + link["href"])
        return res

    @sleep_and_retry
    @limits(calls=RATE_LIMIT, period=RATE_PERIOD)
    def search(self, link: str) -> Optional[str]:
        try:
            response = requests.get(link, timeout=self.timeout)
            return response.text
        except requests.exceptions.RequestException as e:
            log.error(f"Request failed: {e}")
            return None

    def get_data(self) -> Optional[list[str]]:
        links = self.get_book_links(self.ppn)
        log.debug(f"Links: {links}")
        return_data: list[str] = []
        for link in links:
            result: str = self.search(link)  # type:ignore
            # in result search for class col-xs-12 rds-dl RDS_LOCATION
            # if found, return text of href
            soup = BeautifulSoup(result, "html.parser")
            locations = soup.find_all("div", class_="col-xs-12 rds-dl RDS_LOCATION")
            if locations:
                for location in locations:
                    if "1. OG Semesterapparat" in location.text:
                        log.success("Found Semesterapparat, adding entry")
                        pre_tag = soup.find_all("pre")
                        return_data = []
                        if pre_tag:
                            for tag in pre_tag:
                                data = tag.text.strip()
                                return_data.append(data)
                            return return_data
                        else:
                            log.error("No <pre> tag found")
                            return return_data
                    else:
                        item_location = location.find(
                            "div", class_="col-xs-12 col-md-7 col-lg-8 rds-dl-panel"
                        ).text.strip()
                        log.debug(f"Item location: {item_location}")
                        if self.use_any:
                            pre_tag = soup.find_all("pre")
                            if pre_tag:
                                for tag in pre_tag:
                                    data = tag.text.strip()
                                    return_data.append(data)
                                return return_data
                            else:
                                log.error("No <pre> tag found")
                                raise ValueError("No <pre> tag found")
                        elif f"Semesterapparat-{self.apparat}" in item_location:
                            pre_tag = soup.find_all("pre")
                            return_data = []
                            if pre_tag:
                                for tag in pre_tag:
                                    data = tag.text.strip()
                                    return_data.append(data)
                                return return_data
                            else:
                                log.error("No <pre> tag found")
                                return return_data
                        else:
                            log.error(
                                f"Signature {self.signature} not found in {item_location}"
                            )
                            # return_data = []

        return return_data

    def get_data_elsa(self) -> Optional[list[str]]:
        links = self.get_book_links(self.ppn)
        for link in links:
            result = self.search(link)
            # in result search for class col-xs-12 rds-dl RDS_LOCATION
            # if found, return text of href
            soup = BeautifulSoup(result, "html.parser")
            locations = soup.find_all("div", class_="col-xs-12 rds-dl RDS_LOCATION")
            if locations:
                for _ in locations:
                    pre_tag = soup.find_all("pre")
                    return_data = []
                    if pre_tag:
                        for tag in pre_tag:
                            data = tag.text.strip()
                            return_data.append(data)
                        return return_data
                    else:
                        log.error("No <pre> tag found")
                        return return_data
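

# Pipeline sketch (illustrative; mirrors the __main__ demo at the bottom of
# this file): fetch raw export lines for a signature, then parse one format.
#   raw = WebRequest().set_apparat(12).get_ppn("CU 8500 K64").get_data()
#   book = BibTextTransformer(TransformerType.ARRAY).get_data(raw).return_data()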


class BibTextTransformer:
    """Transforms data from the web into a BibTeX format.

    Valid Modes are ARRAY, COinS, BibTeX, RIS, RDS

    Raises:
        ValueError: Raised if mode is not in valid_modes
    """

    valid_modes = [
        TransformerType.ARRAY,
        TransformerType.COinS,
        TransformerType.BibTeX,
        TransformerType.RIS,
        TransformerType.RDS,
    ]

    def __init__(self, mode: TransformerType = TransformerType.ARRAY) -> None:
        self.mode = mode.value
        self.field = None
        self.signature = None
        if mode not in self.valid_modes:
            log.error(f"Mode {mode} not valid")
            raise ValueError(f"Mode {mode} not valid")
        self.data = None
        # self.bookdata = BookData(**self.data)

    def use_signature(self, signature: str) -> "BibTextTransformer":
        """use the exact signature to search for the book"""
        self.signature = signature
        return self

    def get_data(self, data: Optional[list[str]] = None) -> "BibTextTransformer":
        RIS_IDENT = "TY -"
        ARRAY_IDENT = "[kid]"
        COinS_IDENT = "ctx_ver"
        BIBTEX_IDENT = "@book"
        RDS_IDENT = "RDS ---------------------------------- "

        if data is None:
            self.data = None
            return self

        if self.mode == "RIS":
            for line in data:
                if RIS_IDENT in line:
                    self.data = line
        elif self.mode == "ARRAY":
            for line in data:
                if ARRAY_IDENT in line:
                    self.data = line
        elif self.mode == "COinS":
            for line in data:
                if COinS_IDENT in line:
                    self.data = line
        elif self.mode == "BibTeX":
            for line in data:
                if BIBTEX_IDENT in line:
                    self.data = line
        elif self.mode == "RDS":
            for line in data:
                if RDS_IDENT in line:
                    self.data = line
        return self

    def return_data(
        self, option: Any = None
    ) -> Union[
        Optional[BookData],
        Optional[RDS_GENERIC_DATA],
        Optional[RDS_AVAIL_DATA],
        None,
        dict[str, Union[RDS_AVAIL_DATA, RDS_GENERIC_DATA]],
    ]:
        """Return Data to caller.

        Args:
            option (string, optional): Option for RDS as there are two filetypes. Use rds_availability or rds_data. Anything else gives a dict of both responses. Defaults to None.

        Returns:
            BookData: a dataclass containing data about the book
        """
        if self.data is None:
            return None
        match self.mode:
            case "ARRAY":
                return ARRAYData(self.signature).transform(self.data)
            case "COinS":
                return COinSData().transform(self.data)
            case "BibTeX":
                return BibTeXData().transform(self.data)
            case "RIS":
                return RISData().transform(self.data)
            case "RDS":
                return RDSData().transform(self.data).return_data(option)
            case _:
                return None
|


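# A minimal end-to-end sketch of the transformer (inputs illustrative; the
# chained calls mirror the __main__ demo below):
#     raw = WebRequest().get_ppn("9783801718718").get_data_elsa()
#     record = BibTextTransformer(TransformerType.ARRAY).get_data(raw).return_data()

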
def cover(isbn):
    """Fetch the medium-size cover image for an ISBN from buchhandel.de."""
    test_url = f"https://www.buchhandel.de/cover/{isbn}/{isbn}-cover-m.jpg"
    data = requests.get(test_url, stream=True, timeout=15)
    return data.content
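

# Example: persist the returned JPEG bytes (illustrative ISBN):
#     with open("cover.jpg", "wb") as fh:
#         fh.write(cover("9783801718718"))

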
def get_content(soup, css_class):
    """Return the stripped text of the first matching <div>, or None."""
    el = soup.find("div", class_=css_class)
    return el.text.strip() if el else None


if __name__ == "__main__":
    signature = "CU 8500 K64"
    data = WebRequest(71).get_ppn(signature).get_data()
    bib = BibTextTransformer(TransformerType.ARRAY).get_data(data).return_data()
    log.debug(bib)

340
src/services/zotero.py
Normal file
@@ -0,0 +1,340 @@
from dataclasses import dataclass
from typing import Optional

from pyzotero import zotero

from src import settings
from src.services.webrequest import BibTextTransformer, WebRequest
from src.shared.logging import log


@dataclass
class Creator:
    firstName: Optional[str] = None
    lastName: Optional[str] = None
    creatorType: str = "author"

    def from_dict(self, data: dict) -> None:
        for key, value in data.items():
            setattr(self, key, value)

    def from_string(self, data: str) -> "Creator":
        # Expects "Lastname, Firstname"; anything without a comma is left
        # unset so callers can filter it out.
        if "," in data:
            self.firstName = data.split(",")[1].strip()
            self.lastName = data.split(",")[0].strip()
        return self

    # An instance's __dict__ is used directly when building JSON payloads.

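# Example (name for illustration only):
#     Creator().from_string("Kopp, Botho von").__dict__
#     -> {"firstName": "Botho von", "lastName": "Kopp", "creatorType": "author"}

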
@dataclass
class Book:
    itemType: str = "book"
    creators: Optional[list[Creator]] = None
    tags: Optional[list] = None
    collections: Optional[list] = None
    relations: Optional[dict] = None
    title: Optional[str] = None
    abstractNote: Optional[str] = None
    series: Optional[str] = None
    seriesNumber: Optional[str] = None
    volume: Optional[str] = None
    numberOfVolumes: Optional[str] = None
    edition: Optional[str] = None
    place: Optional[str] = None
    publisher: Optional[str] = None
    date: Optional[str] = None
    numPages: Optional[str] = None
    language: Optional[str] = None
    ISBN: Optional[str] = None
    shortTitle: Optional[str] = None
    url: Optional[str] = None
    accessDate: Optional[str] = None
    archive: Optional[str] = None
    archiveLocation: Optional[str] = None
    libraryCatalog: Optional[str] = None
    callNumber: Optional[str] = None
    rights: Optional[str] = None
    extra: Optional[str] = None

    def to_dict(self) -> dict:
        # Only fields that were actually set survive the conversion.
        ret = {}
        for key, value in self.__dict__.items():
            if value:
                ret[key] = value
        return ret

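# Example: unset fields are dropped by to_dict(), so a sparse record stays
# sparse:
#     b = Book(); b.title = "Example"
#     b.to_dict()  ->  {"itemType": "book", "title": "Example"}

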
@dataclass
class BookSection:
    itemType: str = "bookSection"
    title: Optional[str] = None
    creators: Optional[list[Creator]] = None
    abstractNote: Optional[str] = None
    bookTitle: Optional[str] = None
    series: Optional[str] = None
    seriesNumber: Optional[str] = None
    volume: Optional[str] = None
    numberOfVolumes: Optional[str] = None
    edition: Optional[str] = None
    place: Optional[str] = None
    publisher: Optional[str] = None
    date: Optional[str] = None
    pages: Optional[str] = None
    language: Optional[str] = None
    ISBN: Optional[str] = None
    shortTitle: Optional[str] = None
    url: Optional[str] = None
    accessDate: Optional[str] = None
    archive: Optional[str] = None
    archiveLocation: Optional[str] = None
    libraryCatalog: Optional[str] = None
    callNumber: Optional[str] = None
    rights: Optional[str] = None
    extra: Optional[str] = None
    tags: Optional[list] = None
    collections: Optional[list] = None
    relations: Optional[dict] = None

    def to_dict(self) -> dict:
        ret = {}
        for key, value in self.__dict__.items():
            if value:
                ret[key] = value
        return ret

    def assign(self, book) -> None:
        # Copy every matching field over from a Book instance.
        for key, value in book.__dict__.items():
            if key in self.__dict__.keys():
                try:
                    setattr(self, key, value)
                except AttributeError:
                    pass


@dataclass
class JournalArticle:
    itemType: str = "journalArticle"
    title: Optional[str] = None
    creators: Optional[list[Creator]] = None
    abstractNote: Optional[str] = None
    publicationTitle: Optional[str] = None
    volume: Optional[str] = None
    issue: Optional[str] = None
    pages: Optional[str] = None
    date: Optional[str] = None
    series: Optional[str] = None
    seriesTitle: Optional[str] = None
    seriesText: Optional[str] = None
    journalAbbreviation: Optional[str] = None
    language: Optional[str] = None
    DOI: Optional[str] = None
    ISSN: Optional[str] = None
    shortTitle: Optional[str] = None
    url: Optional[str] = None
    accessDate: Optional[str] = None
    archive: Optional[str] = None
    archiveLocation: Optional[str] = None
    libraryCatalog: Optional[str] = None
    callNumber: Optional[str] = None
    rights: Optional[str] = None
    extra: Optional[str] = None
    tags: Optional[list] = None
    collections: Optional[list] = None
    relations: Optional[dict] = None

    def to_dict(self) -> dict:
        ret = {}
        for key, value in self.__dict__.items():
            if value:
                ret[key] = value
        return ret

    def assign(self, book) -> None:
        # Copy every matching field over from a source record.
        for key, value in book.__dict__.items():
            if key in self.__dict__.keys():
                try:
                    setattr(self, key, value)
                except AttributeError:
                    pass


class ZoteroController:
    zoterocfg = settings.zotero

    def __init__(self):
        if self.zoterocfg.library_id is None:
            # Without credentials the controller stays unconnected; any API
            # call will then fail fast on self.zot being None.
            log.error("Zotero library_id is not configured")
            self.zot = None
            return
        self.zot = zotero.Zotero(  # type: ignore
            self.zoterocfg.library_id,
            self.zoterocfg.library_type,
            self.zoterocfg.api_key,
        )

    def get_books(self) -> list:
        # Keep only the top-level items that are books.
        items = self.zot.top()  # type: ignore
        return [item for item in items if item["data"]["itemType"] == "book"]

    def __get_data(self, isbn):
        """Fetch catalogue data for a signature/ISBN and transform it into a
        book record."""
        web = WebRequest()
        web.get_ppn(isbn)
        data = web.get_data_elsa()
        bib = BibTextTransformer()
        bib.get_data(data)
        book = bib.return_data()
        return book

    # See zot.item_template("bookSection") for the full field template.
    def createBook(self, isbn) -> Book:
        book = self.__get_data(isbn)

        bookdata = Book()
        # Drop the subtitle: Zotero gets everything before the first colon.
        bookdata.title = book.title.split(":")[0]
        bookdata.ISBN = book.isbn
        bookdata.language = book.language
        bookdata.date = book.year
        bookdata.publisher = book.publisher
        bookdata.url = book.link
        bookdata.edition = book.edition
        bookdata.place = book.place
        bookdata.numPages = book.pages
        authors = [
            Creator().from_string(author).__dict__ for author in book.author.split(";")
        ]
        authors = [author for author in authors if author["lastName"] is not None]
        bookdata.creators = authors
        return bookdata

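    # Example (signature from the demo at the bottom of this file):
    #     zot = ZoteroController()
    #     book = zot.createBook("DV 3000 D649 (4)")
    #     key = zot.createItem(book.to_dict())
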
    def createItem(self, item) -> Optional[str]:
        resp = self.zot.create_items([item])  # type: ignore
        if "successful" in resp.keys():
            log.debug(resp)
            return resp["successful"]["0"]["key"]
        else:
            return None

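    # pyzotero's create_items response maps batch indices to the created
    # items (e.g. {"successful": {"0": {"key": "...", ...}}, ...}), which is
    # why the new key is read via ["successful"]["0"]["key"] above.
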
    def deleteItem(self, key) -> None:
        items = self.zot.items()
        for item in items:
            if item["key"] == key:
                self.zot.delete_item(item)  # type: ignore
                break

    def createHGSection(self, book: Book, data: dict) -> Optional[str]:
        log.debug(book)
        chapter = BookSection()
        chapter.assign(book)
        chapter.pages = data["pages"]
        chapter.itemType = "bookSection"
        chapter.ISBN = ""
        chapter.url = ""
        chapter.title = data["chapter_title"]
        # The people copied over from the book become its editors; the
        # section authors come in via data["section_author"].
        creators = chapter.creators
        for creator in creators:
            creator["creatorType"] = "editor"
        chapter.creators = creators
        authors = [
            Creator().from_string(author).__dict__
            for author in data["section_author"].split(";")
        ]
        chapter.creators += authors

        log.debug(chapter.to_dict())
        return self.createItem(chapter.to_dict())

    def createBookSection(self, book: Book, data: dict) -> Optional[str]:
        chapter = BookSection()
        chapter.assign(book)
        chapter.pages = data["pages"]
        chapter.itemType = "bookSection"
        chapter.ISBN = ""
        chapter.url = ""
        chapter.title = ""
        return self.createItem(chapter.to_dict())

    def createJournalArticle(self, journal, article) -> Optional[str]:
        journalarticle = JournalArticle()
        journalarticle.assign(journal)
        journalarticle.itemType = "journalArticle"
        journalarticle.creators = [
            Creator().from_string(author).__dict__
            for author in article["section_author"].split(";")
        ]
        journalarticle.date = article["year"]
        journalarticle.title = article["chapter_title"]
        journalarticle.publicationTitle = article["work_title"].split(":")[0].strip()
        journalarticle.pages = article["pages"]
        # Note: both ISSN and url are filled from article["isbn"] as
        # delivered by the upstream data.
        journalarticle.ISSN = article["isbn"]
        journalarticle.issue = article["issue"]
        journalarticle.url = article["isbn"]

        return self.createItem(journalarticle.to_dict())

    def get_citation(self, item) -> str:
        # Ask Zotero for a formatted bibliography entry and strip the HTML
        # wrapper down to plain text.
        title = self.zot.item(  # type: ignore
            item,
            content="bib",
            style="deutsche-gesellschaft-fur-psychologie",
        )[0]
        title = (
            title.replace("<i>", "")
            .replace("</i>", "")
            .replace('<div class="csl-entry">', "")
            .replace("</div>", "")
            .replace("&amp;", "&")
        )
        return title
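

# Example (item key taken from the commented demo below): the call
#     zot.get_citation("9ZXH8DDE")
# returns the plain-text DGPs-style citation for that item.

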
if __name__ == "__main__":
    zot = ZoteroController()
    book = zot.createBook("DV 3000 D649 (4)")
    # Sample editor row for a section import:
    row = "Döbert, Hans & Hörner, Wolfgang & Kopp, Botho von & Reuter, Lutz R."
    # Placeholder section data; createBookSection only reads "pages".
    zot.createBookSection(book, {"pages": ""})

    # book = Book()
    # ISBN = "9783801718718"
    # book = createBook(isbn=ISBN)
    # chapter = BookSection()
    # chapter.title = "Geistige Behinderung"
    # chapter.bookTitle = book.title
    # chapter.pages = "511 - 538"
    # chapter.publisher = book.publisher
    # authors = [
    #     Creator("Jennifer M.", "Phillips").__dict__,
    #     Creator("Hower", "Kwon").__dict__,
    #     Creator("Carl", "Feinstein").__dict__,
    #     Creator("Inco", "Spintczok von Brisinski").__dict__,
    # ]
    # publishers = book.author
    # if isinstance(publishers, str):
    #     publishers = [publishers]
    # for publisher in publishers:
    #     print(publisher)
    #     creator = Creator().from_string(publisher)
    #     creator.creatorType = "editor"
    #     authors.append(creator.__dict__)

    # chapter.creators = authors
    # chapter.publisher = book.publisher
    # print(chapter.to_dict())
    # createBookSection(chapter.to_dict())
    # get_citation("9ZXH8DDE")
    # print(get_books())
    # print(zot.item_creator_types("bookSection"))