import datetime
import json
import os
import re
import sqlite3 as sql
import tempfile
from dataclasses import asdict
from pathlib import Path
from string import ascii_lowercase as lower
from string import digits, punctuation
from typing import Any, List, Optional, Tuple, Union

from src import DATABASE_DIR, settings
from src.backend.db import (
    CREATE_ELSA_FILES_TABLE,
    CREATE_ELSA_MEDIA_TABLE,
    CREATE_ELSA_TABLE,
    CREATE_TABLE_APPARAT,
    CREATE_TABLE_FILES,
    CREATE_TABLE_MEDIA,
    CREATE_TABLE_MESSAGES,
    CREATE_TABLE_NEWEDITIONS,
    CREATE_TABLE_PROF,
    CREATE_TABLE_SUBJECTS,
    CREATE_TABLE_USER,
)
from src.core.models import ELSA, Apparat, ApparatData, BookData, Prof
from src.errors import AppPresentError, NoResultError
from src.logic.constants import SEMAP_MEDIA_ACCOUNTS
from src.logic.semester import Semester
from src.shared.logging import log, get_bloat_logger, preview
from src.utils.blob import create_blob

ascii_lowercase = lower + digits + punctuation


class Database:
    """Initialize the database and create the tables if they do not exist."""

    def __init__(self, db_path: Union[Path, None] = None):
        """Resolve the database path and run the startup check.

        Args:
            db_path (Path, optional): Explicit database file, e.g. for tests.
                When None, the path is built from ``settings.database.path``
                and ``settings.database.name``; if the settings path is None
                as well, ``self.db_path`` stays None.
        """
        if db_path is not None:
            self.db_path = db_path
        elif settings.database.path is not None:
            self.db_path = Path(
                settings.database.path.expanduser(),
                settings.database.name,
            )
        else:
            self.db_path = None
        log.debug(f"Database path: {self.db_path}")
        self.db_initialized = False
        self.startup_check()

    def startup_check(self):
        """Initialize the database if needed and recreate any missing table."""
        if not self.db_initialized:
            self.initializeDatabase()
        tables = self.get_db_contents()
        # Column 1 of a sqlite_master row is the table name.
        tables = [t[1] for t in tables] if tables is not None else []
        required_tables = [
            "semesterapparat",
            "messages",
            "media",
            "files",
            "prof",
            "user",
            "subjects",
            "elsa",
            "elsa_files",
            "elsa_media",
            "neweditions",
        ]
        for table in required_tables:
            if table not in tables:
                log.critical(f"Table {table} is missing, recreating...")
                self.create_table(table)

    def create_table(self, table_name: str):
        """Create a single table from its hard-coded DDL statement.

        Args:
            table_name (str): One of the known table names. An unknown name
                is logged as an error and nothing is executed.
        """
        ddl_by_table = {
            "semesterapparat": CREATE_TABLE_APPARAT,
            "messages": CREATE_TABLE_MESSAGES,
            "media": CREATE_TABLE_MEDIA,
            "files": CREATE_TABLE_FILES,
            "prof": CREATE_TABLE_PROF,
            "user": CREATE_TABLE_USER,
            "subjects": CREATE_TABLE_SUBJECTS,
            "elsa": CREATE_ELSA_TABLE,
            "elsa_files": CREATE_ELSA_FILES_TABLE,
            "elsa_media": CREATE_ELSA_MEDIA_TABLE,
            "neweditions": CREATE_TABLE_NEWEDITIONS,
        }
        query = ddl_by_table.get(table_name)
        if query is None:
            # Without this early return the code would go on to execute an
            # unbound ``query`` variable.
            log.error(f"Table {table_name} is not a valid table name")
            return
        self.query_db(query)

    def initializeDatabase(self):
        """Run the one-time database bootstrap followed by the migrations."""
        if not self.db_initialized:
            self.checkDatabaseStatus()
            self.db_initialized = True
            # run migrations after initial creation to bring schema up-to-date
            try:
                if self.db_path is not None:
                    self.run_migrations()
            except Exception as e:
                # Best effort: a failing migration is logged, not raised.
                log.error(f"Error while running migrations: {e}")

    # --- Migration helpers integrated into Database ---
    def _ensure_migrations_table(self, conn: sql.Connection) -> None:
        """Create the ``schema_migrations`` bookkeeping table if it is missing."""
        cursor = conn.cursor()
        cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS schema_migrations (
                id TEXT PRIMARY KEY,
                applied_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            """,
        )
        conn.commit()

    def _applied_migrations(self, conn: sql.Connection) -> List[str]:
        """Return the ids of all recorded migrations, ordered by id."""
        cursor = conn.cursor()
        cursor.execute("SELECT id FROM schema_migrations ORDER BY id")
        rows = cursor.fetchall()
        return [r[0] for r in rows]

    def _apply_sql_file(self, conn: sql.Connection, path: Path) -> None:
        """Execute one ``.sql`` file and record its file name as applied."""
        log.info(f"Applying migration {path.name}")
        sql_text = path.read_text(encoding="utf-8")
        cursor = conn.cursor()
        cursor.executescript(sql_text)
        cursor.execute(
            "INSERT OR REPLACE INTO schema_migrations (id) VALUES (?)",
            (path.name,),
        )
        conn.commit()

    def run_migrations(self) -> None:
        """Apply unapplied .sql migrations from the ``migrations`` directory.

        The directory is looked up next to this module. Files are applied in
        sorted order; files already listed in ``schema_migrations`` are
        skipped. Nothing happens if the directory does not exist.
        """
        migrations_dir = Path(__file__).parent / "migrations"
        if not migrations_dir.exists():
            log.debug("Migrations directory does not exist, skipping migrations")
            return

        conn = self.connect()
        try:
            self._ensure_migrations_table(conn)
            applied = set(self._applied_migrations(conn))
            migration_files = sorted(
                [p for p in migrations_dir.iterdir() if p.suffix == ".sql"],
            )
            for m in migration_files:
                if m.name in applied:
                    log.debug(f"Skipping already applied migration {m.name}")
                    continue
                self._apply_sql_file(conn, m)
        finally:
            conn.close()

    # --- end migration helpers ---

    def overwritePath(self, new_db_path: str):
        """Replace the database path used for all subsequent connections."""
        log.debug("got new path, overwriting")
        self.db_path = Path(new_db_path)

    def checkDatabaseStatus(self):
        """Create the database directory and, for an empty database, its tables.

        The directory comes from ``settings.database.path`` (falling back to
        ``DATABASE_DIR``). Tables and default subjects are only created when
        ``get_db_contents`` reports an empty table list.
        """
        path = settings.database.path
        if path is None:
            path = Path(DATABASE_DIR)
        if not os.path.exists(path):
            os.makedirs(path)
        if self.get_db_contents() == []:
            log.critical("Database does not exist, creating tables")
            log.critical(f"Path: {path}")
            self.create_tables()
            self.insertSubjects()

    def getElsaMediaID(self, work_author: str, signature: str, pages: str):
        """Look up the id of an ELSA media entry.

        Returns:
            The id of the matching row. If no row matches, the *string form*
            of a ``NoResultError`` is returned (the error is not raised).
        """
        query = (
            "SELECT id FROM elsa_media WHERE work_author=? AND signature=? AND pages=?"
        )
        params = (work_author, signature, pages)
        result = self.query_db(query, params, one=True)
        if result is None:
            return NoResultError(
                f"work_author: {work_author}, signature: {signature}, pages: {pages}",
            ).__str__()
        return result[0]

    def getElsaMediaType(self, id):
        """Return the ``type`` column of the ELSA media entry with this id."""
        query = "SELECT type FROM elsa_media WHERE id=?"
        return self.query_db(query, (id,), one=True)[0]

    def get_db_contents(self) -> Union[List[Tuple[Any]], None]:
        """Get the ``sqlite_master`` rows of all tables in the database.

        Returns:
            Union[List[Tuple], None]: One row per table, or None if SQLite
            raises an ``OperationalError``.
        """
        try:
            conn = sql.connect(self.db_path)
        except sql.OperationalError:
            return None
        # sqlite3's ``with conn`` only commits/rolls back; it does not close
        # the connection, so close it explicitly.
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM sqlite_master WHERE type='table'")
            return cursor.fetchall()
        except sql.OperationalError:
            return None
        finally:
            conn.close()

    def connect(self) -> sql.Connection:
        """Connect to the database

        Returns:
            sql.Connection: The active connection to the database
        """
        conn = sql.connect(self.db_path)
        # Fast pragmas suitable for a desktop app DB
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute("PRAGMA synchronous=NORMAL;")
        conn.execute("PRAGMA temp_store=MEMORY;")
        conn.execute("PRAGMA mmap_size=134217728;")  # 128MB
        return conn

    def close_connection(self, conn: sql.Connection):
        """Closes the connection to the database

        Args:
            conn (sql.Connection): the connection to be closed
        """
        conn.close()

    def create_tables(self):
        """Create the tables in the database"""
        # Bootstrapping of tables is handled via migrations. Run migrations instead
        # of executing the hard-coded DDL here. Migrations are idempotent and
        # contain the CREATE TABLE IF NOT EXISTS statements.
        self.run_migrations()

    def insertInto(self, query: str, params: Tuple) -> None:
        """Insert sent data into the database

        Args:
            query (str): The query to be executed
            params (Tuple): the parameters to be inserted into the database
        """
        conn = self.connect()
        try:
            cursor = conn.cursor()
            log.debug(f"Inserting into DB: {query}")
            cursor.execute(query, params)
            conn.commit()
        finally:
            # Release the connection even if the statement fails.
            self.close_connection(conn)

    def getWebADISAuth(self) -> Tuple[str, str]:
        """Get the WebADIS authentication data from the database

        Returns:
            Tuple[str, str]: The username and password for WebADIS, or two
            empty strings if no row with ``effective_range='SAP'`` exists.
        """
        result = self.query_db(
            "SELECT username, password FROM webadis_login WHERE effective_range='SAP'",
            one=True,
        )
        if result is None:
            return ("", "")
        return (result[0], result[1])

    @log.catch
    def query_db(
        self,
        query: str,
        args: Tuple[Any] = (),  # type:ignore
        one: bool = False,  # type:ignore
    ) -> Union[Tuple[Any, Any], List[Tuple[Any, Any]]]:
        """Query the Database for the sent query.

        The statement is executed on a fresh connection, committed, and the
        connection is always closed afterwards.

        Args:
            query (str): The query to be executed
            args (Tuple, optional): The arguments for the query. Defaults to ().
            one (bool, optional): Return the first result only. Defaults to False.

        Returns:
            Union[Tuple | List[Tuple]]: All fetched rows, or with ``one=True``
            the first row (None if there is none). None is also returned when
            SQLite raises an ``OperationalError``.
        """
        conn = self.connect()
        try:
            cursor = conn.cursor()
            if "INTO user" in query:
                # Arguments are left out of the log for inserts into the user
                # table (presumably to keep credentials out of the log).
                log_message = f"Querying database with query {query}"
            else:
                log_message = f"Querying database with query {query}, args: {args}"
            log.debug(log_message)
            try:
                cursor.execute(query, args)
                rv = cursor.fetchall()
                conn.commit()
            except sql.OperationalError as e:
                log.error(f"Error in query: {e}")
                return None
        finally:
            # Previously the connection leaked whenever execution failed.
            self.close_connection(conn)
        return (rv[0] if rv else None) if one else rv

    # Books
    def addBookToDatabase(
        self,
        bookdata: BookData,
        app_id: Union[str, int],
        prof_id: Union[str, int],
    ):
        """Add books to the database.

        Both app_id and prof_id are required to add the book to the database,
        as they are used to select the books later on. If an equal book is
        already stored for this apparat/professor, no new row is inserted;
        a row flagged as deleted is restored instead.

        Args:
            bookdata (BookData): The metadata of the book to be added
            app_id (str): The apparat id where the book should be added to
            prof_id (str): The id of the professor where the book should be added to.

        Raises:
            ValueError: if ``app_id`` or ``prof_id`` is None.
        """
        log.info(f"Adding book {bookdata.signature} to database")
        if app_id is None or prof_id is None:
            raise ValueError("Apparate ID or Prof ID is None")
        serialized = json.dumps(asdict(bookdata), ensure_ascii=False)
        conn = self.connect()
        try:
            cursor = conn.cursor()
            lookup = "SELECT bookdata FROM media WHERE app_id=? AND prof_id=?"
            log.debug(f"{lookup}, args: {(app_id, prof_id)}")
            rows = cursor.execute(lookup, (app_id, prof_id)).fetchall()
            existing = [BookData().from_string(row[0]) for row in rows]
            if bookdata in existing:
                # check if the book was deleted in the apparat
                state = cursor.execute(
                    "SELECT deleted FROM media WHERE app_id=? AND prof_id=? AND bookdata=?",
                    (app_id, prof_id, serialized),
                ).fetchone()
                # ``state`` is None when the stored text differs from the
                # fresh serialization although the parsed objects are equal.
                if state is not None and state[0] == 1:
                    cursor.execute(
                        "UPDATE media SET deleted=0 WHERE app_id=? AND prof_id=? AND bookdata=?",
                        (app_id, prof_id, serialized),
                    )
                    conn.commit()
                return
            cursor.execute(
                "INSERT INTO media (bookdata, app_id, prof_id,deleted) VALUES (?, ?, ?,?)",
                (serialized, app_id, prof_id, 0),
            )
            logMessage = f"Added book with signature {bookdata.signature} to database, data: {serialized}"
            log.info(logMessage)
            conn.commit()
        finally:
            # Also covers the early return above, which used to leak.
            self.close_connection(conn)

    def getBookIdBasedOnSignature(
        self,
        app_id: Union[str, int],
        prof_id: Union[str, int],
        signature: str,
    ) -> int:
        """Get a book id based on the signature of the book.

        Args:
            app_id (str): The apparat id the book should be associated with
            prof_id (str): The professor id the book should be associated with
            signature (str): The signature of the book

        Returns:
            int: The id of the first book with that signature

        Raises:
            IndexError: if no stored book has the signature.
        """
        result = self.query_db(
            "SELECT bookdata, id FROM media WHERE app_id=? AND prof_id=?",
            (app_id, prof_id),
        )
        books = [(BookData().from_string(i[0]), i[1]) for i in result]
        book = [i for i in books if i[0].signature == signature][0][1]
        return book

    def getBookBasedOnSignature(
        self,
        app_id: Union[str, int],
        prof_id: Union[str, int],
        signature: str,
    ) -> BookData:
        """Get the book based on the signature of the book.

        Args:
            app_id (str): The apparat id the book should be associated with
            prof_id (str): The professor id the book should be associated with
            signature (str): The signature of the book

        Returns:
            BookData: The total metadata of the book wrapped in a BookData object

        Raises:
            IndexError: if no stored book has the signature.
        """
        result = self.query_db(
            "SELECT bookdata FROM media WHERE app_id=? AND prof_id=?",
            (app_id, prof_id),
        )
        books: list[BookData] = [BookData().from_string(i[0]) for i in result]
        book = [i for i in books if i.signature == signature][0]
        return book

    def getLastBookId(self) -> int:
        """Get the last book id in the database

        Returns:
            int: the highest id in the ``media`` table
        """
        return self.query_db("SELECT id FROM media ORDER BY id DESC", one=True)[0]

    def searchBook(
        self,
        data: dict[str, str],
    ) -> Optional[list[tuple["BookData", int, int]]]:
        """Search a book in the database using regex against signature/title.

        Args:
            data: may contain:
                - "signature": regex to match against BookData.signature
                - "title": regex to match against BookData.title
                When both are given, both have to match.

        Returns:
            list of (BookData, app_id, prof_id) tuples, or None if ``data``
            contains anything other than these keys. An empty list is
            returned if the query itself fails.
        """
        if not data or not set(data) <= {"signature", "title"}:
            return None

        def _compile(expr: str) -> re.Pattern:
            try:
                return re.compile(expr, re.IGNORECASE | re.UNICODE)
            except re.error:
                # If user provided a broken regex, treat it as a literal
                return re.compile(re.escape(expr), re.IGNORECASE | re.UNICODE)

        sig_re = _compile(data["signature"]) if "signature" in data else None
        title_re = _compile(data["title"]) if "title" in data else None

        # Fetch candidates once
        rows = self.query_db("SELECT * FROM media WHERE deleted=0")
        if rows is None:
            return []

        results: list[tuple[BookData, int, int]] = []
        for row in rows:
            # assumes row[1] is the serialized bookdata, row[2]/row[3] the ids
            bookdata = BookData().from_string(row[1])
            # ``or ""`` keeps a missing signature/title from crashing re.search
            if sig_re is not None and not sig_re.search(bookdata.signature or ""):
                continue
            if title_re is not None and not title_re.search(bookdata.title or ""):
                continue
            results.append((bookdata, row[2], row[3]))
        return results

    def setAvailability(self, book_id: str, available: str):
        """Set the availability of a book in the database

        Args:
            book_id (str): The id of the book
            available (str): The availability of the book
        """
        self.query_db("UPDATE media SET available=? WHERE id=?", (available, book_id))

    def getBookId(
        self,
        bookdata: BookData,
        app_id: Union[str, int],
        prof_id: Union[str, int],
    ) -> int:
        """Get the id of a book based on the metadata of the book

        Args:
            bookdata (BookData): The wrapped metadata of the book
            app_id (str): The apparat id the book should be associated with
            prof_id (str): The professor id the book should be associated with

        Returns:
            int: ID of the book
        """
        # NOTE(review): ``to_dict`` is used as an attribute here and in
        # updateBookdata - presumably a property yielding the serialized
        # text; confirm against BookData.
        result = self.query_db(
            "SELECT id FROM media WHERE bookdata=? AND app_id=? AND prof_id=?",
            (bookdata.to_dict, app_id, prof_id),
            one=True,
        )
        return result[0]

    def getBook(self, book_id: int) -> BookData:
        """Get the book based on the id in the database

        Args:
            book_id (int): The id of the book

        Returns:
            BookData: The metadata of the book wrapped in a BookData object
        """
        return BookData().from_string(
            self.query_db(
                "SELECT bookdata FROM media WHERE id=?",
                (book_id,),
                one=True,
            )[0],
        )

    def getBooks(
        self,
        app_id: Union[str, int],
        prof_id: Union[str, int],
        deleted: int = 0,
    ) -> list[dict[str, Union[BookData, int]]]:
        """Get the Books based on the apparat id and the professor id

        Args:
            app_id (str): The ID of the apparat
            prof_id (str): The ID of the professor
            deleted (int, optional): The state of the book. Any value other
                than 0 also includes deleted books. Defaults to 0.

        Returns:
            list[dict]: One dict per book with the keys ``id``, ``bookdata``
            and ``available``; empty if the query fails.
        """
        # Only fixed literals are interpolated; the ids are bound parameters.
        deleted_clause = "deleted=0" if deleted == 0 else "deleted=1 OR deleted=0"
        qdata = self.query_db(
            f"SELECT id,bookdata,available FROM media WHERE (app_id=? AND prof_id=?) AND ({deleted_clause})",
            (app_id, prof_id),
        )
        if qdata is None:
            return []
        return [
            {
                "id": row[0],
                "bookdata": BookData().from_string(row[1]),
                "available": row[2],
            }
            for row in qdata
        ]

    def getAllBooks(self) -> list[dict[str, Union[int, BookData]]]:
        """Get all books in the database that are not set as deleted

        Returns:
            list[dict[str, Union[int, BookData]]]: One dict per book with the
            keys ``id`` and ``bookdata``; empty if the query fails.
        """
        qdata = self.query_db("SELECT id,bookdata FROM media WHERE deleted=0")
        if qdata is None:
            return []
        return [
            {"id": row[0], "bookdata": BookData().from_string(row[1])}
            for row in qdata
        ]

    def getApparatNrByBookId(self, book_id):
        """Return the ``appnr`` of the apparat a book belongs to, or None."""
        appNr = self.query_db(
            "SELECT appnr FROM semesterapparat WHERE id IN (SELECT app_id FROM media WHERE id=?)",
            (book_id,),
            one=True,
        )
        return appNr[0] if appNr else None

    def getBooksByProfId(
        self,
        prof_id: int,
        deleted: int = 0,
    ) -> list[dict[str, Union[int, BookData]]]:
        """Get the Books based on the professor id

        Args:
            prof_id (int): The ID of the professor
            deleted (int, optional): Any value other than 0 also includes
                deleted books. Defaults to 0.

        Returns:
            list[dict]: One dict per book with the keys ``id``, ``bookdata``
            and ``available``; empty if the query fails.
        """
        # Only fixed literals are interpolated; the id is a bound parameter.
        deleted_clause = "deleted=0" if deleted == 0 else "deleted=1 OR deleted=0"
        qdata = self.query_db(
            f"SELECT id,bookdata,available FROM media WHERE prof_id=? AND ({deleted_clause})",
            (prof_id,),
        )
        if qdata is None:
            return []
        return [
            {
                "id": row[0],
                "bookdata": BookData().from_string(row[1]),
                "available": row[2],
            }
            for row in qdata
        ]

    def updateBookdata(self, book_id: int, bookdata: BookData):
        """Update the bookdata in the database

        Args:
            book_id (str): The id of the book
            bookdata (BookData): The new metadata of the book
        """
        query = "UPDATE media SET bookdata= ? WHERE id=?"
        book = bookdata.to_dict
        self.query_db(query, (book, book_id))

    def deleteBook(self, book_id: int):
        """Flag a book as deleted (the row itself is kept).

        Args:
            book_id (str): ID of the book
        """
        self.query_db("UPDATE media SET deleted=1 WHERE id=?", (book_id,))

    def deleteBooks(self, ids: list[int]):
        """Flag multiple books as deleted (the rows themselves are kept).

        Args:
            ids (list[int]): A list of book ids to be deleted
        """
        if not ids:
            return  # nothing to update
        query = f"UPDATE media SET deleted=1 WHERE id IN ({','.join(['?'] * len(ids))})"
        self.query_db(query, tuple(ids))

    # File Interactions
    def getBlob(self, filename: str, app_id: Union[str, int]) -> bytes:
        """Get a blob from the database

        Args:
            filename (str): The name of the file
            app_id (str): ID of the apparat

        Returns:
            bytes: The content of the ``fileblob`` column of the matching row
        """
        return self.query_db(
            "SELECT fileblob FROM files WHERE filename=? AND app_id=?",
            (filename, app_id),
            one=True,
        )[0]

    def insertFile(
        self,
        file: list[dict],
        app_id: Union[str, int],
        prof_id: Union[str, int],
    ):
        """Insert a list of files into the database

        Args:
            file (list[dict]): a list containing all the files to be inserted
                Structured: [{"name": "filename", "path": "path", "type": "filetype"}]
                Entries whose path is "Database" are skipped.
            app_id (int): the id of the apparat
            prof_id (str): the id of the professor
        """
        for f in file:
            filename = f["name"]
            path = f["path"]
            filetyp = f["type"]
            if path == "Database":
                continue
            blob = create_blob(path)
            query = "INSERT OR IGNORE INTO files (filename, fileblob, app_id, filetyp,prof_id) VALUES (?, ?, ?, ?,?)"
            self.query_db(query, (filename, blob, app_id, filetyp, prof_id))

    def recreateFile(
        self,
        filename: str,
        app_id: Union[str, int],
        filetype: str,
    ) -> str:
        """Recreate a file from the database

        The blob is written to a new temporary file inside
        ``settings.database.temp``; the file is not deleted automatically.

        Args:
            filename (str): the name of the file
            app_id (Union[str,int]): the id of the apparat
            filetype (str): the extension of the file to be created

        Returns:
            str: The filename of the recreated file
        """
        blob = self.getBlob(filename, app_id)
        bloat = get_bloat_logger()
        try:
            bloat.debug("Recreated file blob size: {} bytes", len(blob))
            bloat.debug("Recreated file blob (preview): {}", preview(blob, 2000))
        except Exception:
            bloat.debug("Recreated file blob (preview): {}", preview(blob, 2000))
        tempdir = settings.database.temp.expanduser()
        if not tempdir.exists():
            tempdir.mkdir(parents=True, exist_ok=True)
        # The ``with`` block flushes and closes the handle before the name is
        # handed out; ``delete=False`` keeps the file on disk afterwards.
        with tempfile.NamedTemporaryFile(
            delete=False,
            dir=tempdir,
            mode="wb",
            suffix=f".{filetype}",
        ) as file:
            file.write(blob)
        return file.name

    def getFiles(self, app_id: Union[str, int], prof_id: int) -> list[tuple]:
        """Get all the files associated with the apparat and the professor

        Args:
            app_id (Union[str,int]): The id of the apparat
            prof_id (Union[str,int]): the id of the professor

        Returns:
            list[tuple]: a list of (filename, filetype) tuples for the
            corresponding apparat and professor
        """
        return self.query_db(
            "SELECT filename, filetyp FROM files WHERE app_id=? AND prof_id=?",
            (app_id, prof_id),
        )

    def getSemesters(self) -> list[str]:
        """Return all the unique semesters in the database

        Returns:
            list: the distinct ``erstellsemester`` values of all apparats
        """
        data = self.query_db("SELECT DISTINCT erstellsemester FROM semesterapparat")
        return [i[0] for i in data]

    def insertSubjects(self):
        """Insert the default list of subjects into the ``subjects`` table."""
        subjects = [
            "Biologie",
            "Chemie",
            "Deutsch",
            "Englisch",
            "Erziehungswissenschaft",
            "Französisch",
            "Geographie",
            "Geschichte",
            "Gesundheitspädagogik",
            "Haushalt / Textil",
            "Kunst",
            "Mathematik / Informatik",
            "Medien in der Bildung",
            "Musik",
            "Philosophie",
            "Physik",
            "Politikwissenschaft",
            "Prorektorat Lehre und Studium",
            "Psychologie",
            "Soziologie",
            "Sport",
            "Technik",
            "Theologie",
            "Wirtschaftslehre",
        ]
        conn = self.connect()
        try:
            cursor = conn.cursor()
            for subject in subjects:
                cursor.execute("INSERT INTO subjects (name) VALUES (?)", (subject,))
            conn.commit()
        finally:
            # Release the connection even if an insert fails.
            self.close_connection(conn)

    def getSubjects(self):
        """Get all the subjects in the database

        Returns:
            list[tuple]: all rows of the ``subjects`` table
        """
        return self.query_db("SELECT * FROM subjects")

    # Messages
    def addMessage(
        self,
        messages: list[dict[str, Any]],
        user: str,
        app_id: Union[str, int],
    ):
        """Add messages to the database

        Args:
            messages (list[dict[str, Any]]): the messages to be added; each
                needs the keys ``message`` and ``remind_at``
            user (str): the user name of the author
            app_id (Union[str,int]): stored in the ``appnr`` column
        """
        user_id = self.query_db(
            "SELECT id FROM user WHERE username=?",
            (user,),
            one=True,
        )[0]
        for message in messages:
            self.query_db(
                "INSERT INTO messages (message, user_id, remind_at,appnr) VALUES (?,?,?,?)",
                (message["message"], user_id, message["remind_at"], app_id),
            )

    def getAllMessages(self) -> list[dict[str, Any]]:
        """Get all the messages in the database

        Returns:
            list[dict[str, Any]]: one dict per message with the keys
            ``message``, ``user``, ``appnr``, ``id`` and ``remind_at``
        """

        def _get_user_name(user_id: int):
            return self.query_db(
                "SELECT username FROM user WHERE id=?",
                (user_id,),
                one=True,
            )[0]

        messages = self.query_db("SELECT * FROM messages")
        ret = [
            {
                "message": i[2],
                "user": _get_user_name(i[4]),
                "appnr": i[5],
                "id": i[0],
                "remind_at": i[3],
            }
            for i in messages
        ]
        return ret

    def getMessages(self, date: str) -> list[dict[str, str]]:
        """Get all the messages for a specific date

        Args:
            date (str): the date to match against ``remind_at``, formatted
                as "YYYY-MM-DD"

        Returns:
            list[dict[str, str]]: one dict per message with the keys
            ``message``, ``user``, ``appnr`` and ``id``
        """

        def _get_user_name(user_id: int):
            return self.query_db(
                "SELECT username FROM user WHERE id=?",
                (user_id,),
                one=True,
            )[0]

        messages = self.query_db("SELECT * FROM messages WHERE remind_at=?", (date,))
        ret = [
            {"message": i[2], "user": _get_user_name(i[4]), "appnr": i[5], "id": i[0]}
            for i in messages
        ]
        return ret

    def deleteMessage(self, message_id: int):
        """Delete a message from the database

        Args:
            message_id (str): the id of the message
        """
        log.debug(f"Deleting message with id {message_id}")
        self.query_db("DELETE FROM messages WHERE id=?", (message_id,))

    # Prof data
    def getProfNameById(self, prof_id: Union[str, int], add_title: bool = False) -> str:
        """Get a professor name based on the id

        Args:
            prof_id (Union[str,int]): The id of the professor
            add_title (bool, optional): whether to prefix the title.
                Defaults to False.

        Returns:
            str: The name of the professor
        """
        prof = self.query_db(
            "SELECT fullname FROM prof WHERE id=?",
            (prof_id,),
            one=True,
        )
        if add_title:
            return f"{self.getTitleById(prof_id)}{prof[0]}"
        return prof[0]

    def getProfMailById(self, prof_id: Union[str, int]) -> str:
        """Get the mail of a professor based on the id

        Args:
            prof_id (Union[str,int]): the id of the professor

        Returns:
            str: the mail of the professor, or an empty string if it is NULL
        """
        mail = self.query_db("SELECT mail FROM prof WHERE id=?", (prof_id,), one=True)[
            0
        ]
        return mail if mail is not None else ""

    def getTitleById(self, prof_id: Union[str, int]) -> str:
        """Get the title of a professor based on the id

        Args:
            prof_id (Union[str,int]): the id of the professor

        Returns:
            str: the title followed by a single space, or an empty string if
            no title is stored
        """
        title = self.query_db(
            "SELECT titel FROM prof WHERE id=?",
            (prof_id,),
            one=True,
        )[0]
        return f"{title} " if title is not None else ""

    def getSpecificProfData(
        self,
        prof_id: Union[str, int],
        fields: List[str],
    ) -> tuple[Any, ...]:
        """A customisable function to get specific data of a professor based on the id

        Args:
            prof_id (Union[str,int]): the id of the professor
            fields (List[str]): the column names to select; they are placed
                into the SQL text unescaped, so they must be trusted

        Returns:
            The value of the *first* requested column of the matching row.
            NOTE(review): this contradicts the tuple annotation; kept as-is
            because callers may rely on it - confirm the intended shape.
        """
        query = f"SELECT {','.join(fields)} FROM prof WHERE id=?"
        return self.query_db(query, (prof_id,), one=True)[0]

    def getProfById(self, prof_id: Union[str, int]) -> Prof:
        """Get a professor based on the id

        Args:
            prof_id (Union[str,int]): the id of the professor

        Returns:
            Prof: a Prof object containing the data of the professor
        """
        data = self.query_db("SELECT * FROM prof WHERE id=?", (prof_id,), one=True)
        return Prof().from_tuple(data)

    def getProfData(self, profname: str):
        """Get a professor based on the full name

        Args:
            profname (str): name of the professor; commas are removed before
                it is compared with the ``fullname`` column

        Returns:
            The result of ``Prof().from_tuple`` for the matching row
        """
        data = self.query_db(
            "SELECT * FROM prof WHERE fullname=?",
            (profname.replace(",", ""),),
            one=True,
        )
        person = Prof()
        return person.from_tuple(data)

    def getProf(self, id) -> Prof:
        """Get a professor based on the id

        Args:
            id: the id of the professor

        Returns:
            Prof: a Prof object containing the data of the professor
        """
        data = self.query_db("SELECT * FROM prof WHERE id=?", (id,), one=True)
        return Prof().from_tuple(data)

    def getProfs(self) -> list[Prof]:
        """Return all the professors in the database

        Returns:
            list[Prof]: one Prof object per row of the ``prof`` table
        """
        profs = self.query_db("SELECT * FROM prof")
        return [Prof().from_tuple(prof) for prof in profs]

    # Apparat
    def getAllAparats(self, deleted: int = 0) -> list[Apparat]:
        """Get all the apparats in the database

        Args:
            deleted (int, optional): the ``deletion_status`` to select.
                Defaults to 0.

        Returns:
            list[Apparat]: one Apparat object per matching row
        """
        apparats = self.query_db(
            "SELECT * FROM semesterapparat WHERE deletion_status=?",
            (deleted,),
        )
        return [Apparat().from_tuple(apparat) for apparat in apparats]

    def getApparatData(self, appnr, appname) -> ApparatData:
        """Get the Apparat data based on the apparat number and the name

        Args:
            appnr (str): the apparat number
            appname (str): the name of the apparat

        Raises:
            NoResultError: an error is raised if no result is found

        Returns:
            ApparatData: the data of the apparat and its professor wrapped
            in an ApparatData object
        """
        result = self.query_db(
            "SELECT * FROM semesterapparat WHERE appnr=? AND name=?",
            (appnr, appname),
            one=True,
        )
        if result is None:
            raise NoResultError("No result found")
        prof_name = self.getProfNameById(result[2])
        apparat = ApparatData()
        apparat.apparat.id = result[0]
        apparat.apparat.name = result[1]
        apparat.apparat.appnr = result[4]
        apparat.apparat.eternal = result[7] == 1
        apparat.prof = self.getProfData(prof_name)
        apparat.prof.fullname = prof_name
        apparat.apparat.prof_id = result[2]
        apparat.apparat.subject = result[3]
        apparat.apparat.created_semester = result[5]
        apparat.apparat.extend_until = result[8]
        apparat.apparat.deleted = result[9]
        apparat.apparat.apparat_id_adis = result[11]
        apparat.apparat.prof_id_adis = result[12]
        apparat.apparat.konto = result[13]
        return apparat

    def getUnavailableApparatNumbers(self) -> List[int]:
        """Get a list of all the apparat numbers in the database that are currently in use

        Returns:
            List[int]: the sorted apparat numbers of all apparats with
            ``deletion_status=0``
        """
        numbers = self.query_db(
            "SELECT appnr FROM semesterapparat WHERE deletion_status=0",
        )
        numbers = [i[0] for i in numbers]
        numbers.sort()
        log.info(f"Currently used apparat numbers: {numbers}")
        return numbers

    def setNewSemesterDate(self, app_id: Union[str, int], newDate, dauerapp=False):
        """Set the new semester date for an apparat

        Args:
            app_id (Union[str,int]): matched against the ``appnr`` column
            newDate (str): the new date
            dauerapp (bool, optional): if the apparat was changed to
                dauerapparat. Defaults to False.
        """
        # today as yyyy-mm-dd
        today = datetime.datetime.now().strftime("%Y-%m-%d")
        if dauerapp:
            self.query_db(
                "UPDATE semesterapparat SET verlängerung_bis=?, dauer=?, verlängert_am=? WHERE appnr=?",
                (newDate, dauerapp, today, app_id),
            )
        else:
            self.query_db(
                "UPDATE semesterapparat SET verlängerung_bis=?, verlängert_am=? WHERE appnr=?",
                (newDate, today, app_id),
            )

    def getId(self, apparat_name) -> Optional[int]:
        """Get the id of an apparat based on the name

        Args:
            apparat_name (str): the name of the apparat e.g. "Semesterapparat 1"

        Returns:
            Optional[int]: the ``id`` column of the apparat, or None if the
            apparat is not found
        """
        data = self.query_db(
            "SELECT id FROM semesterapparat WHERE name=?",
            (apparat_name,),
            one=True,
        )
        if data is None:
            return None
        return data[0]

    def getApparatId(self, apparat_name) -> Optional[int]:
        """Get the apparat number of an apparat based on the name

        Args:
            apparat_name (str): the name of the apparat e.g. "Semesterapparat 1"

        Returns:
            Optional[int]: the ``appnr`` column of the apparat, or None if
            the apparat is not found
        """
        data = self.query_db(
            "SELECT appnr FROM semesterapparat WHERE name=?",
            (apparat_name,),
            one=True,
        )
        if data is None:
            return None
        return data[0]

    def createApparat(self, apparat: ApparatData) -> int:
        """Create the apparat in the database

        The professor is created first if no id is known for it.

        Args:
            apparat (ApparatData): the wrapped metadata of the apparat

        Returns:
            None on success. If an apparat with the same name already
            exists, an ``AppPresentError`` instance is *returned* (it is not
            raised), carrying the existing apparat number.
        """
        # Bind these first: the log call below reads ``app``.
        app = apparat.apparat
        prof = apparat.prof
        log.debug("Creating apparat: {} - {}", app.appnr, app.name)
        present_prof = self.getProfByName(prof.name())
        prof_id = present_prof.id
        log.debug("Present prof: {}", preview(present_prof, 300))

        app_id = self.getApparatId(app.name)
        if app_id:
            return AppPresentError(app_id)
        if not prof_id:
            log.debug("prof id not present, creating prof with data: {}", preview(prof, 300))
            prof_id = self.createProf(prof)
        log.debug("prof_id: {}", preview(prof_id, 50))

        query = "INSERT OR IGNORE INTO semesterapparat (appnr, name, erstellsemester, dauer, prof_id, fach,deletion_status,konto) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
        # Values that used to be embedded as quoted SQL literals are bound as
        # text so the stored data stays the same, while quotes inside a name
        # can no longer break (or inject into) the statement.
        params = (
            str(app.appnr),
            str(app.name),
            str(app.created_semester),
            str(app.eternal),
            prof_id,
            str(app.subject),
            "0",
            str(SEMAP_MEDIA_ACCOUNTS[app.appnr]),
        )
        log.debug("Apparat insert query: {}", preview(query, 500))
        self.query_db(query, params)
        return None

    def getApparatsByProf(self, prof_id: Union[str, int]) -> list[Apparat]:
        """Get all apparats based on the professor id

        Args:
            prof_id (Union[str,int]): the id of the professor

        Returns:
            list[Apparat]: one Apparat object per matching row
        """
        data = self.query_db(
            "SELECT * FROM semesterapparat WHERE prof_id=?",
            (prof_id,),
        )
        ret = []
        for i in data:
            log.debug("Apparat row: {}", preview(i, 200))
            ret.append(Apparat().from_tuple(i))
        return ret

    def getApparatsBySemester(self, semester: str) -> dict[str, dict[str, list[str]]]:
        """Get all apparats based on the semester

        Args:
            semester (str): the selected semester

        Returns:
            dict: ``{"created": ..., "deleted": ...}``; each value maps a
            professor name to the names of the apparats created in
            (``erstellsemester``) or deleted in (``deleted_date``) the
            selected semester
        """

        def _group_by_prof(rows) -> dict[str, list[str]]:
            # Collect apparat names per professor name, keeping row order.
            grouped: dict[str, list[str]] = {}
            for name, prof_id in rows:
                grouped.setdefault(self.getProfNameById(prof_id), []).append(name)
            return grouped

        created_rows = self.query_db(
            "SELECT name, prof_id FROM semesterapparat WHERE erstellsemester=?",
            (semester,),
        )
        deleted_rows = self.query_db(
            "SELECT name,prof_id FROM semesterapparat WHERE deleted_date=?",
            (semester,),
        )
        return {
            "created": _group_by_prof(created_rows),
            "deleted": _group_by_prof(deleted_rows),
        }

    def getApparatCountBySemester(self) -> list[tuple[str, int, int]]:
        """Get the number of apparats created and deleted per semester

        Returns:
            list[tuple[str, int, int]]: one ``(semester, created, deleted)``
            tuple per semester returned by ``getSemesters``
        """
        semesters = self.getSemesters()
        conn = self.connect()
        try:
            cursor = conn.cursor()
            ret = []
            for semester in semesters:
                created = cursor.execute(
                    "SELECT COUNT(*) FROM semesterapparat WHERE erstellsemester=?",
                    (semester,),
                ).fetchone()[0]
                deleted = cursor.execute(
                    "SELECT COUNT(*) FROM semesterapparat WHERE deletion_status=1 AND deleted_date=?",
                    (semester,),
                ).fetchone()[0]
                ret.append((semester, created, deleted))
            return ret
        finally:
            self.close_connection(conn)

    def deleteApparat(self, apparat: Apparat, semester: str):
        """Flag an apparat and all of its books as deleted

        Args:
            apparat (Apparat): the apparat to be deleted
            semester (str): stored as ``deleted_date`` of the apparat
        """
        apparat_nr = apparat.appnr
        app_id = self.getId(apparat.name)
        self.query_db(
            "UPDATE semesterapparat SET deletion_status=1, deleted_date=? WHERE appnr=? AND name=?",
            (semester, apparat_nr, apparat.name),
        )
        # delete all books associated with the app_id
        self.query_db("UPDATE media SET deleted=1 WHERE app_id=?", (app_id,))

    def isEternal(self, id):
        """Check if the apparat is eternal (dauerapparat)

        Args:
            id (int): matched against the ``appnr`` column

        Returns:
            The matching row as returned by ``query_db(..., one=True)``,
            i.e. a one-element tuple holding ``dauer``, or None
        """
        return self.query_db(
            "SELECT dauer FROM semesterapparat WHERE appnr=?",
            (id,),
            one=True,
        )

    def getApparatName(self, app_id: Union[str, int], prof_id: Union[str, int]):
        """Get the name of the apparat based on the id

        Args:
            app_id (Union[str,int]): matched against the ``appnr`` column
            prof_id (Union[str,int]): the id of the professor

        Returns:
            str: the name of the apparat, or None if no row matches
        """
        result = self.query_db(
            "SELECT name FROM semesterapparat WHERE appnr=? AND prof_id=?",
            (app_id, prof_id),
            one=True,
        )
        if result:
            return result[0]
        return None

    def updateApparat(self, apparat_data: ApparatData):
        """Update an apparat in the database

        Args:
            apparat_data (ApparatData): the new metadata of the apparat
        """
        query = "UPDATE semesterapparat SET name = ?, fach = ?, dauer = ?, prof_id = ?, prof_id_adis = ?, apparat_id_adis = ? WHERE appnr = ?"
params = ( apparat_data.apparat.name, apparat_data.apparat.subject, apparat_data.apparat.eternal, self.getProfData(apparat_data.prof.fullname).id, apparat_data.apparat.prof_id_adis, apparat_data.apparat.apparat_id_adis, apparat_data.apparat.appnr, ) log.debug("Updating apparat: query: {} params: {}", preview(query, 200), preview(params, 300)) self.query_db(query, params) def checkApparatExists(self, app_name: str): """Check if the apparat is already present in the database based on the name Args: apparat_name (str): the name of the apparat Returns: bool: True if the apparat is present, False if not """ return ( True if self.query_db( "SELECT appnr FROM semesterapparat WHERE name=?", (app_name,), one=True, ) else False ) def checkApparatExistsByNr(self, app_nr: Union[str, int]) -> bool: """A check to see if the apparat is already present in the database, based on the nr. This query will exclude deleted apparats Args: app_nr (Union[str, int]): the id of the apparat Returns: bool: True if the apparat is present, False if not """ return ( True if self.query_db( "SELECT id FROM semesterapparat WHERE appnr=? 
and deletion_status=0", (app_nr,), one=True, ) else False ) # Statistics def statistic_request(self, **kwargs: Any): """Take n amount of kwargs and return the result of the query""" def __query(query): """Execute the query and return the result Args: query (str): the constructed query Returns: list: the result of the query """ log.debug(f"Query: {query}") conn = self.connect() cursor = conn.cursor() result = cursor.execute(query).fetchall() for result_a in result: orig_value = result_a prof_name = self.getProfNameById(result_a[2]) # replace the prof_id with the prof_name result_a = list(result_a) result_a[2] = prof_name result_a = tuple(result_a) result[result.index(orig_value)] = result_a self.close_connection(conn) log.debug(f"Query result: {result}") return result if "deletable" in kwargs: query = f"""SELECT * FROM semesterapparat WHERE deletion_status=0 AND dauer=0 AND ( (erstellsemester!='{kwargs["deletesemester"]}' AND verlängerung_bis IS NULL) OR (erstellsemester!='{kwargs["deletesemester"]}' AND verlängerung_bis!='{kwargs["deletesemester"]}' AND verlängerung_bis!='{Semester().next}') )""" return __query(query) if "dauer" in kwargs: kwargs["dauer"] = kwargs["dauer"].replace("Ja", "1").replace("Nein", "0") query = "SELECT * FROM semesterapparat WHERE " for key, value in kwargs.items() if kwargs.items() is not None else {}: # log.debug(key, value) query += f"{key}='{value}' AND " # log.debug(query) # remove deletesemester part from normal query, as this will be added to the database upon deleting the apparat if "deletesemester" in kwargs: query = query.replace( f"deletesemester='{kwargs['deletesemester']}' AND ", "", ) if "endsemester" in kwargs: if "erstellsemester" in kwargs: query = query.replace(f"endsemester='{kwargs['endsemester']}' AND ", "") query = query.replace( f"erstellsemester='{kwargs['erstellsemester']} AND ", "xyz", ) else: query = query.replace( f"endsemester='{kwargs['endsemester']}' AND ", "xyz", ) # log.debug("replaced") query = 
query.replace( "xyz", f"(erstellsemester='{kwargs['endsemester']}' OR verlängerung_bis='{kwargs['endsemester']}') AND ", ) # remove all x="" parts from the query where x is a key in kwargs log.info(f"Query before: {query}") query = query.strip() query = query[:-4] log.info(f"Query after: {query}") # check if query ends with lowercase letter or a '. if not, remove last symbol and try again while query[-1] not in ascii_lowercase and query[-1] != "'": query = query[:-1] query = query.strip() # log.debug(query) res = __query(query) # log.debug(res) return res # Admin data def getUser(self): """Get a single user from the database""" return self.query_db("SELECT * FROM user", one=True) def getUsers(self) -> list[tuple]: """Return a list of tuples of all the users in the database""" return self.query_db("SELECT * FROM user") def login(self, user, hashed_password): """Try to login the user. The salt for the user will be requested from the database and then added to the hashed password. The password will then be compared to the password in the database Args: user (str): username that tries to login hashed_password (str): the password the user tries to login with Returns: bool: True if the login was successful, False if not """ try: salt = self.query_db( "SELECT salt FROM user WHERE username=?", (user,), one=True, )[0] if salt is None: return False except TypeError: return False hashed_password = salt + hashed_password password = self.query_db( "SELECT password FROM user WHERE username=?", (user,), one=True, )[0] if password == hashed_password: return True return False def changePassword(self, user, new_password): """Change the password of a user. The password will be added with the salt and then committed to the database Args: user (str): username new_password (str): the hashed password """ salt = self.query_db( "SELECT salt FROM user WHERE username=?", (user,), one=True, )[0] new_password = salt + new_password self.query_db( "UPDATE user SET password=? 
WHERE username=?", (new_password, user), ) def getRole(self, user: str) -> str: """Get the role of the user Args: user (str): username Returns: str: the name of the role """ return self.query_db( "SELECT role FROM user WHERE username=?", (user,), one=True, )[0] def getRoles(self) -> list[tuple]: """Get all the roles in the database Returns: list[str]: a list of all the roles """ roles = self.query_db("SELECT role FROM user") return [i[0] for i in roles] def checkUsername(self, user) -> bool: """A check to see if the username is already present in the database Args: user (str): the username Returns: bool: True if the username is present, False if not """ data = self.query_db( "SELECT username FROM user WHERE username=?", (user,), one=True, ) return True if data is not None else False def createUser(self, user, password, role, salt): """Create an user from the AdminCommands class. Args: user (str): the username of the user password (str): a hashed password role (str): the role of the user salt (str): a salt for the password """ self.query_db( "INSERT OR IGNORE INTO user (username, password, role, salt) VALUES (?,?,?,?)", (user, password, role, salt), ) # check if user was created return ( self.query_db( "SELECT username FROM user WHERE username=?", (user,), one=True, ) is not None ) def deleteUser(self, user): """Delete an unser Args: user (str): username of the user """ self.query_db("DELETE FROM user WHERE username=?", (user,)) def updateUser(self, username, data: dict[str, str]): """Changge the data of a user Args: username (str): the username of the user data (dict[str, str]): the data to be changed """ conn = self.connect() cursor = conn.cursor() query = "UPDATE user SET " for key, value in data.items(): if key == "username": continue query += f"{key}='{value}'," query = query[:-1] query += " WHERE username=?" 
params = (username,) cursor.execute(query, params) conn.commit() self.close_connection(conn) def getFacultyMember(self, name: str) -> tuple: """Get a faculty member based on the name Args: name (str): the name to be searched for Returns: tuple: a tuple containing the data of the faculty member """ return self.query_db( "SELECT titel, fname,lname,mail,telnr,fullname FROM prof WHERE fullname=?", (name,), one=True, ) def updateFacultyMember(self, data: dict, oldlname: str, oldfname: str): """Update the data of a faculty member Args: data (dict): a dictionary containing the data to be updated oldlname (str): the old last name of the faculty member oldfname (str): the old first name of the faculty member """ placeholders = ", ".join([f"{i}=:{i} " for i in data]) query = f"UPDATE prof SET {placeholders} WHERE lname = :oldlname AND fname = :oldfname" data["oldlname"] = oldlname data["oldfname"] = oldfname self.query_db(query, data) def getFacultyMembers(self): """Get a list of all faculty members Returns: list[tuple]: a list of tuples containing the faculty members """ return self.query_db("SELECT titel, fname,lname,mail,telnr,fullname FROM prof") def restoreApparat(self, app_id: Union[str, int], app_name: str): """Restore an apparat from the database Args: app_id (Union[str, int]): the id of the apparat """ return self.query_db( "UPDATE semesterapparat SET deletion_status=0, deleted_date=NULL WHERE appnr=? and name=?", (app_id, app_name), ) # ELSA def createElsaApparat(self, date, prof_id, semester) -> int: """Create a new apparat in the database for the ELSA system Args: date (str): the name of the apparat prof_id (int): the id of the professor semester (str): the semester the apparat is created in Returns: int: the id of the apparat """ self.query_db( "INSERT OR IGNORE INTO elsa (date, prof_id, semester) VALUES (?,?,?)", (date, prof_id, semester), ) # get the id of the apparat apparat_id = self.query_db( "SELECT id FROM elsa WHERE date=? AND prof_id=? 
AND semester=?", (date, prof_id, semester), one=True, )[0] return apparat_id def updateElsaApparat(self, date, prof_id, semester, elsa_id): """Update an ELSA apparat in the database Args: date (str): the name of the apparat prof_id (int): the id of the professor semester (str): the semester the apparat is created in elsa_id (int): the id of the ELSA apparat """ self.query_db( "UPDATE elsa SET date=?, prof_id=?, semester=? WHERE id=?", (date, prof_id, semester, elsa_id), ) def addElsaMedia(self, data: dict, elsa_id: int): """Add a media to the ELSA system Args: data (dict): a dictionary containing the data of the media, elsa_id (int): the id of the ELSA apparat """ headers = [] entries = [] for key, value in data.items(): headers.append(key) entries.append(value) headers.append("elsa_id") entries.append(elsa_id) query = f"INSERT INTO elsa_media ({', '.join(headers)}) VALUES ({', '.join(['?' for i in range(len(headers))])})" self.query_db(query, entries) def getElsaMedia(self, elsa_id: int): """Get all the media of an ELSA apparat Args: elsa_id (int): the id of the ELSA apparat Returns: list[tuple]: a list of tuples containing the media """ media = self.query_db("SELECT * FROM elsa_media WHERE elsa_id=?", (elsa_id,)) # convert the media to a list of dictionaries ret = [] table_fields = self.query_db("PRAGMA table_info(elsa_media)") for m in media: tmp = {} for i in range(len(m)): tmp[table_fields[i][1]] = m[i] ret.append(tmp) return ret def insertElsaFile(self, file: list[dict], elsa_id: int): """Instert a list of files into the ELSA system Args: file (list[dict]): a list containing all the files to be inserted Structured: [{"name": "filename", "path": "path", "type": "filetype"}] elsa_id (int): the id of the ELSA apparat """ for f in file: filename = f["name"] path = f["path"] filetyp = f["type"] if path == "Database": continue blob = create_blob(path) query = "INSERT OR IGNORE INTO elsa_files (filename, fileblob, elsa_id, filetyp) VALUES (?, ?, ?, ?)" 
self.query_db(query, (filename, blob, elsa_id, filetyp)) def recreateElsaFile(self, filename: str, filetype: str) -> str: """Recreate a file from the ELSA system Args: filename (str): the name of the file elsa_id (int): the id of the ELSA apparat filetype (str): the extension of the file to be created Returns: str: The filename of the recreated file """ blob = self.query_db( "SELECT fileblob FROM elsa_files WHERE filename=?", (filename,), one=True, )[0] # log.debug(blob) tempdir = settings.database.temp.expanduser() if not tempdir.exists(): tempdir.mkdir(parents=True, exist_ok=True) file = tempfile.NamedTemporaryFile( delete=False, dir=tempdir, mode="wb", suffix=f".{filetype}", ) file.write(blob) # log.debug("file created") return file.name def getElsaApparats(self) -> ELSA: """Get all the ELSA apparats in the database Returns: list[tuple]: a list of tuples containing the ELSA apparats """ return self.query_db( "SELECT * FROM elsa ORDER BY substr(date, 7, 4) || '-' || substr(date, 4, 2) || '-' || substr(date, 1, 2)", ) def getElsaId(self, prof_id: int, semester: str, date: str) -> int: """Get the id of an ELSA apparat based on the professor, semester and date Args: prof_id (int): the id of the professor semester (str): the semester date (str): the date of the apparat Returns: int: the id of the ELSA apparat """ data = self.query_db( "SELECT id FROM elsa WHERE prof_id=? AND semester=? 
AND date=?", (prof_id, semester, date), one=True, ) if data is None: return None return data[0] def getElsaFiles(self, elsa_id: int): """Get all the files of an ELSA apparat Args: elsa_id (int): the id of the ELSA apparat Returns: list[tuple]: a list of tuples containing the files """ return self.query_db( "SELECT filename, filetyp FROM elsa_files WHERE elsa_id=?", (elsa_id,), ) ### def createProf(self, profdata: Prof): log.debug("Creating profdata: {}", preview(profdata, 500)) conn = self.connect() cursor = conn.cursor() fname = profdata.firstname lname = profdata.lastname fullname = f"{lname} {fname}" mail = profdata.mail telnr = profdata.telnr title = profdata.title query = "INSERT INTO prof (fname, lname, fullname, mail, telnr, titel) VALUES (?,?,?,?,?,?)" log.debug("DB query: {}", preview(query, 200)) cursor.execute(query, (fname, lname, fullname, mail, telnr, title)) conn.commit() conn.close() return self.getProfId(profdata) def getElsaProfId(self, profname): query = f"SELECT id FROM elsa_prof WHERE fullname = '{profname}'" data = self.query_db(query) if data: return data[0][0] return None def getElsaProfs(self) -> list[str]: query = "SELECT fullname FROM elsa_prof" data = self.query_db(query) if data: return [i[0] for i in data] return [] def getProfId(self, profdata: dict[str, Any] | Prof): """Get the prof ID based on the profdata Args: profdata (dict | Prof): either a dictionary containing the prof data or a Prof object Returns: int | None: The id of the prof or None if not found """ conn = self.connect() cursor = conn.cursor() if isinstance(profdata, dict): name = profdata["profname"] if "," in name: fname = name.split(", ")[1].strip() lname = name.split(", ")[0].strip() fullname = f"{lname} {fname}" else: fullname = profdata["profname"] else: fullname = profdata.name() query = "SELECT id FROM prof WHERE fullname = ?" 
log.debug("DB query: {}", preview(query, 200)) cursor.execute(query, (fullname,)) result = cursor.fetchone() if result: return result[0] return None def getProfByName(self, fullname): """Get all Data of the prof based on fullname Args: fullname (str): The full name of the prof """ conn = self.connect() cursor = conn.cursor() query = "SELECT * FROM prof WHERE fullname = ?" log.debug("DB query: {}", preview(query, 200)) result = cursor.execute(query, (fullname,)).fetchone() if result: return Prof().from_tuple(result) return Prof() def getProfIDByApparat(self, apprarat_id: int) -> Optional[int]: """Get the prof id based on the semesterapparat id from the database Args: apprarat_id (int): Number of the apparat Returns: int | None: The id of the prof or None if not found """ query = "SELECT prof_id from semesterapparat WHERE appnr = ? and deletion_status = 0" data = self.query_db(query, (apprarat_id,)) if data: log.info("Prof ID: " + str(data[0][0])) return data[0][0] return None def copyBookToApparat(self, book_id: int, apparat: int): # get book data new_apparat_id = apparat new_prof_id = self.getProfIDByApparat(new_apparat_id) query = ( "INSERT INTO media (bookdata, app_id, prof_id, deleted, available, reservation) " "SELECT bookdata, ?, ?, 0, available, reservation FROM media WHERE id = ?" ) connection = self.connect() cursor = connection.cursor() cursor.execute(query, (new_apparat_id, new_prof_id, book_id)) connection.commit() connection.close() def moveBookToApparat(self, book_id: int, appratat: int): """Move the book to the new apparat Args: book_id (int): the ID of the book appratat (int): the ID of the new apparat """ # get book data query = "UPDATE media SET app_id = ? WHERE id = ?" connection = self.connect() cursor = connection.cursor() cursor.execute(query, (appratat, book_id)) connection.commit() connection.close() def getApparatNameByAppNr(self, appnr: int): query = ( "SELECT name FROM semesterapparat WHERE appnr = ? 
and deletion_status = 0" ) data = self.query_db(query, (appnr,)) if data: return data[0][0] return None def fetch_one(self, query: str, args: tuple[Any, ...] = ()) -> tuple[Any, ...]: connection = self.connect() cursor = connection.cursor() cursor.execute(query, args) result = cursor.fetchone() connection.close() return result def getBookIdByPPN(self, ppn: str) -> int: query = "SELECT id FROM media WHERE bookdata LIKE ?" data = self.query_db(query, (f"%{ppn}%",)) if data: return data[0][0] return None def getNewEditionsByApparat(self, apparat_id: int) -> list[BookData]: """Get all new editions for a specific apparat Args: apparat_id (int): the id of the apparat Returns: list[tuple]: A list of tuples containing the new editions data """ query = "SELECT * FROM neweditions WHERE for_apparat=? AND ordered=0" results = self.query_db(query, (apparat_id,)) res = [] for result in results: # keep only new edition payload; old edition can be reconstructed if needed res.append(BookData().from_string(result[1])) return res def setOrdered(self, newBook_id: int): query = "UPDATE neweditions SET ordered=1 WHERE id=?" self.query_db(query, (newBook_id,)) def getBooksWithNewEditions(self, app_id) -> List[BookData]: # select all bookdata from media, based on the old_edition_id in neweditions where for_apparat = app_id; also get the new_edition bookdata query = "SELECT m.bookdata, new_bookdata FROM media m JOIN neweditions n ON m.id = n.old_edition_id WHERE n.for_apparat = ?" results = self.query_db(query, (app_id,)) # store results in tuple old,new res = [] for result in results: oldedition = BookData().from_string(result[0]) newedition = BookData().from_string(result[1]) res.append((oldedition, newedition)) return res def getNewEditionId(self, newBook: BookData): query = "SELECT id FROM neweditions WHERE new_bookdata LIKE ?" 
args = ( newBook.isbn[0] if newBook.isbn and len(newBook.isbn) > 0 else newBook.ppn ) params = (f"%{args}%",) data = self.query_db(query, params, one=True) if data: return data[0] return None def insertNewEdition(self, newBook: BookData, oldBookId: int, for_apparat: int): # check if new edition already in table, check based on newBook.ppn check_query = "SELECT id FROM neweditions WHERE new_bookdata LIKE ?" check_params = (f"%{newBook.ppn}%",) data = self.query_db(check_query, check_params, one=True) if data: log.info("New edition already in table, skipping insert") return query = "INSERT INTO neweditions (new_bookdata, old_edition_id, for_apparat) VALUES (?,?,?)" params = (newBook.to_dict, oldBookId, for_apparat) self.query_db(query, params)