diff --git a/.gitea/workflows/release.yml b/.gitea/workflows/release.yml index a48d035..892b841 100644 --- a/.gitea/workflows/release.yml +++ b/.gitea/workflows/release.yml @@ -57,6 +57,3 @@ jobs: body: ${{ env.RELEASE_NOTES }} draft: false prerelease: false - env: - GITHUB_TOKEN: ${{ secrets.TOKEN }} - GITHUB_REPOSITORY: ${{ github.repository }} \ No newline at end of file diff --git a/.version b/.version index 7dff5b8..afaf360 100644 --- a/.version +++ b/.version @@ -1 +1 @@ -0.2.1 \ No newline at end of file +1.0.0 \ No newline at end of file diff --git a/mail_vorlagen/Neuauflagen für Semesterapparat {AppNr} - {AppName}.eml b/mail_vorlagen/Neuauflagen für Semesterapparat {AppNr} - {AppName}.eml new file mode 100644 index 0000000..5f0b3be --- /dev/null +++ b/mail_vorlagen/Neuauflagen für Semesterapparat {AppNr} - {AppName}.eml @@ -0,0 +1,21 @@ +Subject: Vorschläge für Neuauflagen - {Appname} +MIME-Version: 1.0 +Content-Type: text/html; charset="UTF-8" +Content-Transfer-Encoding: 8bit + + + + + + +

+{greeting}
+
+für Ihren Semesterapparat {AppNr} - {Appname} wurden folgende Neuauflagen gefunden:
+
+{newEditions}
+
+Sollen wir die alte(n) Auflage(n) aus dem Apparat durch diese austauschen?
+
+-- 
+
+{signature}
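For reference, a minimal sketch (not part of this patch) of how the placeholders in the template above could be filled. Only the placeholder names ({greeting}, {AppNr}, {Appname}, {newEditions}, {signature}) come from the template; the helper, the path, and the sample values below are assumptions.

# Hypothetical illustration: render the .eml template with str.format-style
# substitution. The path and field values are made up.
from pathlib import Path

def render_mail_template(path: str, **fields: str) -> str:
    raw = Path(path).read_text(encoding="utf-8")
    return raw.format(**fields)

body = render_mail_template(
    "mail_vorlagen/neuauflagen.eml",  # placeholder path
    greeting="Sehr geehrte Frau Professorin Muster,",
    AppNr="42",
    Appname="Didaktik der Mathematik",
    newEditions="Mustertitel, 3. Auflage (2025)",
    signature="Ihre Bibliothek",
)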
\ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 2a9433a..470172a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,17 +1,18 @@ [project] name = "semesterapparatsmanager" -version = "0.2.1" +version = "1.0.0" description = "Add your description here" readme = "README.md" -requires-python = ">=3.12" +requires-python = ">=3.13" dependencies = [ "appdirs>=1.4.4", - "beautifulsoup4>=4.12.3", + "beautifulsoup4>=4.13.5", "bump-my-version>=0.29.0", - "chardet>=5.2.0", + "charset-normalizer>=3.4.3", "comtypes>=1.4.9", "darkdetect>=0.8.0", "docx2pdf>=0.1.8", + "httpx>=0.28.1", "loguru>=0.7.3", "mkdocs>=1.6.1", "mkdocs-material>=9.5.49", @@ -35,9 +36,12 @@ dev = [ "icecream>=2.1.4", "nuitka>=2.5.9", ] +swbtest = [ + "alive-progress>=3.3.0", +] [tool.bumpversion] -current_version = "0.2.1" +current_version = "1.0.0" parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)" serialize = ["{major}.{minor}.{patch}"] search = "{current_version}" @@ -61,3 +65,7 @@ post_commit_hooks = [] filename = "src/__init__.py" [[tool.bumpversion.files]] filename = ".version" + +[[tool.uv.index]] +url = "https://git.theprivateserver.de/api/packages/WorldTeacher/pypi/simple/" +default = false diff --git a/src/__init__.py b/src/__init__.py index daeaf7a..1062a81 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.2.1" +__version__ = "1.0.0" __author__ = "Alexander Kirchner" __all__ = ["__version__", "__author__", "Icon", "settings"] @@ -8,25 +8,24 @@ from appdirs import AppDirs from config import Config - app = AppDirs("SemesterApparatsManager", "SAM") -LOG_DIR = app.user_log_dir -CONFIG_DIR = app.user_config_dir -if not os.path.exists(LOG_DIR): - os.makedirs(LOG_DIR) -if not os.path.exists(CONFIG_DIR): - os.makedirs(CONFIG_DIR) +LOG_DIR: str = app.user_log_dir # type: ignore +CONFIG_DIR: str = app.user_config_dir # type: ignore +if not os.path.exists(LOG_DIR): # type: ignore + os.makedirs(LOG_DIR) # type: ignore +if not os.path.exists(CONFIG_DIR): # type: ignore + os.makedirs(CONFIG_DIR) # type: ignore settings = Config(f"{CONFIG_DIR}/config.yaml") -DATABASE_DIR = ( - app.user_config_dir if settings.database.path is None else settings.database.path +DATABASE_DIR = ( # type: ignore + app.user_config_dir if settings.database.path is None else settings.database.path # type: ignore ) -if not os.path.exists(DATABASE_DIR): - os.makedirs(DATABASE_DIR) +if not os.path.exists(DATABASE_DIR): # type: ignore + os.makedirs(DATABASE_DIR) # type: ignore first_launch = settings.exists -if not os.path.exists(settings.database.temp.expanduser()): - settings.database.temp.expanduser().mkdir(parents=True, exist_ok=True) +if not os.path.exists(settings.database.temp.expanduser()): # type: ignore + settings.database.temp.expanduser().mkdir(parents=True, exist_ok=True) # type: ignore from .utils.icon import Icon if not os.path.exists("logs"): diff --git a/src/backend/__init__.py b/src/backend/__init__.py index 8c548c9..9b79959 100644 --- a/src/backend/__init__.py +++ b/src/backend/__init__.py @@ -1,8 +1,24 @@ -from .semester import Semester -from .database import Database +__all__ = [ + "AdminCommands", + "Semester", + "AutoAdder", + "AvailChecker", + "BookGrabber", + "Database", + "DocumentationThread", + "NewEditionCheckerThread", + "recreateElsaFile", + "recreateFile", + "Catalogue" +] + from .admin_console import AdminCommands -from .thread_bookgrabber import BookGrabber -from .threads_availchecker import AvailChecker -from .threads_autoadder import AutoAdder +from .create_file import 
recreateElsaFile, recreateFile +from .database import Database from .documentation_thread import DocumentationThread -from .create_file import recreateFile, recreateElsaFile +from .semester import Semester +from .thread_bookgrabber import BookGrabber +from .thread_neweditions import NewEditionCheckerThread +from .threads_autoadder import AutoAdder +from .threads_availchecker import AvailChecker +from .catalogue import Catalogue diff --git a/src/backend/catalogue.py b/src/backend/catalogue.py new file mode 100644 index 0000000..4f72ec1 --- /dev/null +++ b/src/backend/catalogue.py @@ -0,0 +1,101 @@ +import requests +from bs4 import BeautifulSoup + +from src.logic import BookData as Book + +from datetime import datetime +import sys +import loguru +from src import LOG_DIR +URL = "https://rds.ibs-bw.de/phfreiburg/opac/RDSIndex/Search?type0%5B%5D=allfields&lookfor0%5B%5D={}&join=AND&bool0%5B%5D=AND&type0%5B%5D=au&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ti&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ct&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=isn&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ta&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=co&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=py&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=pp&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=pu&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=si&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=zr&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=cc&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND" +BASE = "https://rds.ibs-bw.de" + +log = loguru.logger +log.remove() +log.add(sys.stdout, level="INFO") +log.add(f"{LOG_DIR}/application.log", rotation="1 MB", retention="10 days") + +log.add( + f"{LOG_DIR}/{datetime.now().strftime('%Y-%m-%d')}.log", + rotation="1 day", + retention="1 month", +) +class Catalogue: + def __init__(self, timeout=5): + self.timeout = timeout + reachable = self.check_connection() + if not reachable: + log.error("No internet connection available.") + raise ConnectionError("No internet connection available.") + + def check_connection(self): + try: + response = requests.get("https://www.google.com", timeout=self.timeout) + if response.status_code == 200: + return True + except requests.exceptions.RequestException as e: + log.error(f"Could not connect to google.com: {e}") + + def search_book(self, searchterm: str): + response = requests.get(URL.format(searchterm), timeout=self.timeout) + return response.text + + def search(self, link: str): + response = requests.get(link, timeout=self.timeout) + return response.text + + def get_book_links(self, searchterm: str): + response = self.search_book(searchterm) + soup = BeautifulSoup(response, "html.parser") + links = soup.find_all("a", class_="title getFull") + res = [] + for link in links: + res.append(BASE + link["href"]) + return res + + def get_book(self, searchterm: str): + log.info(f"Searching for term: {searchterm}") + + links = self.get_book_links(searchterm) + for link in links: + result = self.search(link) + # in result search for class col-xs-12 rds-dl RDS_LOCATION + # if found, return text of href + soup = BeautifulSoup(result, "html.parser") + location = soup.find_all("div", class_="col-xs-12 rds-dl RDS_LOCATION") + for loc in location: + if f"1. 
OG Semesterapparat" in loc.text: + title = ( + soup.find("div", class_="headline text") + .text.replace("\n", "") + .strip() + ) + ppn = soup.find( + "div", class_="col-xs-12 col-md-5 col-lg-4 rds-dl-head RDS_PPN" + ) + signature = soup.find( + "div", class_="col-xs-12 rds-dl RDS_SIGNATURE" + ) + if signature: + signature = ( + signature.find_next("div") + .find_next("div") + .text.replace("\n", "") + .strip() + ) + # use ppn to find the next div and extract the text + if ppn: + ppn = ppn.find_next("div").text.replace("\n", "").strip() + else: + ppn = None + isbn = soup.find( + "div", class_="col-xs-12 col-md-5 col-lg-4 rds-dl-head RDS_ISBN" + ) + if isbn: + isbn = isbn.find_next("div").find_next("div").text + else: + isbn = None + return Book( + title=title, ppn=ppn, signature=signature, isbn=isbn, link=link + ) + return False diff --git a/src/backend/database.py b/src/backend/database.py index eb5c31a..e2e5d03 100644 --- a/src/backend/database.py +++ b/src/backend/database.py @@ -12,7 +12,7 @@ from typing import Any, List, Optional, Tuple, Union import loguru -from src import LOG_DIR, settings, DATABASE_DIR +from src import DATABASE_DIR, LOG_DIR, settings from src.backend.db import ( CREATE_ELSA_FILES_TABLE, CREATE_ELSA_MEDIA_TABLE, @@ -38,7 +38,6 @@ log.add(sys.stdout, level="INFO") log.add(f"{LOG_DIR}/application.log", rotation="1 MB", retention="10 days") - ascii_lowercase = lower + digits + punctuation @@ -68,6 +67,57 @@ class Database: self.db_path = db_path log.debug(f"Database path: {self.db_path}") self.db_initialized = False + self.startup_check() + + def startup_check(self): + # check existence of all tables. if any is missing, recreate the table + if not self.db_initialized: + self.initializeDatabase() + tables = self.get_db_contents() + tables = [t[1] for t in tables] if tables is not None else [] + required_tables = [ + "semesterapparat", + "messages", + "media", + "files", + "prof", + "user", + "subjects", + "elsa", + "elsa_files", + "elsa_media", + ] + + for table in required_tables: + if table not in tables: + log.critical(f"Table {table} is missing, recreating...") + self.create_table(table) + + def create_table(self, table_name: str): + match table_name: + case "semesterapparat": + query = CREATE_TABLE_APPARAT + case "messages": + query = CREATE_TABLE_MESSAGES + case "media": + query = CREATE_TABLE_MEDIA + case "files": + query = CREATE_TABLE_FILES + case "prof": + query = CREATE_TABLE_PROF + case "user": + query = CREATE_TABLE_USER + case "subjects": + query = CREATE_TABLE_SUBJECTS + case "elsa": + query = CREATE_ELSA_TABLE + case "elsa_files": + query = CREATE_ELSA_FILES_TABLE + case "elsa_media": + query = CREATE_ELSA_MEDIA_TABLE + case _: + log.error(f"Table {table_name} is not a valid table name") + self.query_db(query) def initializeDatabase(self): if not self.db_initialized: @@ -201,12 +251,12 @@ class Database: logs_query = query logs_args = args - if "fileblob" in query: - # set fileblob arg in logger to "too long" - logs_query = query - fileblob_location = query.find("fileblob") - # remove fileblob from query - logs_query = query[:fileblob_location] + "fileblob = too long" + # if "fileblob" in query: + # # set fileblob arg in logger to "too long" + # logs_query = query + # fileblob_location = query.find("fileblob") + # # remove fileblob from query + # logs_query = query[:fileblob_location] + "fileblob = too long" log_message = f"Querying database with query {logs_query}, args: {logs_args}" # if "INSERT" in query: @@ -435,6 +485,7 @@ class Database: deleted 
(int, optional): The state of the book. Set to 1 to include deleted ones. Defaults to 0. Returns: + list[dict[int, BookData, int]]: A list of dictionaries containing the id, the metadata of the book and the availability of the book """ qdata = self.query_db( @@ -451,6 +502,46 @@ class Database: ret_result.append(data) return ret_result + def getAllBooks(self): + # return all books in the database + qdata = self.query_db("SELECT id,bookdata FROM media WHERE deleted=0") + ret_result: list[dict[str, Any]] = [] + if qdata is None: + return [] + for result_a in qdata: + data: dict[str, Any] = {"id": int, "bookdata": BookData} + data["id"] = result_a[0] + data["bookdata"] = BookData().from_string(result_a[1]) + + ret_result.append(data) + return ret_result + + def getBooksByProfId(self, prof_id: int, deleted: int = 0): + """ + Get the Books based on the professor id + + Args: + prof_id (str): The ID of the professor + deleted (int, optional): The state of the book. Set to 1 to include deleted ones. Defaults to 0. + + Returns: + + list[dict[int, BookData, int]]: A list of dictionaries containing the id, the metadata of the book and the availability of the book + """ + qdata = self.query_db( + f"SELECT id,bookdata,available FROM media WHERE prof_id={prof_id} AND (deleted={deleted if deleted == 0 else '1 OR deleted=0'})" + ) + ret_result = [] + if qdata is None: + return [] + for result_a in qdata: + data: dict[str, Any] = {"id": int, "bookdata": BookData, "available": int} + data["id"] = result_a[0] + data["bookdata"] = BookData().from_string(result_a[1]) + data["available"] = result_a[2] + ret_result.append(data) + return ret_result + def updateBookdata(self, book_id: int, bookdata: BookData): """ Update the bookdata in the database @@ -525,11 +616,12 @@ class Database: str: The filename of the recreated file """ blob = self.getBlob(filename, app_id) + log.debug(blob) tempdir = settings.database.temp.expanduser() if not tempdir.exists(): tempdir.mkdir(parents=True, exist_ok=True) file = tempfile.NamedTemporaryFile( - delete=False, dir=tempdir_path, mode="wb", suffix=f".{filetype}" + delete=False, dir=tempdir, mode="wb", suffix=f".{filetype}" ) file.write(blob) # log.debug("file created") @@ -701,6 +793,20 @@ class Database: else: return prof[0] + def getProfMailById(self, prof_id: Union[str, int]) -> str: + """get the mail of a professor based on the id + + Args: + prof_id (Union[str,int]): the id of the professor + + Returns: + str: the mail of the professor + """ + mail = self.query_db("SELECT mail FROM prof WHERE id=?", (prof_id,), one=True)[ + 0 + ] + return mail if mail is not None else "" + def getTitleById(self, prof_id: Union[str, int]) -> str: """get the title of a professor based on the id @@ -877,6 +983,23 @@ class Database: (newDate, today, app_id), ) + def getId(self, apparat_name) -> Optional[int]: + """get the id of an apparat based on the name + + Args: + apparat_name (str): the name of the apparat e.g. 
"Semesterapparat 1" + + Returns: + Optional[int]: the id of the apparat, if the apparat is not found, None is returned + """ + data = self.query_db( + "SELECT id FROM semesterapparat WHERE name=?", (apparat_name,), one=True + ) + if data is None: + return None + else: + return data[0] + def getApparatId(self, apparat_name) -> Optional[int]: """get the id of an apparat based on the name @@ -1014,22 +1137,22 @@ class Database: self.close_connection(conn) return ret - def deleteApparat(self, app_id: Union[str, int], semester): + def deleteApparat(self, apparat: Apparat, semester: str): """Delete an apparat from the database Args: - app_id (Union[str, int]): the id of the apparat + apparat: (Apparat): the apparat to be deleted semester (str): the semester the apparat should be deleted from """ - log.info(f"Deleting apparat with id {app_id} in semester {semester}") + apparat_nr = apparat.appnr + app_id = self.getId(apparat.name) self.query_db( - "UPDATE semesterapparat SET deletion_status=1, deleted_date=? WHERE appnr=?", - (semester, app_id), - ) - self.query_db( - "UPDATE media SET deleted=1 WHERE app_id=?", - (app_id,), + "UPDATE semesterapparat SET deletion_status=1, deleted_date=? WHERE appnr=? AND name=?", + (semester, apparat_nr, apparat.name), ) + # delete all books associated with the app_id + print(apparat_nr, app_id) + self.query_db("UPDATE media SET deleted=1 WHERE app_id=?", (app_id,)) def isEternal(self, id): """check if the apparat is eternal (dauerapparat) @@ -1701,4 +1824,4 @@ class Database: cursor.execute(query, args) result = cursor.fetchone() connection.close() - return result \ No newline at end of file + return result diff --git a/src/backend/thread_neweditions.py b/src/backend/thread_neweditions.py new file mode 100644 index 0000000..45e662f --- /dev/null +++ b/src/backend/thread_neweditions.py @@ -0,0 +1,263 @@ +import re +import sys +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime +from math import ceil +from queue import Empty, Queue +from typing import List, Optional, Set, Union + +import loguru +from PySide6.QtCore import QThread, Signal + +from src import LOG_DIR +from src.logic import BookData +from src.logic.lehmannsapi import LehmannsClient +from src.logic.swb import SWB + +log = loguru.logger +log.remove() +log.add(sys.stdout, level="INFO") +log.add(f"{LOG_DIR}/application.log", rotation="1 MB", retention="10 days") + +log.add( + f"{LOG_DIR}/{datetime.now().strftime('%Y-%m-%d')}.log", + rotation="1 day", + retention="1 month", +) + + +def _norm_text(s: Optional[str]) -> str: + if not s: + return "" + # lowercase, collapse whitespace, drop some punctuation + s = s.lower() + s = re.sub(r"[\s\-\u2013\u2014]+", " ", s) # spaces/dashes + s = re.sub(r"[\"'`:.,;!?()\[\]{}]", "", s) + return s.strip() + + +def _same_book(a: BookData, b: BookData) -> bool: + """Heuristic: same if ISBNs intersect; fallback to (title, author, year) normalized.""" + isbns_a = _norm_isbns(a.isbn) + isbns_b = _norm_isbns(b.isbn) + if isbns_a and isbns_b and (isbns_a & isbns_b): + return True + + ta, tb = _norm_text(a.title), _norm_text(b.title) + aa, ab = _norm_text(a.author), _norm_text(b.author) + ya, yb = (a.year or "").strip(), (b.year or "").strip() + + # strong title match required; then author if available; then year if available + if ta and tb and ta == tb: + # if both have authors, require match + if aa and ab and aa == ab: + # if both have year, require match + if ya and yb: + return ya == yb + return True + # if one/both authors missing, allow 
title (+year if both present) + if ya and yb: + return ya == yb + return True + + return False + + +def _norm_isbns(value: Union[str, List[str], None]) -> Set[str]: + """Return a set of 10/13-digit ISBNs (digits only, keep X for ISBN-10 if present).""" + if value is None: + return set() + vals = value if isinstance(value, list) else [value] + out: Set[str] = set() + for v in vals: + s = str(v) + digits = re.sub(r"[^0-9Xx]", "", s) + # keep 13-digit or 10-digit tokens + m13 = re.findall(r"97[89]\d{10}", digits) + if m13: + out.update(m13) + else: + m10 = re.findall(r"\d{9}[0-9Xx]", digits) + out.update(x.upper() for x in m10) + return out + + +def filter_prefer_swb(records: List[BookData]) -> List[BookData]: + """ + If an SWB entry with a non-empty signature exists for a book, drop the HTTP(S) duplicate(s). + Returns a NEW list (does not mutate the input). + """ + swb_with_sig = [ + r + for r in records + if (r.link == "SWB") and (r.signature and r.signature.strip()) + ] + if not swb_with_sig: + return list(records) + + to_remove: Set[int] = set() + + # For each URL entry, see if it matches any SWB-with-signature entry + for idx, rec in enumerate(records): + if not rec.link or not rec.link.lower().startswith("http"): + continue + for swb in swb_with_sig: + if _same_book(swb, rec): + to_remove.add(idx) + break + + # Build filtered list + return [rec for i, rec in enumerate(records) if i not in to_remove] + + +class NewEditionCheckerThread(QThread): + updateSignal = Signal(int, int) # (processed, total) + updateProgress = Signal(int, int) # (processed, total) + total_entries_signal = Signal(int) + resultsSignal = Signal(list) # list[tuple[BookData, list[BookData]]] + + def __init__(self, entries: Optional[list["BookData"]] = None, parent=None): + super().__init__(parent) + self.entries: list["BookData"] = entries if entries is not None else [] + self.results: list[tuple["BookData", list["BookData"]]] = [] + + def reset(self): + self.entries = [] + self.results = [] + + # ---------- internal helpers ---------- + + @staticmethod + def _split_evenly(items: list, parts: int) -> list[list]: + """Split items as evenly as possible into `parts` chunks (no empty tails).""" + if parts <= 1 or len(items) <= 1: + return [items] + n = len(items) + base = n // parts + extra = n % parts + chunks = [] + i = 0 + for k in range(parts): + size = base + (1 if k < extra else 0) + if size == 0: + continue + chunks.append(items[i : i + size]) + i += size + return chunks + + @staticmethod + def _clean_title(raw: str) -> str: + title = raw.rstrip(" .:,;!?") + title = re.sub(r"\s*\(.*\)", "", title) + return title.strip() + + @classmethod + def _process_book( + cls, book: "BookData" + ) -> tuple["BookData", list["BookData"]] | None: + author = ( + book.author.split(";")[0].replace(" ", "") + if (book.author and ";" in book.author) + else (book.author or "").replace(" ", "") + ) + title = cls._clean_title(book.title or "") + + # Query SWB + response: list[BookData] = SWB().getBooks( + [ + "pica.bib=20735", + f"pica.tit={title.split(':')[0].strip()}", + # f"pica.per={author}", + ] + ) + + # Remove same PPN + response = [entry for entry in response if entry.ppn != book.ppn] + for respo in response: + respo.link = "SWB" + + # Query Lehmanns + with LehmannsClient() as client: + results = client.search_by_title(title, strict=True) + if results: + for res in results: + response.append(BookData().from_LehmannsSearchResult(res)) + + if not response: + return None + + response = filter_prefer_swb(response) + + # Remove entries 
matching the same ISBN as the current book + response = [ + entry + for entry in response + if not (_norm_isbns(entry.isbn) & _norm_isbns(book.isbn)) + ] + + if not response: + return None + + return (book, response) + + @classmethod + def _worker(cls, items: list["BookData"], q: Queue) -> None: + """Worker for one chunk; pushes ('result', ...), ('progress', 1), and ('done', None).""" + try: + for book in items: + try: + result = cls._process_book(book) + except Exception: + result = None + if result is not None: + q.put(("result", result)) + q.put(("progress", 1)) + finally: + q.put(("done", None)) + + # ---------- thread entry point ---------- + + def run(self): + total = len(self.entries) + self.total_entries_signal.emit(total) + + if total == 0: + log.debug("No entries to process.") + self.resultsSignal.emit([]) + return + + # Up to 4 workers; ~20 items per worker + num_workers = min(4, max(1, ceil(total / 20))) + chunks = self._split_evenly(self.entries, num_workers) + sizes = [len(ch) for ch in chunks] + + q: Queue = Queue() + processed = 0 + finished_workers = 0 + + with ThreadPoolExecutor(max_workers=len(chunks)) as ex: + futures = [ex.submit(self._worker, ch, q) for ch in chunks] + + log.info( + f"Launched {len(futures)} worker thread(s) for {total} entries: {sizes} entries per thread." + ) + for idx, sz in enumerate(sizes, 1): + log.debug(f"Thread {idx}: {sz} entries") + + # Aggregate progress/results + while finished_workers < len(chunks): + try: + kind, payload = q.get(timeout=0.1) + except Empty: + continue + + if kind == "progress": + processed += int(payload) + self.updateSignal.emit(processed, total) + self.updateProgress.emit(processed, total) + elif kind == "result": + self.results.append(payload) + elif kind == "done": + finished_workers += 1 + + self.resultsSignal.emit(self.results) diff --git a/src/backend/threads_availchecker.py b/src/backend/threads_availchecker.py index 3fbf819..c3ba3cc 100644 --- a/src/backend/threads_availchecker.py +++ b/src/backend/threads_availchecker.py @@ -56,6 +56,9 @@ class AvailChecker(QThread): rds = transformer.get_data(data).return_data("rds_availability") book_id = None + if not rds or not rds.items: + log.warning(f"No RDS data found for link {link}") + continue for item in rds.items: sign = item.superlocation loc = item.location diff --git a/src/logic/csvparser.py b/src/logic/csvparser.py index 0fa23e6..e41f2e7 100644 --- a/src/logic/csvparser.py +++ b/src/logic/csvparser.py @@ -1,13 +1,12 @@ import csv - -import chardet +from charset_normalizer import detect def csv_to_list(path: str) -> list[str]: """ Extracts the data from a csv file and returns it as a pandas dataframe """ - encoding = chardet.detect(open(path, "rb").read())["encoding"] + encoding = detect(open(path, "rb").read())["encoding"] with open(path, newline="", encoding=encoding) as csvfile: # if decoder fails to map, assign "" reader = csv.reader(csvfile, delimiter=";", quotechar="|") diff --git a/src/logic/dataclass.py b/src/logic/dataclass.py index c3c4f9d..0f90d54 100644 --- a/src/logic/dataclass.py +++ b/src/logic/dataclass.py @@ -1,8 +1,8 @@ -from dataclasses import dataclass, field - -from enum import Enum import json -from typing import Union, Any, Optional +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Optional, Union + @dataclass class Prof: @@ -93,6 +93,24 @@ class BookData: ndata = json.loads(data) return BookData(**ndata) + def from_LehmannsSearchResult(self, result: Any) -> "BookData": + self.title = result.title + 
self.author = "; ".join(result.authors) if result.authors else None + self.edition = str(result.edition) if result.edition else None + self.link = result.url + self.isbn = ( + result.isbn13 + if isinstance(result.isbn13, list) + else [result.isbn13] + if result.isbn13 + else [] + ) + self.pages = str(result.pages) if result.pages else None + self.publisher = result.publisher + self.year = str(result.year) if result.year else None + # self.pages = str(result.pages) if result.pages else None + return self + @dataclass class MailData: diff --git a/src/logic/lehmannsapi.py b/src/logic/lehmannsapi.py new file mode 100644 index 0000000..6e5a4b2 --- /dev/null +++ b/src/logic/lehmannsapi.py @@ -0,0 +1,280 @@ +from __future__ import annotations + +import re +from dataclasses import dataclass, asdict, field +from typing import Optional, List, Iterable +from urllib.parse import urljoin, quote_plus + +import httpx +from bs4 import BeautifulSoup + +BASE = "https://www.lehmanns.de" +SEARCH_URL = "https://www.lehmanns.de/search/quick?mediatype_id=&q=" + + +@dataclass +class LehmannsSearchResult: + title: str + url: str + + # Core fields from the listing card + year: Optional[int] = None + edition: Optional[int] = None + publisher: Optional[str] = None + isbn13: Optional[str] = None + + # Extras from the listing card + description: Optional[str] = None + authors: list[str] = field(default_factory=list) + media_type: Optional[str] = None + book_format: Optional[str] = None + price_eur: Optional[float] = None + currency: str = "EUR" + image: Optional[str] = None + + # From detail page: + pages: Optional[str] = None # " Seiten" + buyable: bool = True # set in enrich_pages (detail page) + unavailable_hint: Optional[str] = None # e.g. "Titel ist leider vergriffen; keine Neuauflage" + + def to_dict(self) -> dict: + return asdict(self) + + +class LehmannsClient: + """Scrapes quick-search results, then enriches (and filters) via product pages.""" + + def __init__(self, timeout: float = 20.0): + self.client = httpx.Client( + headers={ + "User-Agent": ( + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/124.0 Safari/537.36" + ), + "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + }, + timeout=timeout, + follow_redirects=True, + ) + + def close(self): + self.client.close() + + def __enter__(self): + return self + + def __exit__(self, *exc): + self.close() + + # ------------------- Search (listing) ------------------- + + def build_search_url(self, title: str) -> str: + # spaces -> '+' + return SEARCH_URL + quote_plus(title) + + def search_by_title(self, title: str, limit: Optional[int] = None, strict: bool = False) -> List[LehmannsSearchResult]: + """ + Parse the listing page only (no availability check here). + Use enrich_pages(...) afterwards to fetch detail pages, add 'pages', + and drop unbuyable items. 
+ """ + url = self.build_search_url(title) + html = self._get(url) + if not html: + return [] + results = self._parse_results(html) + self.enrich_pages(results) + if strict: + # filter results to only those with exact title match (case-insensitive) + title_lower = title.lower() + results = [r for r in results if r.title and r.title.lower() == title_lower] + results = [r for r in results if r.buyable] + return results + if limit is not None: + results = results[:max(0, limit)] + return results + + # ------------------- Detail enrichment & filtering ------------------- + + def enrich_pages(self, results: Iterable[LehmannsSearchResult], drop_unbuyable: bool = True) -> List[LehmannsSearchResult]: + """ + Fetch each result.url, extract: + - pages: from ... + - availability: from
  • ...
  • + * if it contains "Titel ist leider vergriffen", mark buyable=False + * if it also contains "keine Neuauflage", set unavailable_hint accordingly + If drop_unbuyable=True, exclude non-buyable results from the returned list. + """ + enriched: List[LehmannsSearchResult] = [] + for r in results: + try: + html = self._get(r.url) + if not html: + # Can't verify; keep as-is when not dropping, else skip + if not drop_unbuyable: + enriched.append(r) + continue + + soup = BeautifulSoup(html, "html.parser") + + # Pages + pages_node = soup.select_one( + "span.book-meta.meta-seiten[itemprop='numberOfPages'], " + "span.book-meta.meta-seiten[itemprop='numberofpages'], " + ".meta-seiten [itemprop='numberOfPages'], " + ".meta-seiten[itemprop='numberOfPages'], " + ".book-meta.meta-seiten" + ) + if pages_node: + text = pages_node.get_text(" ", strip=True) + m = re.search(r"\d+", text) + if m: + r.pages = f"{m.group(0)} Seiten" + + # Availability via li.availability-3 + avail_li = soup.select_one("li.availability-3") + if avail_li: + avail_text = " ".join(avail_li.get_text(" ", strip=True).split()).lower() + if "titel ist leider vergriffen" in avail_text: + r.buyable = False + if "keine neuauflage" in avail_text: + r.unavailable_hint = "Titel ist leider vergriffen; keine Neuauflage" + else: + r.unavailable_hint = "Titel ist leider vergriffen" + + # Append or drop + if (not drop_unbuyable) or r.buyable: + enriched.append(r) + + except Exception: + # On any per-item error, keep the record if not dropping; else skip + if not drop_unbuyable: + enriched.append(r) + continue + + return enriched + + # ------------------- Internals ------------------- + + def _get(self, url: str) -> Optional[str]: + try: + r = self.client.get(url) + r.encoding = "utf-8" + if r.status_code == 200 and "text/html" in (r.headers.get("content-type") or ""): + return r.text + except httpx.HTTPError: + pass + return None + + def _parse_results(self, html: str) -> List[LehmannsSearchResult]: + soup = BeautifulSoup(html, "html.parser") + results: list[LehmannsSearchResult] = [] + + for block in soup.select("div.info-block"): + a = block.select_one(".title a[href]") + if not a: + continue + url = urljoin(BASE, a["href"].strip()) + base_title = (block.select_one(".title [itemprop='name']") or a).get_text(strip=True) + + # Alternative headline => extend title + alt_tag = block.select_one(".description[itemprop='alternativeHeadline']") + alternative_headline = alt_tag.get_text(strip=True) if alt_tag else None + title = f"{base_title} : {alternative_headline}" if alternative_headline else base_title + description = alternative_headline + + # Authors from .author + authors: list[str] = [] + author_div = block.select_one("div.author") + if author_div: + t = author_div.get_text(" ", strip=True) + t = re.sub(r"^\s*von\s+", "", t, flags=re.I) + for part in re.split(r"\s*;\s*|\s*&\s*|\s+und\s+", t): + name = " ".join(part.split()) + if name: + authors.append(name) + + # Media + format + media_type = None + book_format = None + type_text = block.select_one(".type") + if type_text: + t = type_text.get_text(" ", strip=True) + m = re.search(r"\b(Buch|eBook|Hörbuch)\b", t) + if m: + media_type = m.group(1) + fm = re.search(r"\(([^)]+)\)", t) + if fm: + book_format = fm.group(1).strip().upper() + + # Year + year = None + y = block.select_one("[itemprop='copyrightYear']") + if y: + try: + year = int(y.get_text(strip=True)) + except ValueError: + pass + + # Edition + edition = None + ed = block.select_one("[itemprop='bookEdition']") + if ed: + m = 
re.search(r"\d+", ed.get_text(strip=True)) + if m: + edition = int(m.group()) + + # Publisher + publisher = None + pub = block.select_one(".publisherprop [itemprop='name']") or block.select_one(".publisher [itemprop='name']") + if pub: + publisher = pub.get_text(strip=True) + + # ISBN-13 + isbn13 = None + isbn_tag = block.select_one(".isbn [itemprop='isbn'], [itemprop='isbn']") + if isbn_tag: + digits = re.sub(r"[^0-9Xx]", "", isbn_tag.get_text(strip=True)) + m = re.search(r"(97[89]\d{10})", digits) + if m: + isbn13 = m.group(1) + + # Price (best effort) + price_eur = None + txt = block.get_text(" ", strip=True) + mprice = re.search(r"(\d{1,3}(?:\.\d{3})*,\d{2})\s*€", txt) + if not mprice and block.parent: + sib = block.parent.get_text(" ", strip=True) + mprice = re.search(r"(\d{1,3}(?:\.\d{3})*,\d{2})\s*€", sib) + if mprice: + num = mprice.group(1).replace(".", "").replace(",", ".") + try: + price_eur = float(num) + except ValueError: + pass + + # Image (best-effort) + image = None + left_img = block.find_previous("img") + if left_img and left_img.get("src"): + image = urljoin(BASE, left_img["src"]) + + results.append( + LehmannsSearchResult( + title=title, + url=url, + description=description, + authors=authors, + media_type=media_type, + book_format=book_format, + year=year, + edition=edition, + publisher=publisher, + isbn13=isbn13, + price_eur=price_eur, + image=image, + ) + ) + + return results diff --git a/src/logic/swb.py b/src/logic/swb.py new file mode 100644 index 0000000..826c101 --- /dev/null +++ b/src/logic/swb.py @@ -0,0 +1,448 @@ +import xml.etree.ElementTree as ET +from dataclasses import dataclass, field +from typing import Dict, Iterable, List, Optional, Tuple + +import requests + +from src.logic.dataclass import BookData + +# ----------------------- +# Dataclasses +# ----------------------- + + +# --- MARC XML structures --- +@dataclass +class ControlField: + tag: str + value: str + + +@dataclass +class SubField: + code: str + value: str + + +@dataclass +class DataField: + tag: str + ind1: str = " " + ind2: str = " " + subfields: List[SubField] = field(default_factory=list) + + +@dataclass +class MarcRecord: + leader: str + controlfields: List[ControlField] = field(default_factory=list) + datafields: List[DataField] = field(default_factory=list) + + +# --- SRU record wrapper --- +@dataclass +class Record: + recordSchema: str + recordPacking: str + recordData: MarcRecord + recordPosition: int + + +@dataclass +class EchoedSearchRequest: + version: str + query: str + maximumRecords: int + recordPacking: str + recordSchema: str + + +@dataclass +class SearchRetrieveResponse: + version: str + numberOfRecords: int + records: List[Record] = field(default_factory=list) + echoedSearchRetrieveRequest: Optional[EchoedSearchRequest] = None + + +# ----------------------- +# Parser +# ----------------------- + +ZS = "http://www.loc.gov/zing/srw/" +MARC = "http://www.loc.gov/MARC21/slim" +NS = {"zs": ZS, "marc": MARC} + + +def _text(elem: Optional[ET.Element]) -> str: + return (elem.text or "") if elem is not None else "" + + +def _req_text(parent: ET.Element, path: str) -> str: + el = parent.find(path, NS) + if el is None or el.text is None: + raise ValueError(f"Required element not found or empty: {path}") + return el.text + + +def parse_marc_record(record_el: ET.Element) -> MarcRecord: + """ + record_el is the element (default ns MARC in your sample) + """ + # leader + leader_text = _req_text(record_el, "marc:leader") + + # controlfields + controlfields: List[ControlField] = [] + for 
cf in record_el.findall("marc:controlfield", NS): + tag = cf.get("tag", "").strip() + controlfields.append(ControlField(tag=tag, value=_text(cf))) + + # datafields + datafields: List[DataField] = [] + for df in record_el.findall("marc:datafield", NS): + tag = df.get("tag", "").strip() + ind1 = df.get("ind1") or " " + ind2 = df.get("ind2") or " " + subfields: List[SubField] = [] + for sf in df.findall("marc:subfield", NS): + code = sf.get("code", "") + subfields.append(SubField(code=code, value=_text(sf))) + datafields.append(DataField(tag=tag, ind1=ind1, ind2=ind2, subfields=subfields)) + + return MarcRecord( + leader=leader_text, controlfields=controlfields, datafields=datafields + ) + + +def parse_record(zs_record_el: ET.Element) -> Record: + recordSchema = _req_text(zs_record_el, "zs:recordSchema") + recordPacking = _req_text(zs_record_el, "zs:recordPacking") + + # recordData contains a MARC with default MARC namespace in your sample + recordData_el = zs_record_el.find("zs:recordData", NS) + if recordData_el is None: + raise ValueError("Missing zs:recordData") + + marc_record_el = recordData_el.find("marc:record", NS) + if marc_record_el is None: + # If the MARC record uses default ns (xmlns="...") ElementTree still needs the ns-qualified name + # We already searched with prefix; this covers both default and prefixed cases. + raise ValueError("Missing MARC21 record inside zs:recordData") + + marc_record = parse_marc_record(marc_record_el) + + recordPosition = int(_req_text(zs_record_el, "zs:recordPosition")) + return Record( + recordSchema=recordSchema, + recordPacking=recordPacking, + recordData=marc_record, + recordPosition=recordPosition, + ) + + +def parse_echoed_request(root: ET.Element) -> Optional[EchoedSearchRequest]: + el = root.find("zs:echoedSearchRetrieveRequest", NS) + if el is None: + return None + + # Be permissive with missing fields + version = _text(el.find("zs:version", NS)) + query = _text(el.find("zs:query", NS)) + maximumRecords_text = _text(el.find("zs:maximumRecords", NS)) or "0" + recordPacking = _text(el.find("zs:recordPacking", NS)) + recordSchema = _text(el.find("zs:recordSchema", NS)) + + try: + maximumRecords = int(maximumRecords_text) + except ValueError: + maximumRecords = 0 + + return EchoedSearchRequest( + version=version, + query=query, + maximumRecords=maximumRecords, + recordPacking=recordPacking, + recordSchema=recordSchema, + ) + + +def parse_search_retrieve_response(xml_str: str) -> SearchRetrieveResponse: + root = ET.fromstring(xml_str) + + # Root is zs:searchRetrieveResponse + version = _req_text(root, "zs:version") + numberOfRecords = int(_req_text(root, "zs:numberOfRecords")) + + records_parent = root.find("zs:records", NS) + records: List[Record] = [] + if records_parent is not None: + for r in records_parent.findall("zs:record", NS): + records.append(parse_record(r)) + + echoed = parse_echoed_request(root) + + return SearchRetrieveResponse( + version=version, + numberOfRecords=numberOfRecords, + records=records, + echoedSearchRetrieveRequest=echoed, + ) + + +# --- Query helpers over MarcRecord --- + + +def iter_datafields( + rec: MarcRecord, + tag: Optional[str] = None, + ind1: Optional[str] = None, + ind2: Optional[str] = None, +) -> Iterable[DataField]: + """Yield datafields, optionally filtered by tag/indicators.""" + for df in rec.datafields: + if tag is not None and df.tag != tag: + continue + if ind1 is not None and df.ind1 != ind1: + continue + if ind2 is not None and df.ind2 != ind2: + continue + yield df + + +def subfield_values( + 
rec: MarcRecord, + tag: str, + code: str, + *, + ind1: Optional[str] = None, + ind2: Optional[str] = None, +) -> List[str]: + """All values for subfield `code` in every `tag` field (respecting indicators).""" + out: List[str] = [] + for df in iter_datafields(rec, tag, ind1, ind2): + out.extend(sf.value for sf in df.subfields if sf.code == code) + return out + + +def first_subfield_value( + rec: MarcRecord, + tag: str, + code: str, + *, + ind1: Optional[str] = None, + ind2: Optional[str] = None, + default: Optional[str] = None, +) -> Optional[str]: + """First value for subfield `code` in `tag` (respecting indicators).""" + for df in iter_datafields(rec, tag, ind1, ind2): + for sf in df.subfields: + if sf.code == code: + return sf.value + return default + + +def find_datafields_with_subfields( + rec: MarcRecord, + tag: str, + *, + where_all: Optional[Dict[str, str]] = None, + where_any: Optional[Dict[str, str]] = None, + casefold: bool = False, + ind1: Optional[str] = None, + ind2: Optional[str] = None, +) -> List[DataField]: + """ + Return datafields of `tag` whose subfields match constraints: + - where_all: every (code -> exact value) must be present + - where_any: at least one (code -> exact value) present + Set `casefold=True` for case-insensitive comparison. + """ + where_all = where_all or {} + where_any = where_any or {} + matched: List[DataField] = [] + + for df in iter_datafields(rec, tag, ind1, ind2): + # Map code -> list of values (with optional casefold applied) + vals: Dict[str, List[str]] = {} + for sf in df.subfields: + v = sf.value.casefold() if casefold else sf.value + vals.setdefault(sf.code, []).append(v) + + ok = True + for c, v in where_all.items(): + vv = v.casefold() if casefold else v + if c not in vals or vv not in vals[c]: + ok = False + break + + if ok and where_any: + any_ok = any( + (c in vals) and ((v.casefold() if casefold else v) in vals[c]) + for c, v in where_any.items() + ) + if not any_ok: + ok = False + + if ok: + matched.append(df) + + return matched + + +def controlfield_value( + rec: MarcRecord, tag: str, default: Optional[str] = None +) -> Optional[str]: + """Get the first controlfield value by tag (e.g., '001', '005').""" + for cf in rec.controlfields: + if cf.tag == tag: + return cf.value + return default + + +def datafields_value( + data: List[DataField], code: str, default: Optional[str] = None +) -> Optional[str]: + """Get the first value for a specific subfield code in a list of datafields.""" + for df in data: + for sf in df.subfields: + if sf.code == code: + return sf.value + return default + + +def datafield_value( + df: DataField, code: str, default: Optional[str] = None +) -> Optional[str]: + """Get the first value for a specific subfield code in a datafield.""" + for sf in df.subfields: + if sf.code == code: + return sf.value + return default + + +def _smart_join_title(a: str, b: Optional[str]) -> str: + """ + Join 245 $a and $b with MARC-style punctuation. + If $b is present, join with ' : ' unless either side already supplies punctuation. 
+ """ + a = a.strip() + if not b: + return a + b = b.strip() + if a.endswith((":", ";", "/")) or b.startswith((":", ";", "/")): + return f"{a} {b}" + return f"{a} : {b}" + + +def subfield_values_from_fields( + fields: Iterable[DataField], + code: str, +) -> List[str]: + """All subfield values with given `code` across a list of DataField.""" + return [sf.value for df in fields for sf in df.subfields if sf.code == code] + + +def first_subfield_value_from_fields( + fields: Iterable[DataField], + code: str, + default: Optional[str] = None, +) -> Optional[str]: + """First subfield value with given `code` across a list of DataField.""" + for df in fields: + for sf in df.subfields: + if sf.code == code: + return sf.value + return default + + +def subfield_value_pairs_from_fields( + fields: Iterable[DataField], + code: str, +) -> List[Tuple[DataField, str]]: + """ + Return (DataField, value) pairs for all subfields with `code`. + Useful if you need to know which field a value came from. + """ + out: List[Tuple[DataField, str]] = [] + for df in fields: + for sf in df.subfields: + if sf.code == code: + out.append((df, sf.value)) + return out + + +def book_from_marc(rec: MarcRecord) -> BookData: + # PPN from controlfield 001 + ppn = controlfield_value(rec, "001") + + # Title = 245 $a + 245 $b (if present) + t_a = first_subfield_value(rec, "245", "a") + t_b = first_subfield_value(rec, "245", "b") + title = _smart_join_title(t_a, t_b) if t_a else None + + # Signature = 924 where $9 == "Frei 129" → take that field's $g + frei_fields = find_datafields_with_subfields( + rec, "924", where_all={"9": "Frei 129"} + ) + signature = first_subfield_value_from_fields(frei_fields, "g") + + # Year = 264 $c (prefer ind2="1" publication; fallback to any 264) + year = first_subfield_value(rec, "264", "c", ind2="1") or first_subfield_value( + rec, "264", "c" + ) + isbn = subfield_values(rec, "020", "a") + + return BookData( + ppn=ppn, + title=title, + signature=signature, + edition=first_subfield_value(rec, "250", "a"), + year=year, + pages=first_subfield_value(rec, "300", "a"), + publisher=first_subfield_value(rec, "264", "b"), + isbn=isbn, + ) + + +class SWB: + def __init__(self): + self.url = "https://sru.k10plus.de/opac-de-627!rec=1?version=1.1&operation=searchRetrieve&query={}&maximumRecords=10&recordSchema=marcxml" + self.bib_id = 20735 + + def get(self, query_args: Iterable[str]) -> List[Record]: + # if any query_arg ends with =, remove it + query_args = [arg for arg in query_args if not arg.endswith("=")] + query = "+and+".join(query_args) + query = query.replace(" ", "%20").replace("&", "%26") + + url = self.url.format(query) + + print("Fetching from SWB:", url) + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3", + "Accept": "application/xml", + "Accept-Charset": "latin1,utf-8;q=0.7,*;q=0.3", + } + response = requests.get(url, headers=headers) + if response.status_code != 200: + raise Exception(f"Error fetching data from SWB: {response.status_code}") + # print(response.text) + data = response.content + + # extract top-level response + response = parse_search_retrieve_response(data) + return response.records + + def getBooks(self, query_args: Iterable[str]) -> List[BookData]: + records: List[Record] = self.get(query_args) + books: List[BookData] = [] + title = query_args[1].split("=")[1] + # print(len(records), "records found") + for rec in records: + book = book_from_marc(rec.recordData) + books.append(book) + books 
= [ + b for b in books if b.title and b.title.lower().startswith(title.lower()) + ] + return books diff --git a/src/logic/webrequest.py b/src/logic/webrequest.py index 19505d1..e545cf5 100644 --- a/src/logic/webrequest.py +++ b/src/logic/webrequest.py @@ -1,17 +1,18 @@ +import sys +from typing import Any, Optional, Union + +import loguru import requests from bs4 import BeautifulSoup - # import sleep_and_retry decorator to retry requests from ratelimit import limits, sleep_and_retry -from typing import Union, Any, Optional -from src.logic.dataclass import BookData +from src import LOG_DIR +from src.logic.dataclass import BookData from src.transformers import ARRAYData, BibTeXData, COinSData, RDSData, RISData from src.transformers.transformers import RDS_AVAIL_DATA, RDS_GENERIC_DATA -import loguru -import sys -from src import LOG_DIR + log = loguru.logger log.remove() log.add(sys.stdout, level="INFO") @@ -20,7 +21,6 @@ log.add(f"{LOG_DIR}/application.log", rotation="1 MB", retention="10 days") # logger.add(sys.stderr, format="{time} {level} {message}", level="INFO") - API_URL = "https://rds.ibs-bw.de/phfreiburg/opac/RDSIndexrecord/{}/" PPN_URL = "https://rds.ibs-bw.de/phfreiburg/opac/RDSIndex/Search?type0%5B%5D=allfields&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=au&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ti&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ct&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=isn&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=ta&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=co&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=py&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=pp&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=pu&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=si&lookfor0%5B%5D={}&join=AND&bool0%5B%5D=AND&type0%5B%5D=zr&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND&type0%5B%5D=cc&lookfor0%5B%5D=&join=AND&bool0%5B%5D=AND" BASE = "https://rds.ibs-bw.de" @@ -111,21 +111,8 @@ class WebRequest: locations = soup.find_all("div", class_="col-xs-12 rds-dl RDS_LOCATION") if locations: for location in locations: - item_location = location.find( - "div", class_="col-xs-12 col-md-7 col-lg-8 rds-dl-panel" - ).text.strip() - log.debug(f"Item location: {item_location}") - if self.use_any: - pre_tag = soup.find_all("pre") - if pre_tag: - for tag in pre_tag: - data = tag.text.strip() - return_data.append(data) - return return_data - else: - log.error("No
     tag found")
-                            raise ValueError("No <pre> tag found")
    -                    elif f"Semesterapparat-{self.apparat}" in item_location:
    +                    if "1. OG Semesterapparat" in location.text:
    +                        log.success("Found Semesterapparat, adding entry")
                             pre_tag = soup.find_all("pre")
                             return_data = []
                             if pre_tag:
    @@ -137,10 +124,36 @@ class WebRequest:
                            log.error("No <pre> tag found")
                                 return return_data
                         else:
    -                        log.error(
    -                            f"Signature {self.signature} not found in {item_location}"
    -                        )
    -                        # return_data = []
    +                        item_location = location.find(
    +                            "div", class_="col-xs-12 col-md-7 col-lg-8 rds-dl-panel"
    +                        ).text.strip()
    +                        log.debug(f"Item location: {item_location}")
    +                        if self.use_any:
    +                            pre_tag = soup.find_all("pre")
    +                            if pre_tag:
    +                                for tag in pre_tag:
    +                                    data = tag.text.strip()
    +                                    return_data.append(data)
    +                                return return_data
    +                            else:
+                                log.error("No <pre> tag found")
+                                raise ValueError("No <pre> tag found")
    +                        elif f"Semesterapparat-{self.apparat}" in item_location:
    +                            pre_tag = soup.find_all("pre")
    +                            return_data = []
    +                            if pre_tag:
    +                                for tag in pre_tag:
    +                                    data = tag.text.strip()
    +                                    return_data.append(data)
    +                                return return_data
    +                            else:
+                                log.error("No <pre> tag found")
    +                                return return_data
    +                        else:
    +                            log.error(
    +                                f"Signature {self.signature} not found in {item_location}"
    +                            )
    +                            # return_data = []
     
             return return_data
     
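To make the reworked location check above easier to follow, here is a small self-contained sketch (not part of the patch). The CSS class names and the "1. OG Semesterapparat" marker mirror the code in the hunk; the sample HTML itself is invented.

# Standalone sketch of the matching pattern: collect the <pre> exports only
# when an RDS_LOCATION block mentions the Semesterapparat shelf.
from bs4 import BeautifulSoup

sample_html = """
<div class="col-xs-12 rds-dl RDS_LOCATION">
  <div class="col-xs-12 col-md-7 col-lg-8 rds-dl-panel">1. OG Semesterapparat</div>
</div>
<pre>TY  - BOOK
TI  - Beispieltitel</pre>
"""

soup = BeautifulSoup(sample_html, "html.parser")
return_data = []
for location in soup.find_all("div", class_="col-xs-12 rds-dl RDS_LOCATION"):
    if "1. OG Semesterapparat" in location.text:
        return_data = [tag.text.strip() for tag in soup.find_all("pre")]
        break
print(return_data)  # ['TY  - BOOK\nTI  - Beispieltitel']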
    diff --git a/src/logic/wordparser.py b/src/logic/wordparser.py
    index e3741c9..4cba286 100644
    --- a/src/logic/wordparser.py
    +++ b/src/logic/wordparser.py
    @@ -9,7 +9,7 @@ from bs4 import BeautifulSoup
     from docx import Document
     
     from src import LOG_DIR
    -from src.backend import Semester
    +from src.backend.semester import Semester
     from src.logic.openai import name_tester, run_shortener, semester_converter
     
     log = loguru.logger
    @@ -18,7 +18,6 @@ log.add(sys.stdout, level="INFO")
     log.add(f"{LOG_DIR}/application.log", rotation="1 MB", retention="10 days")
     
     
    -
     letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
     
     
    @@ -111,6 +110,7 @@ class SemapDocument:
             else:
                 self.title_suggestions = []
             pass
    +
         @property
         def renameSemester(self) -> None:
             if ", Dauer" in self.semester:
    @@ -141,8 +141,8 @@ def word_docx_to_csv(path: str) -> list[pd.DataFrame]:
     
                     text = text.replace("\n", "")
                     row_data.append(text)
    -                if text == "Ihr Fach:":
    -                    row_data.append(get_fach(path))
    +                # if text == "Ihr Fach:":
    +                #     row_data.append(get_fach(path))
                 data.append(row_data)
             df = pd.DataFrame(data)
             df.columns = df.iloc[0]
    @@ -265,7 +265,7 @@ def elsa_word_to_csv(path: str):
         return tuple_to_dict(data, doctype), doctype
     
     
    -def word_to_semap(word_path: str) -> SemapDocument:
    +def word_to_semap(word_path: str, ai: bool = True) -> SemapDocument:
         log.info("Parsing Word Document {}", word_path)
         semap = SemapDocument()
         df = word_docx_to_csv(word_path)
    @@ -286,8 +286,9 @@ def word_to_semap(word_path: str) -> SemapDocument:
         appdata = {keys[i]: keys[i + 1] for i in range(0, len(keys), 2)}
         semap.title = appdata["Veranstaltung:"]
         semap.semester = appdata["Semester:"]
    -    semap.renameSemester
    -    semap.nameSetter
    +    if ai:
    +        semap.renameSemester
    +        semap.nameSetter
     
         books = df[2]
         booklist = []
    @@ -309,7 +310,5 @@ def word_to_semap(word_path: str) -> SemapDocument:
     
     
     if __name__ == "__main__":
    -    else_df = elsa_word_to_csv(
    -        "C:/Users/aky547/Desktop/ELSA_Bestellung Scann Der Westen und der Rest.docx"
    -    )
    +    else_df = word_to_semap("C:/Users/aky547/Desktop/semap/db/temp/tmpzsz_hgdr.docx")
         print(else_df)
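A short usage sketch (not part of the patch) for the new ai flag on word_to_semap: with ai=False the request form is parsed without the OpenAI-based renameSemester/nameSetter steps. The path below is a placeholder; only the function signature and the SemapDocument fields come from the diff.

from src.logic.wordparser import word_to_semap

# Parse a Semesterapparat request form without calling the AI helpers.
semap = word_to_semap("C:/path/to/semesterapparat_antrag.docx", ai=False)
print(semap.title, semap.semester)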
    diff --git a/src/sounds/ding.mp3 b/src/sounds/ding.mp3
    new file mode 100644
    index 0000000..9684c72
    Binary files /dev/null and b/src/sounds/ding.mp3 differ
    diff --git a/src/ui/dialogs/mail.py b/src/ui/dialogs/mail.py
    index 833233a..caaf826 100644
    --- a/src/ui/dialogs/mail.py
    +++ b/src/ui/dialogs/mail.py
    @@ -1,23 +1,21 @@
     import os
     import sys
     
    +import loguru
     from PySide6 import QtWidgets
     
    -from src import Icon, settings as config
    -
    +from src import LOG_DIR, Icon
    +from src import settings as config
     
     from .dialog_sources.Ui_mail_preview import Ui_eMailPreview as MailPreviewDialog
     from .mailTemplate import MailTemplateDialog
    -import loguru
    -import sys
    -from src import LOG_DIR
    +
     log = loguru.logger
     log.remove()
     log.add(sys.stdout, level="INFO")
     log.add(f"{LOG_DIR}/application.log", rotation="1 MB", retention="10 days")
     
     
    -
     empty_signature = """