import os import sqlite3 as sql import tempfile from pathlib import Path from src import settings from typing import Any, List, Optional, Tuple, Union import datetime from src.backend.db import ( CREATE_ELSA_FILES_TABLE, CREATE_ELSA_MEDIA_TABLE, CREATE_ELSA_TABLE, CREATE_TABLE_APPARAT, CREATE_TABLE_FILES, CREATE_TABLE_MEDIA, CREATE_TABLE_MESSAGES, CREATE_TABLE_PROF, CREATE_TABLE_SUBJECTS, CREATE_TABLE_USER, ) from src.errors import AppPresentError, NoResultError from src.logic import ApparatData, BookData, Prof, Apparat, ELSA from src.logic.constants import SEMAP_MEDIA_ACCOUNTS from src.utils import create_blob, dump_pickle, load_pickle from .semester import Semester from string import ascii_lowercase as lower, digits, punctuation import sys from loguru import logger as log logger = log logger.remove() logger.add("logs/application.log", rotation="1 week", enqueue=True) log.add( "logs/database.log", ) # logger.add(sys.stderr, format="{time} {level} {message}", level="INFO") logger.add(sys.stdout) ascii_lowercase = lower + digits + punctuation # get the line that called the function class Database: database = settings.database """ Initialize the database and create the tables if they do not exist. """ def __init__(self, db_path: str = None): """ Default constructor for the database class Args: db_path (str, optional): Optional Path for testing / specific purposes. Defaults to None. """ if db_path is None: self.db_path = self.database.path + self.database.name self.db_path = self.db_path.replace("~", str(Path.home())) logger.debug(self.db_path) else: self.db_path = db_path self.checkDatabaseStatus() def checkDatabaseStatus(self): path = self.database.path path = path.replace("~", str(Path.home())) path = os.path.abspath(path) if not os.path.exists(path): # create path # logger.debug(path) os.makedirs(path) if self.get_db_contents() == []: logger.critical("Database does not exist, creating tables") self.create_tables() self.insertSubjects() def getElsaMediaID(self, work_author, signature, pages): query = ( "SELECT id FROM elsa_media WHERE work_author=? AND signature=? AND pages=?" ) params = (work_author, signature, pages) result = self.query_db(query, params, one=True) if result is None: return NoResultError( f"work_author: {work_author}, signature: {signature}, pages: {pages}" ).__str__() return result[0] def getElsaMediaType(self, id): query = "SELECT type FROM elsa_media WHERE id=?" return self.query_db(query, (id,), one=True)[0] def get_db_contents(self) -> Union[List[Tuple], None]: """ Get the contents of the Returns: Union[List[Tuple], None]: _description_ """ try: with sql.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM sqlite_master WHERE type='table'") return cursor.fetchall() except sql.OperationalError: return None def connect(self) -> sql.Connection: """ Connect to the database Returns: sql.Connection: The active connection to the database """ return sql.connect(self.db_path) def close_connection(self, conn: sql.Connection): """ closes the connection to the database Args: ---- - conn (sql.Connection): the connection to be closed """ conn.close() def create_tables(self): """ Create the tables in the database """ conn = self.connect() cursor = conn.cursor() cursor.execute(CREATE_TABLE_APPARAT) cursor.execute(CREATE_TABLE_MESSAGES) cursor.execute(CREATE_TABLE_MEDIA) cursor.execute(CREATE_TABLE_FILES) cursor.execute(CREATE_TABLE_PROF) cursor.execute(CREATE_TABLE_USER) cursor.execute(CREATE_TABLE_SUBJECTS) cursor.execute(CREATE_ELSA_TABLE) cursor.execute(CREATE_ELSA_FILES_TABLE) cursor.execute(CREATE_ELSA_MEDIA_TABLE) conn.commit() self.close_connection(conn) def insertInto(self, query: str, params: Tuple) -> None: """ Insert sent data into the database Args: query (str): The query to be executed params (Tuple): the parameters to be inserted into the database """ conn = self.connect() cursor = conn.cursor() logger.debug(f"Inserting {params} into database with query {query}") cursor.execute(query, params) conn.commit() self.close_connection(conn) @logger.catch def query_db( self, query: str, args: Tuple[Any, Any] = (), # type:ignore one: bool = False, # type:ignore ) -> Union[Tuple[Any, Any], List[Tuple[Any, Any]]]: """ Query the Database for the sent query. Args: query (str): The query to be executed args (Tuple, optional): The arguments for the query. Defaults to (). one (bool, optional): Return the first result only. Defaults to False. Returns: Union[Tuple | List[Tuple]]: Returns the result of the query """ conn = self.connect() cursor = conn.cursor() logs_query = query logs_args = args if "fileblob" in query: # set fileblob arg in logger to "too long" logs_query = query fileblob_location = query.find("fileblob") # remove fileblob from query logs_query = query[:fileblob_location] + "fileblob = too long" log_message = f"Querying database with query {logs_query}, args: {logs_args}" # if "INSERT" in query: # log_message = f"Querying database with query {query}" if "INTO user" in query: log_message = f"Querying database with query {query}" # logger.debug(f"DB Query: {log_message}") try: cursor.execute(query, args) rv = cursor.fetchall() conn.commit() self.close_connection(conn) except sql.OperationalError as e: logger.error(f"Error in query: {e}") return None return (rv[0] if rv else None) if one else rv # Books def addBookToDatabase( self, bookdata: BookData, app_id: Union[str, int], prof_id: Union[str, int] ): """ Add books to the database. Both app_id and prof_id are required to add the book to the database, as the app_id and prof_id are used to select the books later on. Args: bookdata (BookData): The metadata of the book to be added app_id (str): The apparat id where the book should be added to prof_id (str): The id of the professor where the book should be added to. """ logger.info(f"Adding book {bookdata.signature} to database") if app_id is None or prof_id is None: raise ValueError("Apparate ID or Prof ID is None") conn = self.connect() cursor = conn.cursor() t_query = ( f"SELECT bookdata FROM media WHERE app_id={app_id} AND prof_id={prof_id}" ) logger.debug(t_query) # # logger.debug(t_query) result = cursor.execute(t_query).fetchall() result = [load_pickle(i[0]) for i in result] if bookdata in result: # logger.debug("Bookdata already in database") # check if the book was deleted in the apparat query = ( "SELECT deleted FROM media WHERE app_id=? AND prof_id=? AND bookdata=?" ) params = (app_id, prof_id, dump_pickle(bookdata)) result = cursor.execute(query, params).fetchone() if result[0] == 1: # logger.debug("Book was deleted, updating bookdata") query = "UPDATE media SET deleted=0 WHERE app_id=? AND prof_id=? AND bookdata=?" params = (app_id, prof_id, dump_pickle(bookdata)) cursor.execute(query, params) conn.commit() return query = ( "INSERT INTO media (bookdata, app_id, prof_id,deleted) VALUES (?, ?, ?,?)" ) converted = dump_pickle(bookdata) params = (converted, app_id, prof_id, 0) cursor.execute(query, params) logMessage = f"Added book with signature {bookdata.signature} to database, data: {converted}" logger.info(logMessage) conn.commit() self.close_connection(conn) def getBookIdBasedOnSignature( self, app_id: Union[str, int], prof_id: Union[str, int], signature: str ) -> int: """ Get a book id based on the signature of the book. Args: app_id (str): The apparat id the book should be associated with prof_id (str): The professor id the book should be associated with signature (str): The signature of the book Returns: int: The id of the book """ result = self.query_db( "SELECT bookdata, id FROM media WHERE app_id=? AND prof_id=?", (app_id, prof_id), ) books = [(load_pickle(i[0]), i[1]) for i in result] book = [i for i in books if i[0].signature == signature][0][1] return book def getBookBasedOnSignature( self, app_id: Union[str, int], prof_id: Union[str, int], signature: str ) -> BookData: """ Get the book based on the signature of the book. Args: app_id (str): The apparat id the book should be associated with prof_id (str): The professor id the book should be associated with signature (str): The signature of the book Returns: BookData: The total metadata of the book wrapped in a BookData object """ result = self.query_db( "SELECT bookdata FROM media WHERE app_id=? AND prof_id=?", (app_id, prof_id) ) books: list[BookData] = [load_pickle(i[0]) for i in result] book = [i for i in books if i.signature == signature][0] return book def getLastBookId(self) -> int: """ Get the last book id in the database Returns: int: ID of the last book in the database """ return self.query_db("SELECT id FROM media ORDER BY id DESC", one=True)[0] def searchBook(self, data: dict[str, str]) -> list[tuple[BookData, int]]: """ Search a book in the database based on the sent data. Args: data (dict[str, str]): A dictionary containing the data to be searched for. The dictionary can contain the following: - signature: The signature of the book - title: The title of the book Returns: list[tuple[BookData, int]]: A list of tuples containing the wrapped Metadata and the id of the book """ rdata = self.query_db("SELECT * FROM media WHERE deleted=0") # logger.debug(rdata, len(rdata)) mode = 0 if len(data) == 1: if "signature" in data.keys(): mode = 1 elif "title" in data.keys(): mode = 2 elif len(data) == 2: mode = 3 else: return None ret = [] for book in rdata: bookdata = load_pickle(book[1]) app_id = book[2] prof_id = book[3] if mode == 1: if data["signature"] in bookdata.signature: ret.append((bookdata, app_id, prof_id)) elif mode == 2: if data["title"] in bookdata.title: ret.append((bookdata, app_id, prof_id)) elif mode == 3: if ( data["signature"] in bookdata.signature and data["title"] in bookdata.title ): ret.append((bookdata, app_id, prof_id)) # logger.debug(ret) return ret def setAvailability(self, book_id: str, available: str): """ Set the availability of a book in the database Args: book_id (str): The id of the book available (str): The availability of the book """ self.query_db("UPDATE media SET available=? WHERE id=?", (available, book_id)) def getBookId( self, bookdata: BookData, app_id: Union[str, int], prof_id: Union[str, int] ) -> int: """ Get the id of a book based on the metadata of the book Args: bookdata (BookData): The wrapped metadata of the book app_id (str): The apparat id the book should be associated with prof_id (str): The professor id the book should be associated with Returns: int: ID of the book """ result = self.query_db( "SELECT id FROM media WHERE bookdata=? AND app_id=? AND prof_id=?", (dump_pickle(bookdata), app_id, prof_id), one=True, ) return result[0] def getBook(self, book_id: int) -> BookData: """ Get the book based on the id in the database Args: book_id (int): The id of the book Returns: BookData: The metadata of the book wrapped in a BookData object """ return load_pickle( self.query_db( "SELECT bookdata FROM media WHERE id=?", (book_id,), one=True )[0] ) def getBooks( self, app_id: Union[str, int], prof_id: Union[str, int], deleted=0 ) -> list[dict[str, Union[BookData, int]]]: """ Get the Books based on the apparat id and the professor id Args: app_id (str): The ID of the apparat prof_id (str): The ID of the professor deleted (int, optional): The state of the book. Set to 1 to include deleted ones. Defaults to 0. Returns: list[dict[int, BookData, int]]: A list of dictionaries containing the id, the metadata of the book and the availability of the book """ qdata = self.query_db( f"SELECT id,bookdata,available FROM media WHERE (app_id={app_id} AND prof_id={prof_id}) AND (deleted={deleted if deleted == 0 else '1 OR deleted=0'})" ) ret_result = [] if qdata is None: return [] for result_a in qdata: data: dict[str, Any] = {"id": int, "bookdata": BookData, "available": int} data["id"] = result_a[0] data["bookdata"] = load_pickle(result_a[1]) data["available"] = result_a[2] ret_result.append(data) return ret_result def updateBookdata(self, book_id: int, bookdata: BookData): """ Update the bookdata in the database Args: book_id (str): The id of the book bookdata (BookData): The new metadata of the book """ query = "UPDATE media SET bookdata= ? WHERE id=?" book = dump_pickle(bookdata) self.query_db(query, (book, book_id)) def deleteBook(self, book_id): """ Delete a book from the database Args: book_id (str): ID of the book """ self.query_db("UPDATE media SET deleted=1 WHERE id=?", (book_id,)) # File Interactions def getBlob(self, filename: str, app_id: Union[str, int]) -> bytes: """ Get a blob from the database Args: filename (str): The name of the file app_id (str): ID of the apparat Returns: bytes: The file stored in """ return self.query_db( "SELECT fileblob FROM files WHERE filename=? AND app_id=?", (filename, app_id), one=True, )[0] def insertFile( self, file: list[dict], app_id: Union[str, int], prof_id: Union[str, int] ): """Instert a list of files into the database Args: file (list[dict]): a list containing all the files to be inserted Structured: [{"name": "filename", "path": "path", "type": "filetype"}] app_id (int): the id of the apparat prof_id (str): the id of the professor """ for f in file: filename = f["name"] path = f["path"] filetyp = f["type"] if path == "Database": continue blob = create_blob(path) query = "INSERT OR IGNORE INTO files (filename, fileblob, app_id, filetyp,prof_id) VALUES (?, ?, ?, ?,?)" self.query_db(query, (filename, blob, app_id, filetyp, prof_id)) def recreateFile( self, filename: str, app_id: Union[str, int], filetype: str ) -> str: """Recreate a file from the database Args: filename (str): the name of the file app_id (Union[str,int]): the id of the apparat filetype (str): the extension of the file to be created Returns: str: The filename of the recreated file """ blob = self.getBlob(filename, app_id) tempdir = self.database.temp tempdir = tempdir.replace("~", str(Path.home())) tempdir_path = Path(tempdir) if not os.path.exists(tempdir_path): os.mkdir(tempdir_path) file = tempfile.NamedTemporaryFile( delete=False, dir=tempdir_path, mode="wb", suffix=f".{filetype}" ) file.write(blob) # logger.debug("file created") return file.name def getFiles(self, app_id: Union[str, int], prof_id: int) -> list[tuple]: """Get all the files associated with the apparat and the professor Args: app_id (Union[str,int]): The id of the apparat prof_id (Union[str,int]): the id of the professor Returns: list[tuple]: a list of tuples containing the filename and the filetype for the corresponding apparat and professor """ return self.query_db( "SELECT filename, filetyp FROM files WHERE app_id=? AND prof_id=?", (app_id, prof_id), ) def getSemesters(self) -> list[str]: """Return all the unique semesters in the database Returns: list: a list of strings containing the semesters """ data = self.query_db("SELECT DISTINCT erstellsemester FROM semesterapparat") return [i[0] for i in data] def insertSubjects(self): # logger.debug("Inserting subjects") subjects = [ "Biologie", "Chemie", "Deutsch", "Englisch", "Erziehungswissenschaft", "Französisch", "Geographie", "Geschichte", "Gesundheitspädagogik", "Haushalt / Textil", "Kunst", "Mathematik / Informatik", "Medien in der Bildung", "Musik", "Philosophie", "Physik", "Politikwissenschaft", "Prorektorat Lehre und Studium", "Psychologie", "Soziologie", "Sport", "Technik", "Theologie", "Wirtschaftslehre", ] conn = self.connect() cursor = conn.cursor() for subject in subjects: cursor.execute("INSERT INTO subjects (name) VALUES (?)", (subject,)) conn.commit() self.close_connection(conn) def getSubjects(self): """Get all the subjects in the database Returns: list[tuple]: a list of tuples containing the subjects """ return self.query_db("SELECT * FROM subjects") # Messages def addMessage(self, message: dict, user: str, app_id: Union[str, int]): """add a Message to the database Args: message (dict): the message to be added user (str): the user who added the message app_id (Union[str,int]): the id of the apparat """ def __getUserId(user): return self.query_db( "SELECT id FROM user WHERE username=?", (user,), one=True )[0] user_id = __getUserId(user) self.query_db( "INSERT INTO messages (message, user_id, remind_at,appnr) VALUES (?,?,?,?)", (message["message"], user_id, message["remind_at"], app_id), ) def getAllMessages(self) -> list[dict[str, str, str, str]]: """Get all the messages in the database Returns: list[dict[str, str, str, str]]: a list of dictionaries containing the message, the user who added the message, the apparat id and the id of the message """ def __get_user_name(user_id): return self.query_db( "SELECT username FROM user WHERE id=?", (user_id,), one=True )[0] messages = self.query_db("SELECT * FROM messages") ret = [ { "message": i[2], "user": __get_user_name(i[4]), "appnr": i[5], "id": i[0], "remind_at": i[3], } for i in messages ] return ret def getMessages(self, date: str) -> list[dict[str, str, str, str]]: """Get all the messages for a specific date Args: date (str): a date.datetime object formatted as a string in the format "YYYY-MM-DD" Returns: list[dict[str, str, str, str]]: a list of dictionaries containing the message, the user who added the message, the apparat id and the id of the message """ def __get_user_name(user_id): return self.query_db( "SELECT username FROM user WHERE id=?", (user_id,), one=True )[0] messages = self.query_db("SELECT * FROM messages WHERE remind_at=?", (date,)) ret = [ {"message": i[2], "user": __get_user_name(i[4]), "appnr": i[5], "id": i[0]} for i in messages ] return ret def deleteMessage(self, message_id): """Delete a message from the database Args: message_id (str): the id of the message """ logger.debug(f"Deleting message with id {message_id}") self.query_db("DELETE FROM messages WHERE id=?", (message_id,)) # Prof data def getProfNameById(self, prof_id: Union[str, int], add_title: bool = False) -> str: """Get a professor name based on the id Args: prof_id (Union[str,int]): The id of the professor add_title (bool, optional): wether to add the title or no. Defaults to False. Returns: str: The name of the professor """ prof = self.query_db( "SELECT fullname FROM prof WHERE id=?", (prof_id,), one=True ) if add_title: return f"{self.getTitleById(prof_id)}{prof[0]}" else: return prof[0] def getTitleById(self, prof_id: Union[str, int]) -> str: """get the title of a professor based on the id Args: prof_id (Union[str,int]): the id of the professor Returns: str: the title of the professor, with an added whitespace at the end, if no title is present, an empty string is returned """ title = self.query_db( "SELECT titel FROM prof WHERE id=?", (prof_id,), one=True )[0] return f"{title} " if title is not None else "" def getSpecificProfData(self, prof_id: Union[str, int], fields: List[str]) -> tuple: """A customisable function to get specific data of a professor based on the id Args: prof_id (Union[str,int]): the id of the professor fields (List[str]): a list of fields to be returned Returns: tuple: a tuple containing the requested data """ query = "SELECT " for field in fields: query += f"{field}," query = query[:-1] query += " FROM prof WHERE id=?" return self.query_db(query, (prof_id,), one=True)[0] def getProfById(self, prof_id: Union[str, int]) -> Prof: """Get a professor based on the id Args: prof_id (Union[str,int]): the id of the professor Returns: Prof: a Prof object containing the data of the professor """ data = self.query_db("SELECT * FROM prof WHERE id=?", (prof_id,), one=True) return Prof().from_tuple(data) def getProfData(self, profname: str): """Get mail, telephone number and title of a professor based on the name Args: profname (str): name of the professor Returns: tuple: the mail, telephone number and title of the professor """ data = self.query_db( "SELECT * FROM prof WHERE fullname=?", (profname.replace(",", ""),), one=True, ) person = Prof() return person.from_tuple(data) def getProf(self, id) -> Prof: """Get a professor based on the id Args: id ([type]): the id of the professor Returns: Prof: a Prof object containing the data of the professor """ data = self.query_db("SELECT * FROM prof WHERE id=?", (id,), one=True) return Prof().from_tuple(data) def getProfs(self) -> list[Prof]: """Return all the professors in the database Returns: list[tuple]: a list containing all the professors in individual tuples tuple: (id, titel, fname, lname, fullname, mail, telnr) """ profs = self.query_db("SELECT * FROM prof") return [Prof().from_tuple(prof) for prof in profs] # Apparat def getAllAparats(self, deleted=0) -> list[tuple]: """Get all the apparats in the database Args: deleted (int, optional): Switch the result to use . Defaults to 0. Returns: list[tuple]: a list of tuples containing the apparats """ return self.query_db( "SELECT * FROM semesterapparat WHERE deletion_status=?", (deleted,) ) def getApparatData(self, appnr, appname) -> ApparatData: """Get the Apparat data based on the apparat number and the name Args: appnr (str): the apparat number appname (str): the name of the apparat Raises: NoResultError: an error is raised if no result is found Returns: ApparatData: the appended data of the apparat wrapped in an ApparatData object """ result = self.query_db( "SELECT * FROM semesterapparat WHERE appnr=? AND name=?", (appnr, appname), one=True, ) if result is None: raise NoResultError("No result found") apparat = ApparatData() apparat.apparat.id = result[0] apparat.apparat.name = result[1] apparat.apparat.appnr = result[4] apparat.apparat.eternal = True if result[7] == 1 else False apparat.prof = self.getProfData(self.getProfNameById(result[2])) apparat.prof.fullname = self.getProfNameById(result[2]) apparat.apparat.prof_id = result[2] apparat.apparat.subject = result[3] apparat.apparat.created_semester = result[5] apparat.apparat.extend_until = result[8] apparat.apparat.deleted = result[9] apparat.apparat.apparat_id_adis = result[11] apparat.apparat.prof_id_adis = result[12] apparat.apparat.konto = result[13] return apparat def getUnavailableApparatNumbers(self) -> List[int]: """Get a list of all the apparat numbers in the database that are currently in use Returns: List[int]: the list of used apparat numbers """ numbers = self.query_db( "SELECT appnr FROM semesterapparat WHERE deletion_status=0" ) numbers = [i[0] for i in numbers] numbers.sort() logger.info(f"Currently used apparat numbers: {numbers}") return numbers def setNewSemesterDate(self, app_id: Union[str, int], newDate, dauerapp=False): """Set the new semester date for an apparat Args: app_id (Union[str,int]): the id of the apparat newDate (str): the new date dauerapp (bool, optional): if the apparat was changed to dauerapparat. Defaults to False. """ # today as yyyy-mm-dd today = datetime.datetime.now().strftime("%Y-%m-%d") if dauerapp: self.query_db( "UPDATE semesterapparat SET verlängerung_bis=?, dauer=?, verlängert_am=? WHERE appnr=?", (newDate, dauerapp, today, app_id), ) else: self.query_db( "UPDATE semesterapparat SET verlängerung_bis=?, verlängert_am=? WHERE appnr=?", (newDate, today, app_id), ) def getApparatId(self, apparat_name) -> Optional[int]: """get the id of an apparat based on the name Args: apparat_name (str): the name of the apparat e.g. "Semesterapparat 1" Returns: Optional[int]: the id of the apparat, if the apparat is not found, None is returned """ data = self.query_db( "SELECT appnr FROM semesterapparat WHERE name=?", (apparat_name,), one=True ) if data is None: return None else: return data[0] def createApparat(self, apparat: ApparatData) -> int: """create the apparat in the database Args: apparat (ApparatData): the wrapped metadata of the apparat Raises: AppPresentError: an error describing that the apparats chosen id is already present in the database Returns: Optional[int]: the id of the apparat """ logger.debug(apparat) app = apparat.apparat prof = apparat.prof present_prof = self.getProfByName(prof.name()) prof_id = present_prof.id logger.debug(present_prof) app_id = self.getApparatId(app.name) if app_id: return AppPresentError(app_id) if not prof_id: logger.debug("prof id not present, creating prof with data", prof) prof_id = self.createProf(prof) logger.debug(prof_id) query = f"INSERT OR IGNORE INTO semesterapparat (appnr, name, erstellsemester, dauer, prof_id, fach,deletion_status,konto) VALUES ('{app.appnr}', '{app.name}', '{app.created_semester}', '{app.eternal}', {prof_id}, '{app.subject}', '{0}', '{SEMAP_MEDIA_ACCOUNTS[app.appnr]}')" logger.debug(query) self.query_db(query) return None def getApparatsByProf(self, prof_id: Union[str, int]) -> list[Apparat]: """Get all apparats based on the professor id Args: prof_id (Union[str,int]): the id of the professor Returns: list[tuple]: a list of tuples containing the apparats """ data = self.query_db( "SELECT * FROM semesterapparat WHERE prof_id=?", (prof_id,) ) ret = [] for i in data: logger.debug(i) ret.append(Apparat().from_tuple(i)) return ret def getApparatsBySemester(self, semester: str) -> dict[list]: """get all apparats based on the semester Args: semester (str): the selected semester Returns: dict[list]: a list off all created and deleted apparats for the selected semester """ data = self.query_db( "SELECT name, prof_id FROM semesterapparat WHERE erstellsemester=?", (semester,), ) conn = self.connect() cursor = conn.cursor() c_tmp = [] for i in data: c_tmp.append((i[0], self.getProfNameById(i[1]))) query = ( f"SELECT name,prof_id FROM semesterapparat WHERE deleted_date='{semester}'" ) result = cursor.execute(query).fetchall() d_tmp = [] for i in result: d_tmp.append((i[0], self.getProfNameById(i[1]))) # group the apparats by prof c_ret = {} for i in c_tmp: if i[1] not in c_ret.keys(): c_ret[i[1]] = [i[0]] else: c_ret[i[1]].append(i[0]) d_ret = {} for i in d_tmp: if i[1] not in d_ret.keys(): d_ret[i[1]] = [i[0]] else: d_ret[i[1]].append(i[0]) self.close_connection(conn) return {"created": c_ret, "deleted": d_ret} def getApparatCountBySemester(self) -> tuple[list[str], list[int]]: """get a list of all apparats created and deleted by semester Returns: tuple[list[str],list[int]]: a tuple containing two lists, the first list contains the semesters, the second list contains the amount of apparats created and deleted for the corresponding semester """ conn = self.connect() cursor = conn.cursor() semesters = self.getSemesters() created = [] deleted = [] for semester in semesters: query = f"SELECT COUNT(*) FROM semesterapparat WHERE erstellsemester='{semester}'" result = cursor.execute(query).fetchone() created.append(result[0]) query = f"SELECT COUNT(*) FROM semesterapparat WHERE deletion_status=1 AND deleted_date='{semester}'" result = cursor.execute(query).fetchone() deleted.append(result[0]) # store data in a tuple ret = [] for sem in semesters: e_tuple = ( sem, created[semesters.index(sem)], deleted[semesters.index(sem)], ) ret.append(e_tuple) self.close_connection(conn) return ret def deleteApparat(self, app_id: Union[str, int], semester): """Delete an apparat from the database Args: app_id (Union[str, int]): the id of the apparat semester (str): the semester the apparat should be deleted from """ logger.info(f"Deleting apparat with id {app_id} in semester {semester}") self.query_db( "UPDATE semesterapparat SET deletion_status=1, deleted_date=? WHERE appnr=?", (semester, app_id), ) def isEternal(self, id): """check if the apparat is eternal (dauerapparat) Args: id (int): the id of the apparat to be checked Returns: int: the state of the apparat """ return self.query_db( "SELECT dauer FROM semesterapparat WHERE appnr=?", (id,), one=True ) def getApparatName(self, app_id: Union[str, int], prof_id: Union[str, int]): """get the name of the apparat based on the id Args: app_id (Union[str,int]): the id of the apparat prof_id (Union[str,int]): the id of the professor Returns: str: the name of the apparat """ return self.query_db( "SELECT name FROM semesterapparat WHERE appnr=? AND prof_id=?", (app_id, prof_id), one=True, )[0] def updateApparat(self, apparat_data: ApparatData): """Update an apparat in the database Args: apparat_data (ApparatData): the new metadata of the apparat """ query = "UPDATE semesterapparat SET name = ?, fach = ?, dauer = ?, prof_id = ?, prof_id_adis = ?, apparat_id_adis = ? WHERE appnr = ?" params = ( apparat_data.apparat.name, apparat_data.apparat.subject, apparat_data.apparat.eternal, self.getProfData(apparat_data.prof.fullname).id, apparat_data.apparat.prof_id_adis, apparat_data.apparat.apparat_id_adis, apparat_data.apparat.appnr, ) logger.debug(f"Updating apparat with query {query} and params {params}") self.query_db(query, params) def checkApparatExists(self, app_name: str): """check if the apparat is already present in the database based on the name Args: apparat_name (str): the name of the apparat Returns: bool: True if the apparat is present, False if not """ return ( True if self.query_db( "SELECT appnr FROM semesterapparat WHERE name=?", (app_name,), one=True, ) else False ) def checkApparatExistsById(self, app_id: Union[str, int]) -> bool: """a check to see if the apparat is already present in the database, based on the id Args: app_id (Union[str, int]): the id of the apparat Returns: bool: True if the apparat is present, False if not """ return ( True if self.query_db( "SELECT appnr FROM semesterapparat WHERE appnr=?", (app_id,), one=True ) else False ) # Statistics def statistic_request(self, **kwargs: Any): """Take n amount of kwargs and return the result of the query""" def __query(query): """execute the query and return the result Args: query (str): the constructed query Returns: list: the result of the query """ logger.debug(f"Query: {query}") conn = self.connect() cursor = conn.cursor() result = cursor.execute(query).fetchall() for result_a in result: orig_value = result_a prof_name = self.getProfNameById(result_a[2]) # replace the prof_id with the prof_name result_a = list(result_a) result_a[2] = prof_name result_a = tuple(result_a) result[result.index(orig_value)] = result_a self.close_connection(conn) logger.debug(f"Query result: {result}") return result if "deletable" in kwargs.keys(): query = f"""SELECT * FROM semesterapparat WHERE deletion_status=0 AND dauer=0 AND ( (erstellsemester!='{kwargs["deletesemester"]}' AND verlängerung_bis IS NULL) OR (erstellsemester!='{kwargs["deletesemester"]}' AND verlängerung_bis!='{kwargs["deletesemester"]}' AND verlängerung_bis!='{Semester().next}') )""" return __query(query) if "dauer" in kwargs.keys(): kwargs["dauer"] = kwargs["dauer"].replace("Ja", "1").replace("Nein", "0") query = "SELECT * FROM semesterapparat WHERE " for key, value in kwargs.items() if kwargs.items() is not None else {}: # logger.debug(key, value) query += f"{key}='{value}' AND " # logger.debug(query) # remove deletesemester part from normal query, as this will be added to the database upon deleting the apparat if "deletesemester" in kwargs.keys(): query = query.replace( f"deletesemester='{kwargs['deletesemester']}' AND ", "" ) if "endsemester" in kwargs.keys(): if "erstellsemester" in kwargs.keys(): query = query.replace(f"endsemester='{kwargs['endsemester']}' AND ", "") query = query.replace( f"erstellsemester='{kwargs['erstellsemester']} AND ", "xyz" ) else: query = query.replace( f"endsemester='{kwargs['endsemester']}' AND ", "xyz" ) # logger.debug("replaced") query = query.replace( "xyz", f"(erstellsemester='{kwargs['endsemester']}' OR verlängerung_bis='{kwargs['endsemester']}') AND ", ) # remove all x="" parts from the query where x is a key in kwargs logger.info(f"Query before: {query}") query = query.strip() query = query[:-4] logger.info(f"Query after: {query}") # check if query ends with lowercase letter or a '. if not, remove last symbol and try again while query[-1] not in ascii_lowercase and query[-1] != "'": query = query[:-1] query = query.strip() # logger.debug(query) res = __query(query) # logger.debug(res) return res # Admin data def getUser(self): """Get a single user from the database""" return self.query_db("SELECT * FROM user", one=True) def getUsers(self) -> list[tuple]: """Return a list of tuples of all the users in the database""" return self.query_db("SELECT * FROM user") def login(self, user, hashed_password): """try to login the user. The salt for the user will be requested from the database and then added to the hashed password. The password will then be compared to the password in the database Args: user (str): username that tries to login hashed_password (str): the password the user tries to login with Returns: bool: True if the login was successful, False if not """ salt = self.query_db( "SELECT salt FROM user WHERE username=?", (user,), one=True )[0] if salt is None: return False hashed_password = salt + hashed_password password = self.query_db( "SELECT password FROM user WHERE username=?", (user,), one=True )[0] if password == hashed_password: return True else: return False def changePassword(self, user, new_password): """change the password of a user. The password will be added with the salt and then committed to the database Args: user (str): username new_password (str): the hashed password """ salt = self.query_db( "SELECT salt FROM user WHERE username=?", (user,), one=True )[0] new_password = salt + new_password self.query_db( "UPDATE user SET password=? WHERE username=?", (new_password, user) ) def getRole(self, user: str) -> str: """get the role of the user Args: user (str): username Returns: str: the name of the role """ return self.query_db( "SELECT role FROM user WHERE username=?", (user,), one=True )[0] def getRoles(self) -> list[tuple]: """get all the roles in the database Returns: list[str]: a list of all the roles """ roles = self.query_db("SELECT role FROM user") return [i[0] for i in roles] def checkUsername(self, user) -> bool: """a check to see if the username is already present in the database Args: user (str): the username Returns: bool: True if the username is present, False if not """ data = self.query_db( "SELECT username FROM user WHERE username=?", (user,), one=True ) return True if data is not None else False def createUser(self, user, password, role, salt): """create an user from the AdminCommands class. Args: user (str): the username of the user password (str): a hashed password role (str): the role of the user salt (str): a salt for the password """ self.query_db( "INSERT OR IGNORE INTO user (username, password, role, salt) VALUES (?,?,?,?)", (user, password, role, salt), ) def deleteUser(self, user): """delete an unser Args: user (str): username of the user """ self.query_db("DELETE FROM user WHERE username=?", (user,)) def updateUser(self, username, data: dict[str, str]): """changge the data of a user Args: username (str): the username of the user data (dict[str, str]): the data to be changed """ conn = self.connect() cursor = conn.cursor() query = "UPDATE user SET " for key, value in data.items(): if key == "username": continue query += f"{key}='{value}'," query = query[:-1] query += " WHERE username=?" params = (username,) cursor.execute(query, params) conn.commit() self.close_connection(conn) def getFacultyMember(self, name: str) -> tuple: """get a faculty member based on the name Args: name (str): the name to be searched for Returns: tuple: a tuple containing the data of the faculty member """ return self.query_db( "SELECT titel, fname,lname,mail,telnr,fullname FROM prof WHERE fullname=?", (name,), one=True, ) def updateFacultyMember(self, data: dict, oldlname: str, oldfname: str): """update the data of a faculty member Args: data (dict): a dictionary containing the data to be updated oldlname (str): the old last name of the faculty member oldfname (str): the old first name of the faculty member """ placeholders = ", ".join([f"{i}=:{i} " for i in data.keys()]) query = f"UPDATE prof SET {placeholders} WHERE lname = :oldlname AND fname = :oldfname" data["oldlname"] = oldlname data["oldfname"] = oldfname self.query_db(query, data) def getFacultyMembers(self): """get a list of all faculty members Returns: list[tuple]: a list of tuples containing the faculty members """ return self.query_db("SELECT titel, fname,lname,mail,telnr,fullname FROM prof") def restoreApparat(self, app_id: Union[str, int], app_name: str): """restore an apparat from the database Args: app_id (Union[str, int]): the id of the apparat """ return self.query_db( "UPDATE semesterapparat SET deletion_status=0, deleted_date=NULL WHERE appnr=? and name=?", (app_id, app_name), ) # ELSA def createElsaApparat(self, date, prof_id, semester) -> int: """create a new apparat in the database for the ELSA system Args: date (str): the name of the apparat prof_id (int): the id of the professor semester (str): the semester the apparat is created in Returns: int: the id of the apparat """ self.query_db( "INSERT OR IGNORE INTO elsa (date, prof_id, semester) VALUES (?,?,?)", (date, prof_id, semester), ) # get the id of the apparat apparat_id = self.query_db( "SELECT id FROM elsa WHERE date=? AND prof_id=? AND semester=?", (date, prof_id, semester), one=True, )[0] return apparat_id def updateElsaApparat(self, date, prof_id, semester, elsa_id): """update an ELSA apparat in the database Args: date (str): the name of the apparat prof_id (int): the id of the professor semester (str): the semester the apparat is created in elsa_id (int): the id of the ELSA apparat """ self.query_db( "UPDATE elsa SET date=?, prof_id=?, semester=? WHERE id=?", (date, prof_id, semester, elsa_id), ) def addElsaMedia(self, data: dict, elsa_id: int): """add a media to the ELSA system Args: data (dict): a dictionary containing the data of the media, elsa_id (int): the id of the ELSA apparat """ headers = [] entries = [] for key, value in data.items(): headers.append(key) entries.append(value) headers.append("elsa_id") entries.append(elsa_id) query = f"INSERT INTO elsa_media ({', '.join(headers)}) VALUES ({', '.join(['?' for i in range(len(headers))])})" self.query_db(query, entries) def getElsaMedia(self, elsa_id: int): """get all the media of an ELSA apparat Args: elsa_id (int): the id of the ELSA apparat Returns: list[tuple]: a list of tuples containing the media """ media = self.query_db("SELECT * FROM elsa_media WHERE elsa_id=?", (elsa_id,)) # convert the media to a list of dictionaries ret = [] table_fields = self.query_db("PRAGMA table_info(elsa_media)") for m in media: tmp = {} for i in range(len(m)): tmp[table_fields[i][1]] = m[i] ret.append(tmp) return ret def insertElsaFile(self, file: list[dict], elsa_id: int): """Instert a list of files into the ELSA system Args: file (list[dict]): a list containing all the files to be inserted Structured: [{"name": "filename", "path": "path", "type": "filetype"}] elsa_id (int): the id of the ELSA apparat """ for f in file: filename = f["name"] path = f["path"] filetyp = f["type"] if path == "Database": continue blob = create_blob(path) query = "INSERT OR IGNORE INTO elsa_files (filename, fileblob, elsa_id, filetyp) VALUES (?, ?, ?, ?)" self.query_db(query, (filename, blob, elsa_id, filetyp)) def recreateElsaFile(self, filename: str, filetype: str) -> str: """Recreate a file from the ELSA system Args: filename (str): the name of the file elsa_id (int): the id of the ELSA apparat filetype (str): the extension of the file to be created Returns: str: The filename of the recreated file """ blob = self.query_db( "SELECT fileblob FROM elsa_files WHERE filename=?", (filename,), one=True )[0] # logger.debug(blob) tempdir = self.database.temp tempdir = tempdir.replace("~", str(Path.home())) tempdir_path = Path(tempdir) if not os.path.exists(tempdir_path): os.mkdir(tempdir_path) file = tempfile.NamedTemporaryFile( delete=False, dir=tempdir_path, mode="wb", suffix=f".{filetype}" ) file.write(blob) # logger.debug("file created") return file.name def getElsaApparats(self) -> ELSA: """Get all the ELSA apparats in the database Returns: list[tuple]: a list of tuples containing the ELSA apparats """ return self.query_db( "SELECT * FROM elsa ORDER BY substr(date, 7, 4) || '-' || substr(date, 4, 2) || '-' || substr(date, 1, 2)" ) def getElsaId(self, prof_id: int, semester: str, date: str) -> int: """get the id of an ELSA apparat based on the professor, semester and date Args: prof_id (int): the id of the professor semester (str): the semester date (str): the date of the apparat Returns: int: the id of the ELSA apparat """ data = self.query_db( "SELECT id FROM elsa WHERE prof_id=? AND semester=? AND date=?", (prof_id, semester, date), one=True, ) if data is None: return None return data[0] def getElsaFiles(self, elsa_id: int): """get all the files of an ELSA apparat Args: elsa_id (int): the id of the ELSA apparat Returns: list[tuple]: a list of tuples containing the files """ return self.query_db( "SELECT filename, filetyp FROM elsa_files WHERE elsa_id=?", (elsa_id,) ) ### def createProf(self, profdata: Prof): logger.debug(profdata) conn = self.connect() cursor = conn.cursor() fname = profdata.firstname lname = profdata.lastname fullname = f"{lname} {fname}" mail = profdata.mail telnr = profdata.telnr title = profdata.title query = f"INSERT INTO prof (fname, lname, fullname, mail, telnr,titel) VALUES ('{fname}','{lname}','{fullname}','{mail}','{telnr}','{title}')" logger.debug(query) cursor.execute(query) conn.commit() conn.close() return self.getProfId(profdata) def getElsaProfId(self, profname): query = f"SELECT id FROM elsa_prof WHERE fullname = '{profname}'" data = self.query_db(query) if data: return data[0][0] else: return None def getElsaProfs(self) -> list[str]: query = "SELECT fullname FROM elsa_prof" data = self.query_db(query) if data: return [i[0] for i in data] else: return [] def getProfId(self, profdata: dict[str, Any] | Prof): """Get the prof ID based on the profdata Args: profdata (dict | Prof): either a dictionary containing the prof data or a Prof object Returns: int | None: The id of the prof or None if not found """ conn = self.connect() cursor = conn.cursor() if isinstance(profdata, dict): name = profdata["profname"] if "," in name: fname = name.split(", ")[1].strip() lname = name.split(", ")[0].strip() fullname = f"{lname} {fname}" else: fullname = profdata["profname"] else: fullname = profdata.name() query = f"SELECT id FROM prof WHERE fullname = '{fullname}'" logger.debug(query) cursor.execute(query) result = cursor.fetchone() if result: return result[0] else: return None def getProfByName(self, fullname): """Get all Data of the prof based on fullname Args: fullname (str): The full name of the prof """ conn = self.connect() cursor = conn.cursor() query = f"SELECT * FROM prof WHERE fullname = '{fullname}'" logger.debug(query) result = cursor.execute(query).fetchone() if result: return Prof().from_tuple(result) else: return Prof() def getProfIDByApparat(self, apprarat_id): """Get the prof id based on the semesterapparat id from the database Args: apprarat_id (int): Number of the apparat Returns: int | None: The id of the prof or None if not found """ query = f"SELECT prof_id from semesterapparat WHERE appnr = '{apprarat_id}' and deletion_status = 0" data = self.query_db(query) if data: logger.info("Prof ID: " + str(data[0][0])) return data[0][0] else: return None def copyBookToApparat(self, book_id, apparat): # get book data new_apparat_id = apparat new_prof_id = self.getProfIDByApparat(new_apparat_id) query = f""" INSERT INTO media (bookdata, app_id, prof_id, deleted, available, reservation) SELECT bookdata, '{new_apparat_id}', '{new_prof_id}', 0, available, reservation FROM media where id = '{book_id}'""" connection = self.connect() cursor = connection.cursor() cursor.execute(query) connection.commit() connection.close() def moveBookToApparat(self, book_id, appratat): """Move the book to the new apparat Args: book_id (int): the ID of the book appratat (int): the ID of the new apparat """ # get book data query = f"UPDATE media SET app_id = '{appratat}' WHERE id = '{book_id}'" connection = self.connect() cursor = connection.cursor() cursor.execute(query) connection.commit() connection.close() def getApparatNameByAppNr(self, appnr): query = f"SELECT name FROM semesterapparat WHERE appnr = '{appnr}' and deletion_status = 0" data = self.query_db(query) if data: return data[0][0] else: return None