diff --git a/src/backend/admin_console.py b/src/backend/admin_console.py index 379103f..891f9b1 100644 --- a/src/backend/admin_console.py +++ b/src/backend/admin_console.py @@ -24,21 +24,21 @@ class AdminCommands: def create_admin(self): salt = self.create_salt() hashed_password = self.hash_password("admin") - self.db.create_user("admin", salt+hashed_password, "admin", salt) + self.db.createUser("admin", salt+hashed_password, "admin", salt) def hash_password(self, password): hashed = hashlib.sha256((password).encode("utf-8")).hexdigest() return hashed def list_users(self): - return self.db.get_users() + return self.db.getUsers() def delete_user(self, username): - self.db.delete_user(username) + self.db.deleteUser(username) def change_password(self, username, password): hashed_password = self.hash_password(password) - self.db.change_password(username, hashed_password) + self.db.changePassword(username, hashed_password) if __name__ == "__main__": diff --git a/src/backend/database.py b/src/backend/database.py index 7266201..56c2dc0 100644 --- a/src/backend/database.py +++ b/src/backend/database.py @@ -1,268 +1,90 @@ import datetime import os import re -import shutil -import sqlite3 as sql3 +import sqlite3 as sql import tempfile -from typing import Any - -from omegaconf import OmegaConf - - -# from src.data import pickles import pickle -from src.logic.constants import SEMAP_MEDIA_ACCOUNTS -from src.logic.dataclass import ApparatData, BookData from src.logic.log import MyLogger from icecream import ic +from typing import List, Tuple, Dict, Any, Optional, Union +from omegaconf import OmegaConf +from src.backend.db import CREATE_TABLE_APPARAT, CREATE_TABLE_MESSAGES, CREATE_TABLE_MEDIA, CREATE_TABLE_APPKONTOS, CREATE_TABLE_FILES, CREATE_TABLE_PROF, CREATE_TABLE_USER, CREATE_TABLE_SUBJECTS +from src.logic.constants import SEMAP_MEDIA_ACCOUNTS +from src.logic.dataclass import ApparatData, BookData +from src.errors import NoResultError, AppPresentError +from src.utils import load_pickle, dump_pickle,create_blob config = OmegaConf.load("config.yaml") - -logger = MyLogger("Database") +logger = MyLogger(__name__) class Database: - logger.log_info("Database imported") - - def __init__(self) -> None: - # TODO: change path later on to a variable based on the settings - self.database_path = f"{config.database.path}{config.database.name}" - ic(self.database_path) - # self.database_path = "sap.db" - logger.log_info("Connecting to database") - self.database = sql3.connect(self.database_path) - self.cur = self.database.cursor() - - pass - def create_database(self): - #create database from template - subjects = config.subjects - for subject in subjects: - self.cur.execute(f"INSERT INTO subjects (name) VALUES ('{subject}')") - self.database.commit() - - def create_blob(self, file): - with open(file, "rb") as f: - blob = f.read() - return blob - - def recreate_file(self, filename, app_id: int): - blob = self.get_blob(filename, app_id) - # write the blob to the file and save it to a preset destination - # with open(filename, "wb") as f: - # f.write(blob) - #use tempfile to create a temporary file - if not os.path.exists(config.database.tempdir): - os.mkdir(config.database.tempdir) - tempfile.NamedTemporaryFile(filename=filename,delete=False,dir=config.database.tempdir,mode="wb").write(blob) - - # user = os.getlogin() - # home = os.path.expanduser("~") - - # # check if the folder exists, if not, create it - # if not os.path.exists(f"{home}/Desktop/SemApp/{user}"): - # os.mkdir(f"{home}/Desktop/SemApp/{user}") - # shutil.move(filename, f"{home}/Desktop/SemApp/{user}") - - def get_blob(self, filename: str, app_id: int): - query = f"SELECT fileblob FROM files WHERE filename='{filename}' AND app_id={app_id}" - logger.log_info(f"Retrieving blob for {filename}, Appid {app_id} from database") - result = self.cur.execute(query).fetchone() - return result[0] - - def get_kto_no(self, app_id: int): - query = f"SELECT konto FROM semesterapparat WHERE id={app_id}" - result = self.cur.execute(query).fetchone() - return result[0] - - def insert_file(self, file: list[dict], app_id: int, prof_id): - for f in file: - filename = f["name"] - path = f["path"] - filetyp = f["type"] - print(f"filename: {filename}, path: {path}, filetyp: {filetyp}") - if path == "Database": - continue - blob = self.create_blob(path) - - query = "INSERT OR IGNORE INTO files (filename, fileblob, app_id, filetyp,prof_id) VALUES (?, ?, ?, ?,?)" - params = (filename, blob, app_id, filetyp, prof_id) - self.cur.execute(query, params) - logger.log_info( - f"Inserted {len(file)} file(s) of Apparat {app_id} into database" - ) - self.database.commit() - - def get_files(self, app_id: int, prof_id: int): - query = f"SELECT filename, filetyp FROM files WHERE app_id={app_id} AND prof_id={prof_id}" - result: list[tuple] = self.cur.execute(query).fetchall() - return result - - def get_prof_name_by_id(self, id, add_title: bool = False): - if add_title is True: - query = f"SELECT titel, fname, lname FROM prof WHERE id={id}" - result = self.cur.execute(query).fetchone() - name = " ".join(result[0:3]) - return name + def __init__(self, db_path: str = None): + if db_path is None: + self.db_path = config.database.path + config.database.name else: - query = f"SELECT fullname FROM prof WHERE id={id}" - print(query) - result = self.cur.execute(query).fetchone() - name = result[0] - return name + self.db_path = db_path + if self.get_db_contents() is None: + logger.log_critical("Database does not exist, creating tables") + self.create_tables() - def get_prof_id(self, profname: str): - query = f"SELECT id FROM prof WHERE fullname='{profname.replace(',', '')}'" - result = self.cur.execute(query).fetchone() - if result is None: - return None - return self.cur.execute(query).fetchone()[0] - - def get_app_id(self, appname: str): - query = f"SELECT id FROM semesterapparat WHERE name='{appname}'" - result = self.cur.execute(query).fetchone() - if result is None: - return None - return self.cur.execute(query).fetchone()[0] - - def get_profs(self): - query = "select * from prof" - return self.cur.execute(query).fetchall() - - def app_exists(self, appnr: str) -> bool: - query = f"SELECT appnr FROM semesterapparat WHERE appnr='{appnr}'" - result = self.cur.execute(query).fetchone() - if result is None: - return False - return True - - def get_prof_data(self, profname: str = None, id: int = None) -> dict[str, str]: - if profname is not None: - profname = profname.replace(",", "") - profname = re.sub(r"\s+", " ", profname).strip() - if id: - query = "Select prof_id FROM semesterapparat WHERE appnr=?" - params = (id,) - result = self.cur.execute(query, params).fetchone() - id = result[0] - query = ( - f"SELECT * FROM prof WHERE fullname='{profname}'" - if id is None - else f"SELECT * FROM prof WHERE id={id}" - ) - result = self.cur.execute(query).fetchone() - return_data = { - "prof_title": result[1], - "profname": f"{result[3], result[2]}", - "prof_mail": result[5], - "prof_tel": result[6], - "id": result[0], - } - print(return_data) - # select the entry that contains the first name - return return_data - - def set_new_sem_date(self, appnr, new_sem_date, dauerapp=False): - # Todo: use extend_semester in new release - date = datetime.datetime.now().strftime("%Y-%m-%d") - - query = f"UPDATE semesterapparat SET verlängert_am='{date}', verlängerung_bis='{new_sem_date}' WHERE appnr='{appnr}'" - if dauerapp is not False: - query = f"UPDATE semesterapparat SET verlängert_am='{date}', verlängerung_bis='{new_sem_date}', dauerapp='{dauerapp} WHERE appnr='{appnr}'" - self.cur.execute(query) - - self.database.commit() - - def create_apparat(self, ApparatData: ApparatData): - prof_id = self.get_prof_id(ApparatData.profname) - app_id = self.get_app_id(ApparatData.appname) - if app_id is None: - if prof_id is None: - self.create_prof(ApparatData.get_prof_details()) - prof_id = self.get_prof_id(ApparatData.profname) - - query = f"INSERT OR IGNORE INTO semesterapparat (appnr, name, erstellsemester, dauer, prof_id, fach,deletion_status,konto) VALUES ('{ApparatData.appnr}', '{ApparatData.appname}', '{ApparatData.semester}', '{ApparatData.dauerapp}', {prof_id}, '{ApparatData.app_fach}', '{ApparatData.deleted}', '{SEMAP_MEDIA_ACCOUNTS[ApparatData.appnr]}')" - print(query) - self.cur.execute(query) - self.database.commit() - logger.log_info(f"Created new apparat {ApparatData.appname}") - app_id = self.get_app_id(ApparatData.appname) - # if ApparatData.media_list is not None: #! Deprecated - # for media in ApparatData.media_list: - # self.insert_file(media, app_id) - # self.database.commit() - return app_id - - def create_prof(self, prof_details: dict): - prof_title = prof_details["prof_title"] - prof_fname = prof_details["profname"].split(",")[1] - prof_fname = prof_fname.strip() - prof_lname = prof_details["profname"].split(",")[0] - prof_lname = prof_lname.strip() - prof_fullname = prof_details["profname"].replace(",", "") - prof_mail = prof_details["prof_mail"] - prof_tel = prof_details["prof_tel"] - - query = f'INSERT OR IGNORE INTO prof (titel, fname, lname, fullname, mail, telnr) VALUES ("{prof_title}", "{prof_fname}", "{prof_lname}", "{prof_fullname}", "{prof_mail}", "{prof_tel}")' - self.cur.execute(query) - self.database.commit() - pass - - def get_apparat_nrs(self) -> list: + def get_db_contents(self): try: - self.cur.execute( - "SELECT appnr FROM semesterapparat Where deletion_status=0" - ) - except sql3.OperationalError: - return [] - return [i[0] for i in self.cur.fetchall()] + with sql.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT * FROM sqlite_master WHERE type='table'") + return cursor.fetchall() + except sql.OperationalError: + return None - def get_all_apparts(self, deleted=0): + def connect(self): + logger.log_info("Connecting to database") + return sql.connect(self.db_path) - self.cur.execute( - f"SELECT * FROM semesterapparat WHERE deletion_status={deleted}" - ) - return self.cur.fetchall() + def close_connection(self, conn: sql.Connection): + logger.log_info("Closing database connection") + conn.close() - # - def get_app_data(self, appnr, appname) -> ApparatData: - result = self.cur.execute( - f"SELECT * FROM semesterapparat WHERE appnr='{appnr}' AND name='{appname}'" - ).fetchone() - print(f"result: {result}") - # app_id=result[0] - data = ApparatData() - data.appnr = appnr - data.app_fach = result[3] - data.appname = result[1] - profname = self.get_prof_name_by_id(result[2]) - # set profname to lastname, firstname - profname = f"{profname.split(' ')[0]}, {profname.split(' ')[1]}" - data.profname = profname - prof_data = self.get_prof_data(data.profname) - data.prof_mail = prof_data["prof_mail"] - data.prof_tel = prof_data["prof_tel"] - data.prof_title = prof_data["prof_title"] - data.erstellsemester = result[5] - data.semester = result[8] - data.deleted = result[9] - data.apparat_adis_id = result[12] - data.prof_adis_id = None + def create_tables(self): + conn = self.connect() + cursor = conn.cursor() + cursor.execute(CREATE_TABLE_APPARAT) + cursor.execute(CREATE_TABLE_MESSAGES) + cursor.execute(CREATE_TABLE_MEDIA) + cursor.execute(CREATE_TABLE_APPKONTOS) + cursor.execute(CREATE_TABLE_FILES) + cursor.execute(CREATE_TABLE_PROF) + cursor.execute(CREATE_TABLE_USER) + cursor.execute(CREATE_TABLE_SUBJECTS) + conn.commit() + self.close_connection(conn) - print(data) - # data.media_list=self.get_media(app_id) + def insertInto(self, query, params): + conn = self.connect() + cursor = conn.cursor() + logger.log_info(f"Inserting {params} into database with query {query}") + cursor.execute(query, params) + conn.commit() + self.close_connection(conn) - return data + def query_db(self, query: str, args: Tuple = (), one: bool = False): + conn = self.connect() + cursor = conn.cursor() + logger.log_info(f"Querying database with query {query}, args: {args}") + cursor.execute(query, args) + rv = cursor.fetchall() + conn.commit() + self.close_connection(conn) + return (rv[0] if rv else None) if one else rv - def add_medium(self, bookdata: BookData, app_id: str, prof_id: str, *args): - # insert the bookdata into the media table - # try to retrieve the bookdata from the media table, check if the to be inserted bookdata is already in the table for the corresponding apparat - # if yes, do not insert the bookdata - # if no, insert the bookdata + # Books + def addBookToDatabase(self, bookdata:BookData,app_id:str, prof_id=str): + conn = self.connect() + cursor = conn.cursor() t_query = ( f"SELECT bookdata FROM media WHERE app_id={app_id} AND prof_id={prof_id}" ) # print(t_query) - result = self.cur.execute(t_query).fetchall() + result = cursor.execute(t_query).fetchall() result = [pickle.loads(i[0]) for i in result] if bookdata in result: print("Bookdata already in database") @@ -276,8 +98,8 @@ class Database: print("Book was deleted, updating bookdata") query = "UPDATE media SET deleted=0 WHERE app_id=? AND prof_id=? AND bookdata=?" params = (app_id, prof_id, pickle.dumps(bookdata)) - self.cur.execute(query, params) - self.database.commit() + cursor.execute(query, params) + conn.commit() return query = ( @@ -285,90 +107,326 @@ class Database: ) converted = pickle.dumps(bookdata) params = (converted, app_id, prof_id, 0) - self.cur.execute(query, params) - self.database.commit() + cursor.execute(query, params) + conn.commit() + self.close_connection(conn) + def getBookIdBasedOnSignature(self, app_id:str, prof_id:str,signature:str)->int: + result = self.query_db("SELECT bookdata, id FROM media WHERE app_id=? AND prof_id=?", (app_id,prof_id)) + books = [(load_pickle(i[0]),i[1]) for i in result] + book = [i for i in books if i[0].signature == signature][1] + return book + def getLastBookId(self)->int: + return self.query_db("SELECT id FROM media ORDER BY id DESC", one=True)[0] + def searchBook(self, data:dict[str, str])->list[tuple[BookData, int]]: + data = self.query_db("SELECT * FROM media WHERE deleted=0") + mode = 0 + if len(data)== 1: + if "signature" in data.keys(): + mode = 1 + elif "title" in data.keys(): + mode = 2 + elif len(data) == 2: + mode = 3 + else: + return None + ret = [] + for book in data: + bookdata = load_pickle(book[1]) + app_id = book[2] + prof_id = book[3] + if mode == 1: + if data["signature"] in bookdata.signature: + ret.append((bookdata,app_id,prof_id)) + elif mode == 2: + if data["title"] in bookdata.title: + ret.append((bookdata,app_id,prof_id)) + elif mode == 3: + if data["signature"] in bookdata.signature and data["title"] in bookdata.title: + ret.append((bookdata,app_id,prof_id)) + return ret + def setAvailability(self, book_id, available): + self.query_db("UPDATE media SET available=? WHERE id=?", (available,book_id)) + def getBookId(self, bookdata:BookData, app_id, prof_id): + result = self.query_db("SELECT id FROM media WHERE bookdata=? AND app_id=? AND prof_id=?", (dump_pickle(bookdata),app_id,prof_id), one=True) + return result[0] + def getBook(self,book_id): + return load_pickle(self.query_db("SELECT bookdata FROM media WHERE id=?", (book_id,), one=True)[0]) + def getBooks(self, app_id, prof_id, deleted=0): + """request media from database and return result as list. - def request_medium(self, app_id, prof_id, signature) -> int: - query = "SELECT bookdata, id FROM media WHERE app_id=? AND prof_id=?" - params = (app_id, prof_id) - result = self.cur.execute(query, params).fetchall() - books = [(i[1], pickle.loads(i[0])) for i in result] - print(books) - book = [i[0] for i in books if i[1].signature == signature] - return book[0] + Args: + ---- + - app_id (int): ID of the apparat + - prof_id (int): ID of the prof + - del_state (int, optional): If deleted books should be requested as well. 1 = yes 0 = no. Defaults to 0. - def add_message(self, message: dict, user, appnr): - def __get_user_id(user): - query = "SELECT id FROM user WHERE username=?" - params = (user,) - result = self.cur.execute(query, params).fetchone() - return result[0] + Returns: + ------- + - list[dict[int,BookData,int]]: Returns a list of dictionaries containing the bookdata, the id and the availability of the book in the following format: + ------- + {"id": int, + "bookdata": BookData, + "available": int} + """ + qdata = self.query_db(f"SELECT id,bookdata,available FROM media WHERE (app_id={app_id} AND prof_id={prof_id}) AND (deleted={deleted if deleted == 0 else '1 OR deleted=0'})") + ret_result = [] + for result_a in qdata: + ic(result_a) + data = {"id": int, "bookdata": BookData, "available": int} + data["id"] = result_a[0] + data["bookdata"] = pickle.loads(result_a[1]) + data["available"] = result_a[2] + ret_result.append(data) + return ret_result - user_id = __get_user_id(user) - query = "INSERT INTO messages (message, user_id, remind_at) VALUES (?, ?, ?)" - params = (message["message"], user_id, message["remind_at"]) - self.cur.execute(query, params) - self.database.commit() + def updateBookdata(self, book_id, bookdata:BookData): + self.query_db("UPDATE media SET bookdata=? WHERE id=?", (dump_pickle(bookdata),book_id)) + def deleteBook(self, book_id): + self.query_db("UPDATE media SET deleted=1 WHERE id=?", (book_id,)) - def get_messages(self, date: str): - def __get_user_name(id): - query = "SELECT username FROM user WHERE id=?" - params = (id,) - result = self.cur.execute(query, params).fetchone() - return result[0] + # File Interactions + def getBlob(self, filename, app_id): + return self.query_db("SELECT fileblob FROM files WHERE filename=? AND app_id=?", (filename,app_id), one=True)[0] + def insertFile(self, file: list[dict], app_id: int, prof_id): + for f in file: + filename = f["name"] + path = f["path"] + filetyp = f["type"] + if path == "Database": + continue + blob = create_blob(path) + query = "INSERT OR IGNORE INTO files (filename, fileblob, app_id, filetyp,prof_id) VALUES (?, ?, ?, ?,?)" + self.query_db(query, (filename, blob, app_id, filetyp,prof_id)) + def recreateFile(self, filename, add_id): + blob = self.getBlob(filename, add_id) + if not os.path.exists(config.database.tempdir): + os.mkdir(config.database.tempdir) + tempfile.NamedTemporaryFile(filename=filename,delete=False,dir=config.database.tempdir,mode="wb").write(blob) - query = f"SELECT * FROM messages WHERE remind_at='{date}'" - result = self.cur.execute(query).fetchall() + def getFiles(self, app_id:int, prof_id:int)->list[tuple]: + return self.query_db("SELECT filename, filetyp FROM files WHERE app_id=? AND prof_id=?", (app_id,prof_id)) + + def getSemersters(self): + data = self.query_db("SELECT DISTINCT erstellsemester FROM semesterapparat") + return [i[0] for i in data] + + def getSubjects(self): + return self.query_db("SELECT * FROM subjects") + + # Messages + def addMessage(self, message:dict,user, appnr): + def __getUserId(user): + return self.query_db("SELECT id FROM user WHERE username=?", (user,), one=True)[0] + user_id = __getUserId(user) + self.query_db("INSERT INTO messages (message, user_id, remind_at,appnr) VALUES (?,?,?,?)", (message["message"],user_id,message["remind_at"],appnr)) + def getMessages(self, date:str): + def __get_user_name(user_id): + return self.query_db("SELECT username FROM user WHERE id=?", (user_id,), one=True)[0] + messages = self.query_db("SELECT * FROM messages WHERE remind_at=?", (date,)) ret = [ { "message": i[2], "user": __get_user_name(i[4]), - "apparatnr": i[5], - "id": i[0], + "appnr": i[5], + "id": i[0] } - for i in result + for i in messages ] return ret + def deleteMessage(self, message_id): + self.query_db("DELETE FROM messages WHERE id=?", (message_id,)) - def get_apparat_id(self, appname): - query = f"SELECT appnr FROM semesterapparat WHERE name='{appname}'" - result = self.cur.execute(query).fetchone() - return result[0] - - - def get_apparats_by_semester(self, semester: str): - query = f"SELECT * FROM semesterapparat WHERE erstellsemester='{semester}'" - result = self.cur.execute(query).fetchall() - return result + # Prof data + def getProfNameById(self, prof_id:int,add_title:bool=False): + prof = self.query_db("SELECT fullname FROM prof WHERE id=?", (prof_id,), one=True) + if add_title: + return f"{self.getTitleById(prof_id)}{prof[0]}" + else: + return prof[0] + def getTitleById(self, prof_id:int): + title = self.query_db("SELECT titel FROM prof WHERE id=?", (prof_id,), one=True)[0] + return f"{title} " if title is not None else "" + def getProfByName(self, prof_name:str): + return self.query_db("SELECT * FROM prof WHERE fullname=?", (prof_name,), one=True) + def getProfId(self, prof_name:str): + data = self.getProfByName(prof_name.replace(",", "")) + if data is None: + return None + else: + return data[0] + def getProfData(self, profname:str): + data = self.query_db("SELECT mail, telnr, titel FROM prof WHERE fullname=?", (profname.replace(",",""),), one=True) + return data + def createProf(self, prof_details:dict): + prof_title = prof_details["prof_title"] + prof_fname = prof_details["profname"].split(",")[1] + prof_fname = prof_fname.strip() + prof_lname = prof_details["profname"].split(",")[0] + prof_lname = prof_lname.strip() + prof_fullname = prof_details["profname"].replace(",", "") + prof_mail = prof_details["prof_mail"] + prof_tel = prof_details["prof_tel"] + params = (prof_title, prof_fname, prof_lname, prof_mail, prof_tel, prof_fullname) + query = "INSERT OR IGNORE INTO prof (titel, fname, lname, mail, telnr, fullname) VALUES (?, ?, ?, ?, ?, ?)" + self.insertInto(query=query, params=params) + def getProfs(self): + return self.query_db("SELECT * FROM prof") - def get_semester(self) -> list[str]: - query = "SELECT DISTINCT erstellsemester FROM semesterapparat" - result = self.cur.execute(query).fetchall() - return [i for i in result] + # Apparat + def getAllAparats(self,deleted=0): + return self.query_db("SELECT * FROM semesterapparat WHERE deletion_status=?", (deleted,)) + def getApparatData(self, appnr, appname)->ApparatData: - def is_eternal(self, id): - query = f"SELECT dauer FROM semesterapparat WHERE id={id}" - result = self.cur.execute(query).fetchone() - return result[0] + result = self.query_db("SELECT * FROM semesterapparat WHERE appnr=? AND name=?", (appnr,appname), one=True) + if result is None: + raise NoResultError("No result found") + apparat = ApparatData() + apparat.appname = result[1] + apparat.appnr = result[4] + apparat.dauerapp = True if result[7] == 1 else False + prof_data = self.getProfData(self.getProfNameById(result[2])) + apparat.profname = self.getProfNameById(result[2]) + apparat.prof_mail = prof_data[0] + apparat.prof_tel = prof_data[1] + apparat.prof_title = prof_data[2] + apparat.app_fach = result[3] + apparat.erstellsemester = result[5] + apparat.semester = result[8] + apparat.deleted = result[9] + apparat.apparat_adis_id = result[11] + apparat.prof_adis_id = result[12] + return apparat + def getUnavailableApparatNumbers(self)->List[int]: + """ + getUnavailableApparatNumbers returns a list of all currently used ApparatNumbers + + + + Returns: + ------- + - number(List[int]): a list of all currently used apparat numbers + """ + numbers = self.query_db("SELECT appnr FROM semesterapparat WHERE deletion_status=0") + return [i[0] for i in numbers] + def setNewSemesterDate(self, appnr, newDate, dauerapp=False): + date = datetime.datetime.strptime(newDate, "%d.%m.%Y").strftime("%Y-%m-%d") + if dauerapp: + self.query_db("UPDATE semesterapparat SET verlängerung_bis=?, dauerapp=? WHERE appnr=?", (date,dauerapp,appnr)) + else: + self.query_db("UPDATE semesterapparat SET endsemester=? WHERE appnr=?", (date,appnr)) + def getApparatId(self, apparat_name): + data = self.query_db("SELECT appnr FROM semesterapparat WHERE name=?", (apparat_name,), one=True) + if data is None: + return None + else: + return data[0] + def createApparat(self, apparat:ApparatData)->Optional[AppPresentError]|int: + prof_id = self.getProfId(apparat.profname) + app_id = self.getApparatId(apparat.appname) + if app_id: + return AppPresentError(app_id) + self.createProf(apparat.get_prof_details()) + prof_id = self.getProfId(apparat.profname) + ic(prof_id) + query = f"INSERT OR IGNORE INTO semesterapparat (appnr, name, erstellsemester, dauer, prof_id, fach,deletion_status,konto) VALUES ('{apparat.appnr}', '{apparat.appname}', '{apparat.semester}', '{apparat.dauerapp}', {prof_id}, '{apparat.app_fach}', '{0}', '{SEMAP_MEDIA_ACCOUNTS[apparat.appnr]}')" + logger.log_info(query) + self.query_db(query) + return self.getApparatId(apparat.appname) + def getApparatsByProf(self, prof_id:int)->list[tuple]: + return self.query_db("SELECT * FROM semesterapparat WHERE prof_id=?", (prof_id,)) + def getApparatsBySemester(self, semester:str)->dict: + data = self.query_db("SELECT name, prof_id FROM semesterapparat WHERE erstellsemester=?", (semester,)) + conn = self.connect() + cursor = conn.cursor() + c_tmp = [] + for i in data: + c_tmp.append((i[0], self.getProfNameById(i[1]))) + query = ( + f"SELECT name,prof_id FROM semesterapparat WHERE deleted_date='{semester}'" + ) + result = cursor.execute(query).fetchall() + d_tmp = [] + for i in result: + d_tmp.append((i[0], self.getProfNameById(i[1]))) + # group the apparats by prof + c_ret = {} + for i in c_tmp: + if i[1] not in c_ret.keys(): + c_ret[i[1]] = [i[0]] + else: + c_ret[i[1]].append(i[0]) + d_ret = {} + for i in d_tmp: + if i[1] not in d_ret.keys(): + d_ret[i[1]] = [i[0]] + else: + d_ret[i[1]].append(i[0]) + self.close_connection(conn) + return {"created": c_ret, "deleted": d_ret} + def getApparatCountBySemester(self)->tuple[list[str],list[int]]: + conn = self.connect() + cursor = conn.cursor() + semesters = self.getSemersters() + created = [] + deleted = [] + for semester in semesters: + query = f"SELECT COUNT(*) FROM semesterapparat WHERE erstellsemester='{semester}'" + result = cursor.execute(query).fetchone() + created.append(result[0]) + query = f"SELECT COUNT(*) FROM semesterapparat WHERE deletion_status=1 AND deleted_date='{semester}'" + result = cursor.execute(query).fetchone() + deleted.append(result[0]) + # store data in a tuple + ret = [] + e_tuple = () + for sem in semesters: + e_tuple = ( + sem, + created[semesters.index(sem)], + deleted[semesters.index(sem)], + ) + ret.append(e_tuple) + self.close_connection(conn) + return ret + def deleteApparat(self, appnr, semester): + self.query_db("UPDATE semesterapparat SET deletion_status=1, deleted_date=? WHERE appnr=?", (semester,appnr)) + def isEternal(self, id): + return self.query_db("SELECT dauer FROM semesterapparat WHERE appnr=?", (id,), one=True) + def getApparatName(self, app_id, prof_id): + return self.query_db("SELECT name FROM semesterapparat WHERE appnr=? AND prof_id=?", (app_id,prof_id), one=True)[0] + def updateApparat(self, apparat_data:ApparatData): + query = f"UPDATE semesterapparat SET name = ?, fach = ?, dauer = ?, prof_id = ? WHERE appnr = ?" + params = ( + apparat_data.appname, + apparat_data.app_fach, + apparat_data.dauerapp, + self.getProfId(apparat_data.profname), + apparat_data.appnr, + ) + self.query_db(query, params) + def checkApparatExists(self, apparat_name): + return True if self.query_db("SELECT appnr FROM semesterapparat WHERE name=?", (apparat_name,), one=True) else False + + # Statistics def statistic_request(self, **kwargs: Any): def __query(query): - result = self.cur.execute(query).fetchall() + conn = self.connect() + cursor = conn.cursor() + result = cursor.execute(query).fetchall() for result_a in result: orig_value = result_a - prof_name = self.get_prof_name_by_id(result_a[2]) + prof_name = self.getProfNameById(result_a[2]) # replace the prof_id with the prof_name result_a = list(result_a) result_a[2] = prof_name result_a = tuple(result_a) result[result.index(orig_value)] = result_a - + self.close_connection(conn) return result - if "deletable" in kwargs.keys(): query = f"SELECT * FROM semesterapparat WHERE deletion_status=0 AND dauer=0 AND (erstellsemester!='{kwargs['deletesemester']}' OR verlängerung_bis!='{kwargs['deletesemester']}')" return __query(query) - if "dauer" in kwargs.keys(): kwargs["dauer"] = kwargs["dauer"].replace("Ja", "1").replace("Nein", "0") query = "SELECT * FROM semesterapparat WHERE " @@ -396,248 +454,39 @@ class Database: "xyz", f"(erstellsemester='{kwargs['endsemester']}' OR verlängerung_bis='{kwargs['endsemester']}') AND ", ) - #remove all x="" parts from the query where x is a key in kwargs - + # remove all x="" parts from the query where x is a key in kwargs query = query[:-5] print(query) return __query(query) - def get_app_count_by_semester(self) -> tuple[list[str], list[int]]: - """get the apparats created and deleted in the distinct semesters""" + # Admin data + def getUser(self): + return self.query_db("SELECT * FROM user", one=True) + def getUsers(self): + return self.query_db("SELECT * FROM user") - # get unique semesters - query = "SELECT DISTINCT erstellsemester FROM semesterapparat" - result = self.cur.execute(query).fetchall() - semesters = [i[0] for i in result] - created = [] - deleted = [] - for semester in semesters: - query = f"SELECT COUNT(*) FROM semesterapparat WHERE erstellsemester='{semester}'" - result = self.cur.execute(query).fetchone() - created.append(result[0]) - query = f"SELECT COUNT(*) FROM semesterapparat WHERE deletion_status=1 AND deleted_date='{semester}'" - result = self.cur.execute(query).fetchone() - deleted.append(result[0]) - # store data in a tuple - ret = [] - e_tuple = () - for sem in semesters: - e_tuple = ( - sem, - created[semesters.index(sem)], - deleted[semesters.index(sem)], - ) - ret.append(e_tuple) - return ret - # get the count of apparats created in the semesters - - def apparats_by_semester(self, semester: str): - """Get a list of all created and deleted apparats in the given semester""" - # get a list of apparats created and in the given semester - query = f"SELECT name,prof_id FROM semesterapparat WHERE erstellsemester='{semester}'" - result = self.cur.execute(query).fetchall() - c_tmp = [] - for i in result: - c_tmp.append((i[0], self.get_prof_name_by_id(i[1]))) - query = ( - f"SELECT name,prof_id FROM semesterapparat WHERE deleted_date='{semester}'" - ) - result = self.cur.execute(query).fetchall() - d_tmp = [] - for i in result: - d_tmp.append((i[0], self.get_prof_name_by_id(i[1]))) - # group the apparats by prof - c_ret = {} - for i in c_tmp: - if i[1] not in c_ret.keys(): - c_ret[i[1]] = [i[0]] - else: - c_ret[i[1]].append(i[0]) - d_ret = {} - for i in d_tmp: - if i[1] not in d_ret.keys(): - d_ret[i[1]] = [i[0]] - else: - d_ret[i[1]].append(i[0]) - return {"created": c_ret, "deleted": d_ret} - - def delete_message(self, message_id): - query = "DELETE FROM messages WHERE id=?" - params = (message_id,) - self.cur.execute(query, params) - self.database.commit() - - def delete_medium(self, title_id): - # delete the bookdata from the media table - query = "UPDATE media SET deleted=1 WHERE id=?" - params = (title_id,) - self.cur.execute(query, params) - self.database.commit() - pass - - def update_bookdata(self, bookdata: BookData, title_id): - query = "UPDATE media SET bookdata=? WHERE id=?" - converted = pickle.dumps(bookdata) - params = (converted, title_id) - self.cur.execute(query, params) - self.database.commit() - - def get_specific_book(self, book_id): - query = "SELECT bookdata FROM media WHERE id=?" - params = (book_id,) - result = self.cur.execute(query, params).fetchone() - return pickle.loads(result[0]) - - def get_media(self, app_id, prof_id, del_state=0) -> list[dict[int, BookData, int]]: - """request media from database and return result as list. - - Args: - ---- - - app_id (int): ID of the apparat - - prof_id (int): ID of the prof - - del_state (int, optional): If deleted books should be requested as well. 1 = yes 0 = no. Defaults to 0. - - Returns: - ------- - - list[dict[int,BookData,int]]: Returns a list of dictionaries containing the bookdata, the id and the availability of the book in the following format: - ------- - {"id": int, - "bookdata": BookData, - "available": int} - """ - query = f"SELECT id,bookdata,available FROM media WHERE (app_id={app_id} AND prof_id={prof_id}) AND (deleted={del_state if del_state == 0 else '1 OR deleted=0'})" - logger.log_info(f"Requesting media from database with query: {query}") - result = self.cur.execute(query).fetchall() - ret_result = [] - for result_a in result: - ic(result_a) - data = {"id": int, "bookdata": BookData, "available": int} - data["id"] = result_a[0] - data["bookdata"] = pickle.loads(result_a[1]) - data["available"] = result_a[2] - ret_result.append(data) - return ret_result - - def get_subjects_and_aliases(self): - query = "SELECT subjects.name, aliases.name FROM subjects LEFT JOIN aliases ON subjects.id = aliases.subject_id" - return self.cur.execute(query).fetchall() - - def get_subjects(self): - query = "SELECT id,name FROM subjects" - return self.cur.execute(query).fetchall() - - def get_aliases(self,subject_id): - query = f"SELECT name FROM aliases WHERE subject_id={subject_id}" - return self.cur.execute(query).fetchall() - - def add_subject(self,subject_name): - query = f"INSERT INTO subjects (name) VALUES ('{subject_name}')" - self.cur.execute(query) - self.database.commit() - - def get_apparats_by_prof(self,prof_id): - query = f"SELECT * FROM semesterapparat WHERE prof_id={prof_id}" - return self.cur.execute(query).fetchall() - def add_alias(self,alias_name,subject): - query = f"SELECT id FROM subjects WHERE name='{subject}'" - subject_id = self.cur.execute(query).fetchone()[0] - query = f"INSERT INTO aliases (name,subject_id) VALUES ('{alias_name}',{subject_id})" - self.cur.execute(query) - self.database.commit() - - def update_apparat(self, apparat_data: ApparatData): - data = apparat_data - query = f"UPDATE semesterapparat SET name = ?, fach = ?, dauer = ?, prof_id = ? WHERE appnr = ?" - params = ( - data.appname, - data.app_fach, - data.dauerapp, - self.get_prof_id(data.profname), - data.appnr, - ) - print(query) - self.cur.execute(query, params) - self.database.commit() - - def delete_apparat(self, appnr: str, semester: str): - # update the deletion status to 1 and the deleted_state to semester for the given apparat - query = f"UPDATE semesterapparat SET deletion_status=1, deleted_date='{semester}' WHERE appnr='{appnr}'" - self.cur.execute(query) - self.database.commit() - - def get_book_id(self, bookdata: BookData, app_id: int, prof_id: int): - query = "SELECT id FROM media WHERE bookdata=? AND app_id=? AND prof_id=?" - params = (pickle.loads(bookdata), app_id, prof_id) - result = self.cur.execute(query, params).fetchone() - return result - - - - def set_availability(self, book_id, available): - query = "UPDATE media SET available=? WHERE id=?" - params = (available, book_id) - self.cur.execute(query, params) - self.database.commit() - - def get_latest_book_id(self): - query = "SELECT id FROM media ORDER BY id DESC LIMIT 1" - result = self.cur.execute(query).fetchone() - return result[0] - - def close(self): - self.database.close() - - def login(self, username, hashed_password) -> bool: - # check if the user and password exist in the database - # if yes, return True - # if no, return False - query = "SELECT salt FROM user WHERE username=?" - params = (username,) - result = self.cur.execute(query, params).fetchone() - - if result is None: + def login(self, user, hashed_password): + salt = self.query_db("SELECT salt FROM user WHERE username=?", (user,), one=True)[0] + if salt is None: return False - salt = result[0] - print(salt) - query = "SELECT password FROM user WHERE username=?" - params = (str(username),) - result = self.cur.execute(query, params).fetchone() - password = result[0] - if password == f"{salt}{hashed_password}": + hashed_password = salt + hashed_password + password = self.query_db("SELECT password FROM user WHERE username=?", (user,), one=True)[0] + if password == hashed_password: return True else: return False - - # admin stuff below here - def get_users(self): - query = "SELECT * FROM user" - return self.cur.execute(query).fetchall() - - def get_apparats(self) -> list[tuple]: - query = "SELECT * FROM semesterapparat" - return self.cur.execute(query).fetchall() - - def change_password(self, user, password): - saltq = "SELECT salt FROM user WHERE username=?" - params = (user,) - salt = self.cur.execute(saltq, params).fetchone()[0] - password = f"{salt}{password}" - query = "UPDATE user SET password=? WHERE username=?" - params = (password, user) - self.cur.execute(query, params) - self.database.commit() - - def get_role(self, username): - query = "SELECT role FROM user WHERE username=?" - params = (username,) - result = self.cur.execute(query,params).fetchone() - return result[0] - - def get_roles(self): - query = "SELECT role FROM user" - return self.cur.execute(query).fetchall() - - def create_user(self, username, password, role, salt): + def changePassword(self, user, new_password): + salt = self.query_db("SELECT salt FROM user WHERE username=?", (user,), one=True)[0] + new_password = salt + new_password + self.query_db("UPDATE user SET password=? WHERE username=?", (new_password,user)) + def getRole(self, user): + return self.query_db("SELECT role FROM user WHERE username=?", (user,), one=True)[0] + def getRoles(self): + return self.query_db("SELECT role FROM user") + def checkUsername(self, user): + data = self.query_db("SELECT username FROM user WHERE username=?", (user,), one=True) + return True if data is not None else False + def createUser(self, user, password, role, salt): """Create a user based on passed data. Args: @@ -647,40 +496,12 @@ class Database: - role (str): Role of the user - salt (str): a random salt for the user """ - query = "INSERT OR IGNORE INTO user (username, password, role, salt) VALUES (?, ?, ?, ?)" - params = (username, password, role, salt) - self.cur.execute(query, params) - self.database.commit() - - def delete_user(self, user): - query = "DELETE FROM user WHERE username=?" - params = (user,) - self.cur.execute(query, params) - self.database.commit() - - def get_faculty_members(self,name:str=None): - query = "SELECT titel, fname,lname,mail,telnr,fullname FROM prof" - if name: - query = query.replace(",fullname", "") - query += f" WHERE fullname='{name}'" - return self.cur.execute(query).fetchall() - - def update_faculty_member(self,data,oldlname,oldfname): - placeholders = ', '.join(f"{k} = :{k}" for k in data.keys()) - sql = f"UPDATE prof SET {placeholders} WHERE lname = :oldlname AND fname = :oldfname" - data["oldlname"] = oldlname - data["oldfname"] = oldfname - print(sql, data) - self.cur.execute(sql, data) - self.database.commit() - - def faculty_data(self,name): - query = f"SELECT * FROM prof WHERE fullname='{name}'" - result = self.cur.execute(query).fetchone() - return result - - def update_user(self,username, data:dict[str,str]): - + self.query_db("INSERT OR IGNORE INTO user (username, password, role, salt) VALUES (?,?,?,?)", (user,password,role,salt)) + def deleteUser(self, user): + self.query_db("DELETE FROM user WHERE username=?", (user,)) + def updateUser(self, username, data:dict[str, str]): + conn = self.connect() + cursor = conn.cursor() query = "UPDATE user SET " for key,value in data.items(): if key == "username": @@ -689,59 +510,16 @@ class Database: query = query[:-1] query += " WHERE username=?" params = (username,) - - self.cur.execute(query,params) - self.database.commit() - - def check_username(self,username): - query = "SELECT username FROM user WHERE username=?" - params = (username,) - result = self.cur.execute(query,params).fetchone() - if result is None: - return False - return True - def get_apparats_name(self, app_id,prof_id): - query = f"SELECT name FROM semesterapparat WHERE appnr={app_id} AND prof_id={prof_id}" - ic(query) - result = self.cur.execute(query).fetchone() - return result[0] - - def search_book(self, data:dict[str,str])->list[tuple[BookData,int]]: - query = "SELECT * FROM media " - result = self.database.execute(query).fetchall() - ret = [] - #get length of data dict - length = len(data) - mode = 0 - if length == 1: - if "signature" in data.keys(): - mode = 1 - elif "title" in data.keys(): - mode = 2 - elif length == 2: - mode = 3 - else: - return None - print(len(result)) - for res in result: - bookdata = pickle.loads(res[1]) - app_id = res[2] - prof_id = res[3] - #if signature and title present in dict: - #if signature present in dict: - if mode == 1: - if data["signature"] in bookdata.signature: - ret.append((bookdata,app_id,prof_id)) - #if title present in dict: - elif mode == 2: - if data["title"] in bookdata.title: - ret.append((bookdata,app_id,prof_id)) - elif mode == 3: - if data["signature"] in bookdata.signature and data["title"] in bookdata.title: - ret.append((bookdata,app_id,prof_id)) - return ret - - -if __name__ == "__main__": - db = Database() - print(db.login("kirchner", "loginpass")) + cursor.execute(query, params) + conn.commit() + self.close_connection(conn) + def getFacultyMember(self, name:str): + return self.query_db("SELECT titel, fname,lname,mail,telnr,fullname FROM prof WHERE fullname=?", (name,), one=True) + def updateFacultyMember(self, data, oldlname, oldfname): + placeholders = ", ".join([f"{i}=:{i} " for i in data.keys()]) + query = f"UPDATE prof SET {placeholders} WHERE lname = :oldlname AND fname = :oldfname" + data["oldlname"] = oldlname + data["oldfname"] = oldfname + self.query_db(query, data) + def getFacultyMembers(self): + return self.query_db("SELECT titel, fname,lname,mail,telnr,fullname FROM prof") diff --git a/src/backend/database_rewrite.py b/src/backend/database_rewrite.py deleted file mode 100644 index 2ecc6df..0000000 --- a/src/backend/database_rewrite.py +++ /dev/null @@ -1,87 +0,0 @@ -import datetime -import os -import re -import sqlite3 as sql -import tempfile -import pickle -from src.logic.log import MyLogger -from icecream import ic -from typing import List, Tuple, Dict, Any, Optional, Union -from omegaconf import OmegaConf -from src.backend.db import CREATE_TABLE_APPARAT, CREATE_TABLE_MESSAGES, CREATE_TABLE_MEDIA, CREATE_TABLE_APPKONTOS, CREATE_TABLE_FILES, CREATE_TABLE_MESSAGES, CREATE_TABLE_PROF, CREATE_TABLE_USER, CREATE_TABLE_SUBJECTS -from src.logic.constants import SEMAP_MEDIA_ACCOUNTS -from src.logic.dataclass import ApparatData, BookData - -config = OmegaConf.load("config.yaml") -logger = MyLogger(__name__) - -def load_pickle(data): - return pickle.loads(data) -def dump_pickle(data): - return pickle.dumps(data) -def create_blob(data): - with open(data, "rb") as f: - return f.read() -def create_file(filename:str, blob): - with tempfile.TemporaryDirectory(delete=False) as tmpdir: - filepath = os.path.join(tmpdir, filename) - with open(filepath, "wb") as f: - f.write(blob) - return filepath - -class Database: - def __init__(self, db_path: str = None): - if db_path is None: - self.db_path = config.database.path - else: - self.db_path = db_path - if self.get_db_contents() is None: - self.create_tables() - - def get_db_contents(self): - try: - with sql.connect(self.db_path) as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM sqlite_master WHERE type='table'") - return cursor.fetchall() - except sql.OperationalError: - return None - - def connect(self): - return sql.connect(self.db_path) - - def close_connection(self, conn: sql.Connection): - conn.close() - - def create_tables(self): - conn = self.connect() - cursor = conn.cursor() - cursor.execute(CREATE_TABLE_APPARAT) - cursor.execute(CREATE_TABLE_MESSAGES) - cursor.execute(CREATE_TABLE_MEDIA) - cursor.execute(CREATE_TABLE_APPKONTOS) - cursor.execute(CREATE_TABLE_FILES) - cursor.execute(CREATE_TABLE_PROF) - cursor.execute(CREATE_TABLE_USER) - cursor.execute(CREATE_TABLE_SUBJECTS) - conn.commit() - self.close_connection(conn) - - def query_db(self, query: str, args: Tuple = (), one: bool = False): - conn = self.connect() - cursor = conn.cursor() - cursor.execute(query, args) - rv = cursor.fetchall() - conn.commit() - self.close_connection(conn) - return (rv[0] if rv else None) if one else rv - - - - - - - - - - \ No newline at end of file