From 3794f35942ccf6a035d74aa45bc38cac1333c39e Mon Sep 17 00:00:00 2001 From: WorldTeacher <41587052+WorldTeacher@users.noreply.github.com> Date: Wed, 3 Jul 2024 14:41:09 +0200 Subject: [PATCH] database feature add function to determine the script, function and line that called the database, embed data into log to make fixes easier --- src/backend/database.py | 56 +++++++++++++++++++++++++++++++++++------ 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/src/backend/database.py b/src/backend/database.py index 29b082d..a7c9b33 100644 --- a/src/backend/database.py +++ b/src/backend/database.py @@ -4,7 +4,7 @@ import sqlite3 as sql import tempfile from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union - +import shutil # from icecream import ic from omegaconf import OmegaConf @@ -30,6 +30,8 @@ from src.utils import create_blob, dump_pickle, load_pickle config = OmegaConf.load("config.yaml") ascii_lowercase = "abcdefghijklmnopqrstuvwxyz0123456789)" +caller_frame = inspect.stack()[1] +# get the line that called the function class Database: """ @@ -48,19 +50,58 @@ class Database: script_name = ( caller_frame.filename.replace("\\", "/").split("/")[-1].split(".")[0] ) - print(script_name) name = f"Database.{script_name}" + self.name = script_name self.logger = MyLogger(name) if db_path is None: self.db_path = config.database.path + config.database.name self.db_path = self.db_path.replace("~", str(Path.home())) else: self.db_path = db_path + self.checkDatabaseStatus() + + def checkDatabaseStatus(self): + path = config.database.path + path = path.replace("~", str(Path.home())) + print(path) + path = os.path.abspath(path) + if not os.path.exists(path): + # create path + print(path) + os.makedirs(path) if self.get_db_contents() == []: self.logger.log_critical("Database does not exist, creating tables") self.create_tables() self.insertSubjects() + def get_caller_line(self) -> str: + # get the line from the script that called the function + caller_frame = inspect.stack() + # get the line that called the function based on self.name and caller_frame + script_name = self.name + for frame in caller_frame: + if script_name in frame.filename: + caller_frame = frame + break + line = f"{caller_frame.function}:{caller_frame.lineno} ->" + return line + + def getElsaMediaID(self, work_author, signature, pages): + query = ( + "SELECT id FROM elsa_media WHERE work_author=? AND signature=? AND pages=?" + ) + params = (work_author, signature, pages) + result = self.query_db(query, params, one=True) + if result is None: + return NoResultError( + f"work_author: {work_author}, signature: {signature}, pages: {pages}" + ).__str__() + return result[0] + + def getElsaMediaType(self, id): + query = "SELECT type FROM elsa_media WHERE id=?" + return self.query_db(query, (id,), one=True)[0] + def get_db_contents(self) -> Union[List[Tuple], None]: """ Get the contents of the @@ -147,7 +188,7 @@ class Database: """ conn = self.connect() cursor = conn.cursor() - log_message = f"Querying database with query {query}, args: {args}" + log_message = f"{self.get_caller_line()} Querying database with query {query}, args: {args}" # if "INSERT" in query: # log_message = f"Querying database with query {query}" @@ -856,7 +897,7 @@ class Database: prof_id = self.getProfId(apparat.profname) app_id = self.getApparatId(apparat.appname) if app_id: - raise AppPresentError(app_id) + return AppPresentError(app_id) self.createProf(apparat.get_prof_details()) prof_id = self.getProfId(apparat.profname) @@ -864,8 +905,7 @@ class Database: query = f"INSERT OR IGNORE INTO semesterapparat (appnr, name, erstellsemester, dauer, prof_id, fach,deletion_status,konto) VALUES ('{apparat.appnr}', '{apparat.appname}', '{apparat.semester}', '{apparat.dauerapp}', {prof_id}, '{apparat.app_fach}', '{0}', '{SEMAP_MEDIA_ACCOUNTS[apparat.appnr]}')" self.logger.log_info(query) self.query_db(query) - return self.getApparatId(apparat.appname) - + return None def getApparatsByProf(self, prof_id: Union[str, int]) -> list[tuple]: """Get all apparats based on the professor id @@ -1011,7 +1051,7 @@ class Database: self.logger.log_info(f"Updating apparat with query {query} and params {params}") self.query_db(query, params) - def checkApparatExists(self, apparat_name: str): + def checkApparatExists(self, app_name: str): """check if the apparat is already present in the database based on the name Args: @@ -1024,7 +1064,7 @@ class Database: True if self.query_db( "SELECT appnr FROM semesterapparat WHERE name=?", - (apparat_name,), + (app_name,), one=True, ) else False