database feature

add function to determine the script, function and line that called the
database, embed data into log to make fixes easier
This commit is contained in:
WorldTeacher
2024-07-03 14:41:09 +02:00
parent f0cb65bf42
commit 3794f35942

View File

@@ -4,7 +4,7 @@ import sqlite3 as sql
import tempfile import tempfile
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union from typing import Any, Dict, List, Optional, Tuple, Union
import shutil
# from icecream import ic # from icecream import ic
from omegaconf import OmegaConf from omegaconf import OmegaConf
@@ -30,6 +30,8 @@ from src.utils import create_blob, dump_pickle, load_pickle
config = OmegaConf.load("config.yaml") config = OmegaConf.load("config.yaml")
ascii_lowercase = "abcdefghijklmnopqrstuvwxyz0123456789)" ascii_lowercase = "abcdefghijklmnopqrstuvwxyz0123456789)"
caller_frame = inspect.stack()[1]
# get the line that called the function
class Database: class Database:
""" """
@@ -48,19 +50,58 @@ class Database:
script_name = ( script_name = (
caller_frame.filename.replace("\\", "/").split("/")[-1].split(".")[0] caller_frame.filename.replace("\\", "/").split("/")[-1].split(".")[0]
) )
print(script_name)
name = f"Database.{script_name}" name = f"Database.{script_name}"
self.name = script_name
self.logger = MyLogger(name) self.logger = MyLogger(name)
if db_path is None: if db_path is None:
self.db_path = config.database.path + config.database.name self.db_path = config.database.path + config.database.name
self.db_path = self.db_path.replace("~", str(Path.home())) self.db_path = self.db_path.replace("~", str(Path.home()))
else: else:
self.db_path = db_path self.db_path = db_path
self.checkDatabaseStatus()
def checkDatabaseStatus(self):
path = config.database.path
path = path.replace("~", str(Path.home()))
print(path)
path = os.path.abspath(path)
if not os.path.exists(path):
# create path
print(path)
os.makedirs(path)
if self.get_db_contents() == []: if self.get_db_contents() == []:
self.logger.log_critical("Database does not exist, creating tables") self.logger.log_critical("Database does not exist, creating tables")
self.create_tables() self.create_tables()
self.insertSubjects() self.insertSubjects()
def get_caller_line(self) -> str:
# get the line from the script that called the function
caller_frame = inspect.stack()
# get the line that called the function based on self.name and caller_frame
script_name = self.name
for frame in caller_frame:
if script_name in frame.filename:
caller_frame = frame
break
line = f"{caller_frame.function}:{caller_frame.lineno} ->"
return line
def getElsaMediaID(self, work_author, signature, pages):
query = (
"SELECT id FROM elsa_media WHERE work_author=? AND signature=? AND pages=?"
)
params = (work_author, signature, pages)
result = self.query_db(query, params, one=True)
if result is None:
return NoResultError(
f"work_author: {work_author}, signature: {signature}, pages: {pages}"
).__str__()
return result[0]
def getElsaMediaType(self, id):
query = "SELECT type FROM elsa_media WHERE id=?"
return self.query_db(query, (id,), one=True)[0]
def get_db_contents(self) -> Union[List[Tuple], None]: def get_db_contents(self) -> Union[List[Tuple], None]:
""" """
Get the contents of the Get the contents of the
@@ -147,7 +188,7 @@ class Database:
""" """
conn = self.connect() conn = self.connect()
cursor = conn.cursor() cursor = conn.cursor()
log_message = f"Querying database with query {query}, args: {args}" log_message = f"{self.get_caller_line()} Querying database with query {query}, args: {args}"
# if "INSERT" in query: # if "INSERT" in query:
# log_message = f"Querying database with query {query}" # log_message = f"Querying database with query {query}"
@@ -856,7 +897,7 @@ class Database:
prof_id = self.getProfId(apparat.profname) prof_id = self.getProfId(apparat.profname)
app_id = self.getApparatId(apparat.appname) app_id = self.getApparatId(apparat.appname)
if app_id: if app_id:
raise AppPresentError(app_id) return AppPresentError(app_id)
self.createProf(apparat.get_prof_details()) self.createProf(apparat.get_prof_details())
prof_id = self.getProfId(apparat.profname) prof_id = self.getProfId(apparat.profname)
@@ -864,8 +905,7 @@ class Database:
query = f"INSERT OR IGNORE INTO semesterapparat (appnr, name, erstellsemester, dauer, prof_id, fach,deletion_status,konto) VALUES ('{apparat.appnr}', '{apparat.appname}', '{apparat.semester}', '{apparat.dauerapp}', {prof_id}, '{apparat.app_fach}', '{0}', '{SEMAP_MEDIA_ACCOUNTS[apparat.appnr]}')" query = f"INSERT OR IGNORE INTO semesterapparat (appnr, name, erstellsemester, dauer, prof_id, fach,deletion_status,konto) VALUES ('{apparat.appnr}', '{apparat.appname}', '{apparat.semester}', '{apparat.dauerapp}', {prof_id}, '{apparat.app_fach}', '{0}', '{SEMAP_MEDIA_ACCOUNTS[apparat.appnr]}')"
self.logger.log_info(query) self.logger.log_info(query)
self.query_db(query) self.query_db(query)
return self.getApparatId(apparat.appname) return None
def getApparatsByProf(self, prof_id: Union[str, int]) -> list[tuple]: def getApparatsByProf(self, prof_id: Union[str, int]) -> list[tuple]:
"""Get all apparats based on the professor id """Get all apparats based on the professor id
@@ -1011,7 +1051,7 @@ class Database:
self.logger.log_info(f"Updating apparat with query {query} and params {params}") self.logger.log_info(f"Updating apparat with query {query} and params {params}")
self.query_db(query, params) self.query_db(query, params)
def checkApparatExists(self, apparat_name: str): def checkApparatExists(self, app_name: str):
"""check if the apparat is already present in the database based on the name """check if the apparat is already present in the database based on the name
Args: Args:
@@ -1024,7 +1064,7 @@ class Database:
True True
if self.query_db( if self.query_db(
"SELECT appnr FROM semesterapparat WHERE name=?", "SELECT appnr FROM semesterapparat WHERE name=?",
(apparat_name,), (app_name,),
one=True, one=True,
) )
else False else False