from __future__ import annotations

import re
from dataclasses import asdict, dataclass, field
from typing import Iterable, List, Optional
from urllib.parse import quote_plus, urljoin

import httpx
from bs4 import BeautifulSoup

from src.logic.dataclass import BookData

BASE = "https://www.lehmanns.de"
SEARCH_URL = "https://www.lehmanns.de/search/quick?mediatype_id=&q="


@dataclass
class LehmannsSearchResult:
    title: str
    url: str
    # Core fields from the listing card
    year: Optional[int] = None
    edition: Optional[int] = None
    publisher: Optional[str] = None
    isbn13: Optional[str] = None
    # Extras from the listing card
    description: Optional[str] = None
    authors: list[str] = field(default_factory=list)
    media_type: Optional[str] = None
    book_format: Optional[str] = None
    price_eur: Optional[float] = None
    currency: str = "EUR"
    image: Optional[str] = None
    # From the detail page
    pages: Optional[str] = None  # e.g. "320 Seiten"
    buyable: bool = True  # set in enrich_pages (detail page)
    unavailable_hint: Optional[str] = (
        None  # e.g. "Titel ist leider vergriffen; keine Neuauflage"
    )

    def to_dict(self) -> dict:
        return asdict(self)


class LehmannsClient:
    """Scrapes quick-search results, then enriches (and filters) via product pages."""

    def __init__(self, timeout: float = 20.0):
        self.client = httpx.Client(
            headers={
                "User-Agent": (
                    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                    "(KHTML, like Gecko) Chrome/124.0 Safari/537.36"
                ),
                "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            },
            timeout=timeout,
            follow_redirects=True,
        )

    def close(self):
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()

    # ------------------- Search (listing) -------------------

    def build_search_url(self, title: str) -> str:
        # quote_plus encodes spaces as '+'
        return SEARCH_URL + quote_plus(title)

    def search_by_title(
        self,
        title: str,
        limit: Optional[int] = None,
        strict: bool = False,
        only_latest: bool = True,
    ) -> List[BookData]:
        """
        Parse the listing page, enrich the hits via their detail pages
        (adding 'pages' and dropping unbuyable items), and convert them
        to BookData. strict=True keeps only exact (case-insensitive)
        title matches; limit caps the result count; only_latest keeps
        just the highest edition.
        """
        url = self.build_search_url(title=title)
        html = self._get(url)
        if not html:
            return []
        results = self._parse_results(html)
        results = self.enrich_pages(results)
        results = [BookData().from_LehmannsSearchResult(r) for r in results]

        if strict:
            # Keep only exact title matches (case-insensitive).
            title_lower = title.lower()
            results = [r for r in results if r.title and r.title.lower() == title_lower]

        if limit is not None:
            results = results[: max(0, limit)]

        if only_latest and len(results) > 1:
            # Keep only the latest edition (highest edition number).
            results.sort(key=lambda r: (r.edition_number or 0), reverse=True)
            results = [results[0]]

        return results
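    # Illustrative sketch of the search flow (assumes network access and that
    # BookData exposes the fields used above; results depend on live listings):
    #
    #     with LehmannsClient() as client:
    #         # quote_plus: .../search/quick?mediatype_id=&q=Clean+Code
    #         print(client.build_search_url("Clean Code"))
    #         hits = client.search_by_title("Clean Code", strict=True)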
    # ------------------- Detail enrichment & filtering -------------------

    def enrich_pages(
        self, results: Iterable[LehmannsSearchResult], drop_unbuyable: bool = True
    ) -> List[LehmannsSearchResult]:
        """
        Fetch each result.url and extract:

        - pages: from the "span.book-meta.meta-seiten" node
        - availability: from "li.availability-3"
            * if it contains "Titel ist leider vergriffen", mark buyable=False
            * if it also contains "keine Neuauflage", set unavailable_hint accordingly

        If drop_unbuyable=True, exclude non-buyable results from the returned list.
        """
        enriched: List[LehmannsSearchResult] = []
        for r in results:
            try:
                html = self._get(r.url)
                if not html:
                    # Can't verify availability; keep as-is when not dropping, else skip.
                    if not drop_unbuyable:
                        enriched.append(r)
                    continue
                soup = BeautifulSoup(html, "html.parser")

                # Pages
                pages_node = soup.select_one(
                    "span.book-meta.meta-seiten[itemprop='numberOfPages'], "
                    "span.book-meta.meta-seiten[itemprop='numberofpages'], "
                    ".meta-seiten [itemprop='numberOfPages'], "
                    ".meta-seiten[itemprop='numberOfPages'], "
                    ".book-meta.meta-seiten"
                )
                if pages_node:
                    text = pages_node.get_text(" ", strip=True)
                    m = re.search(r"\d+", text)
                    if m:
                        r.pages = f"{m.group(0)} Seiten"

                # Availability via li.availability-3
                avail_li = soup.select_one("li.availability-3")
                if avail_li:
                    avail_text = " ".join(
                        avail_li.get_text(" ", strip=True).split()
                    ).lower()
                    if "titel ist leider vergriffen" in avail_text:
                        r.buyable = False
                        if "keine neuauflage" in avail_text:
                            r.unavailable_hint = (
                                "Titel ist leider vergriffen; keine Neuauflage"
                            )
                        else:
                            r.unavailable_hint = "Titel ist leider vergriffen"

                # Append or drop
                if (not drop_unbuyable) or r.buyable:
                    enriched.append(r)
            except Exception:
                # On any per-item error, keep the record if not dropping; else skip.
                if not drop_unbuyable:
                    enriched.append(r)
                continue
        return enriched
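    # Sketch: enrich_pages can also be used standalone on raw listing results,
    # e.g. to keep out-of-print titles instead of dropping them (variable
    # names here are hypothetical):
    #
    #     raw = client._parse_results(listing_html)
    #     kept = client.enrich_pages(raw, drop_unbuyable=False)
    #     gone = [r for r in kept if not r.buyable]  # "vergriffen" titles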
    # ------------------- Internals -------------------

    def _get(self, url: str) -> Optional[str]:
        try:
            r = self.client.get(url)
            r.encoding = "utf-8"
            if r.status_code == 200 and "text/html" in (
                r.headers.get("content-type") or ""
            ):
                return r.text
        except httpx.HTTPError:
            pass
        return None
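    # Worked example of the German price normalization used in _parse_results
    # below ("1.234,56 €" -> 1234.56):
    #
    #     >>> m = re.search(r"(\d{1,3}(?:\.\d{3})*,\d{2})\s*€", "nur 1.234,56 €")
    #     >>> float(m.group(1).replace(".", "").replace(",", "."))
    #     1234.56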
    def _parse_results(self, html: str) -> List[LehmannsSearchResult]:
        soup = BeautifulSoup(html, "html.parser")
        results: list[LehmannsSearchResult] = []
        for block in soup.select("div.info-block"):
            a = block.select_one(".title a[href]")
            if not a:
                continue
            url = urljoin(BASE, a["href"].strip())
            base_title = (block.select_one(".title [itemprop='name']") or a).get_text(
                strip=True
            )

            # Alternative headline => extend title
            alt_tag = block.select_one(".description[itemprop='alternativeHeadline']")
            alternative_headline = alt_tag.get_text(strip=True) if alt_tag else None
            title = (
                f"{base_title} : {alternative_headline}"
                if alternative_headline
                else base_title
            )
            description = alternative_headline

            # Authors from .author
            authors: list[str] = []
            author_div = block.select_one("div.author")
            if author_div:
                t = author_div.get_text(" ", strip=True)
                t = re.sub(r"^\s*von\s+", "", t, flags=re.I)
                for part in re.split(r"\s*;\s*|\s*&\s*|\s+und\s+", t):
                    name = " ".join(part.split())
                    if name:
                        authors.append(name)

            # Media + format
            media_type = None
            book_format = None
            type_text = block.select_one(".type")
            if type_text:
                t = type_text.get_text(" ", strip=True)
                m = re.search(r"\b(Buch|eBook|Hörbuch)\b", t)
                if m:
                    media_type = m.group(1)
                fm = re.search(r"\(([^)]+)\)", t)
                if fm:
                    book_format = fm.group(1).strip().upper()

            # Year
            year = None
            y = block.select_one("[itemprop='copyrightYear']")
            if y:
                try:
                    year = int(y.get_text(strip=True))
                except ValueError:
                    pass

            # Edition
            edition = None
            ed = block.select_one("[itemprop='bookEdition']")
            if ed:
                m = re.search(r"\d+", ed.get_text(strip=True))
                if m:
                    edition = int(m.group())

            # Publisher
            publisher = None
            pub = block.select_one(
                ".publisherprop [itemprop='name']"
            ) or block.select_one(".publisher [itemprop='name']")
            if pub:
                publisher = pub.get_text(strip=True)

            # ISBN-13
            isbn13 = None
            isbn_tag = block.select_one(".isbn [itemprop='isbn'], [itemprop='isbn']")
            if isbn_tag:
                digits = re.sub(r"[^0-9Xx]", "", isbn_tag.get_text(strip=True))
                m = re.search(r"(97[89]\d{10})", digits)
                if m:
                    isbn13 = m.group(1)

            # Price (best effort): German decimal format, e.g. "1.234,56 €"
            price_eur = None
            txt = block.get_text(" ", strip=True)
            mprice = re.search(r"(\d{1,3}(?:\.\d{3})*,\d{2})\s*€", txt)
            if not mprice and block.parent:
                sib = block.parent.get_text(" ", strip=True)
                mprice = re.search(r"(\d{1,3}(?:\.\d{3})*,\d{2})\s*€", sib)
            if mprice:
                num = mprice.group(1).replace(".", "").replace(",", ".")
                try:
                    price_eur = float(num)
                except ValueError:
                    pass

            # Image (best effort)
            image = None
            left_img = block.find_previous("img")
            if left_img and left_img.get("src"):
                image = urljoin(BASE, left_img["src"])

            results.append(
                LehmannsSearchResult(
                    title=title,
                    url=url,
                    description=description,
                    authors=authors,
                    media_type=media_type,
                    book_format=book_format,
                    year=year,
                    edition=edition,
                    publisher=publisher,
                    isbn13=isbn13,
                    price_eur=price_eur,
                    image=image,
                )
            )
        return results
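# Hedged usage sketch: runs a live search against lehmanns.de, so output
# varies; BookData is assumed to expose .title as used in search_by_title.
if __name__ == "__main__":
    with LehmannsClient() as client:
        for book in client.search_by_title("Python", limit=3, only_latest=False):
            print(book.title)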