rework logging, add more dataclasses, reworked config
This commit is contained in:
@@ -7,7 +7,7 @@ from src import settings
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
# from icecream import ic
|
||||
from omegaconf import OmegaConf
|
||||
|
||||
import datetime
|
||||
from src.backend.db import (
|
||||
CREATE_ELSA_FILES_TABLE,
|
||||
CREATE_ELSA_MEDIA_TABLE,
|
||||
@@ -22,18 +22,18 @@ from src.backend.db import (
|
||||
CREATE_TABLE_USER,
|
||||
)
|
||||
from src.errors import AppPresentError, NoResultError
|
||||
from src.logic import ApparatData, BookData, Prof, MyLogger
|
||||
from src.logic import ApparatData, BookData, Prof, Apparat, ELSA, logger as log
|
||||
from src.logic.constants import SEMAP_MEDIA_ACCOUNTS
|
||||
from src.utils import create_blob, dump_pickle, load_pickle
|
||||
from .semester import generateSemesterByDate
|
||||
from icecream import ic
|
||||
from string import ascii_lowercase as lower, digits
|
||||
|
||||
|
||||
ascii_lowercase = "abcdefghijklmnopqrstuvwxyz0123456789)"
|
||||
caller_frame = inspect.stack()[1]
|
||||
ascii_lowercase = lower + digits
|
||||
# get the line that called the function
|
||||
database = settings.database
|
||||
class Database:
|
||||
database = settings.database
|
||||
"""
|
||||
Initialize the database and create the tables if they do not exist.
|
||||
"""
|
||||
@@ -45,23 +45,15 @@ class Database:
|
||||
Args:
|
||||
db_path (str, optional): Optional Path for testing / specific purposes. Defaults to None.
|
||||
"""
|
||||
caller_frame = inspect.stack()[1]
|
||||
|
||||
script_name = (
|
||||
caller_frame.filename.replace("\\", "/").split("/")[-1].split(".")[0]
|
||||
)
|
||||
name = f"Database.{script_name}"
|
||||
self.name = script_name
|
||||
self.logger = MyLogger(name)
|
||||
if db_path is None:
|
||||
self.db_path = database.path + database.name
|
||||
self.db_path = self.database.path + self.database.name
|
||||
self.db_path = self.db_path.replace("~", str(Path.home()))
|
||||
else:
|
||||
self.db_path = db_path
|
||||
self.checkDatabaseStatus()
|
||||
|
||||
def checkDatabaseStatus(self):
|
||||
path = database.path
|
||||
path = self.database.path
|
||||
path = path.replace("~", str(Path.home()))
|
||||
# print(path)
|
||||
path = os.path.abspath(path)
|
||||
@@ -70,22 +62,10 @@ class Database:
|
||||
# print(path)
|
||||
os.makedirs(path)
|
||||
if self.get_db_contents() == []:
|
||||
self.logger.log_critical("Database does not exist, creating tables")
|
||||
log.critical("Database does not exist, creating tables")
|
||||
self.create_tables()
|
||||
self.insertSubjects()
|
||||
|
||||
def get_caller_line(self) -> str:
|
||||
# get the line from the script that called the function
|
||||
caller_frame = inspect.stack()
|
||||
# get the line that called the function based on self.name and caller_frame
|
||||
script_name = self.name
|
||||
for frame in caller_frame:
|
||||
if script_name in frame.filename:
|
||||
caller_frame = frame
|
||||
break
|
||||
line = f"{caller_frame.function}:{caller_frame.lineno} ->"
|
||||
return line
|
||||
|
||||
def getElsaMediaID(self, work_author, signature, pages):
|
||||
query = (
|
||||
"SELECT id FROM elsa_media WHERE work_author=? AND signature=? AND pages=?"
|
||||
@@ -151,7 +131,6 @@ class Database:
|
||||
cursor.execute(CREATE_TABLE_USER)
|
||||
cursor.execute(CREATE_TABLE_SUBJECTS)
|
||||
cursor.execute(CREATE_ELSA_TABLE)
|
||||
cursor.execute(CREATE_ELSA_PROF_TABLE)
|
||||
cursor.execute(CREATE_ELSA_FILES_TABLE)
|
||||
cursor.execute(CREATE_ELSA_MEDIA_TABLE)
|
||||
conn.commit()
|
||||
@@ -167,11 +146,12 @@ class Database:
|
||||
"""
|
||||
conn = self.connect()
|
||||
cursor = conn.cursor()
|
||||
self.logger.log_info(f"Inserting {params} into database with query {query}")
|
||||
log.info(f"Inserting {params} into database with query {query}")
|
||||
cursor.execute(query, params)
|
||||
conn.commit()
|
||||
self.close_connection(conn)
|
||||
|
||||
@log.catch
|
||||
def query_db(
|
||||
self, query: str, args: Tuple = (), one: bool = False
|
||||
) -> Union[Tuple, List[Tuple]]:
|
||||
@@ -191,24 +171,24 @@ class Database:
|
||||
logs_query = query
|
||||
logs_args = args
|
||||
if "fileblob" in query:
|
||||
#set fileblob arg in logger to "too long"
|
||||
# set fileblob arg in logger to "too long"
|
||||
logs_query = query
|
||||
fileblob_location = query.find("fileblob")
|
||||
#remove fileblob from query
|
||||
# remove fileblob from query
|
||||
logs_query = query[:fileblob_location] + "fileblob = too long"
|
||||
|
||||
log_message = f"{self.get_caller_line()} Querying database with query {logs_query}, args: {logs_args}"
|
||||
|
||||
log_message = f"Querying database with query {logs_query}, args: {logs_args}"
|
||||
# if "INSERT" in query:
|
||||
# log_message = f"Querying database with query {query}"
|
||||
|
||||
self.logger.log_info(log_message)
|
||||
log.info(log_message)
|
||||
try:
|
||||
cursor.execute(query, args)
|
||||
rv = cursor.fetchall()
|
||||
conn.commit()
|
||||
self.close_connection(conn)
|
||||
except sql.OperationalError as e:
|
||||
self.logger.log_error(f"Error in query: {e}")
|
||||
log.error(f"Error in query: {e}")
|
||||
return None
|
||||
return (rv[0] if rv else None) if one else rv
|
||||
|
||||
@@ -231,7 +211,7 @@ class Database:
|
||||
t_query = (
|
||||
f"SELECT bookdata FROM media WHERE app_id={app_id} AND prof_id={prof_id}"
|
||||
)
|
||||
self.logger.log_info(t_query)
|
||||
log.info(t_query)
|
||||
# # print(t_query)
|
||||
result = cursor.execute(t_query).fetchall()
|
||||
result = [load_pickle(i[0]) for i in result]
|
||||
@@ -258,7 +238,7 @@ class Database:
|
||||
params = (converted, app_id, prof_id, 0)
|
||||
cursor.execute(query, params)
|
||||
logMessage = f"Added book with signature {bookdata.signature} to database, data: {converted}"
|
||||
self.logger.log_info(logMessage)
|
||||
log.info(logMessage)
|
||||
conn.commit()
|
||||
self.close_connection(conn)
|
||||
|
||||
@@ -507,7 +487,7 @@ class Database:
|
||||
str: The filename of the recreated file
|
||||
"""
|
||||
blob = self.getBlob(filename, app_id)
|
||||
tempdir = database.tempdir
|
||||
tempdir = self.database.tempdir
|
||||
tempdir = tempdir.replace("~", str(Path.home()))
|
||||
tempdir_path = Path(tempdir)
|
||||
if not os.path.exists(tempdir_path):
|
||||
@@ -660,7 +640,7 @@ class Database:
|
||||
Args:
|
||||
message_id (str): the id of the message
|
||||
"""
|
||||
self.logger.log_info(f"Deleting message with id {message_id}")
|
||||
log.info(f"Deleting message with id {message_id}")
|
||||
self.query_db("DELETE FROM messages WHERE id=?", (message_id,))
|
||||
|
||||
# Prof data
|
||||
@@ -734,7 +714,17 @@ class Database:
|
||||
person = Prof()
|
||||
return person.from_tuple(data)
|
||||
|
||||
|
||||
def getProf(self, id) -> Prof:
|
||||
"""Get a professor based on the id
|
||||
|
||||
Args:
|
||||
id ([type]): the id of the professor
|
||||
|
||||
Returns:
|
||||
Prof: a Prof object containing the data of the professor
|
||||
"""
|
||||
data = self.query_db("SELECT * FROM prof WHERE id=?", (id,), one=True)
|
||||
return Prof().from_tuple(data)
|
||||
|
||||
def getProfs(self) -> list[Prof]:
|
||||
"""Return all the professors in the database
|
||||
@@ -806,7 +796,7 @@ class Database:
|
||||
"SELECT appnr FROM semesterapparat WHERE deletion_status=0"
|
||||
)
|
||||
numbers = [i[0] for i in numbers]
|
||||
self.logger.log_info(f"Currently used apparat numbers: {numbers}")
|
||||
log.info(f"Currently used apparat numbers: {numbers}")
|
||||
return numbers
|
||||
|
||||
def setNewSemesterDate(self, app_id: Union[str, int], newDate, dauerapp=False):
|
||||
@@ -817,16 +807,18 @@ class Database:
|
||||
newDate (str): the new date
|
||||
dauerapp (bool, optional): if the apparat was changed to dauerapparat. Defaults to False.
|
||||
"""
|
||||
# today as yyyy-mm-dd
|
||||
today = datetime.datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
if dauerapp:
|
||||
self.query_db(
|
||||
"UPDATE semesterapparat SET verlängerung_bis=?, dauer=? WHERE appnr=?",
|
||||
(newDate, dauerapp, app_id),
|
||||
"UPDATE semesterapparat SET verlängerung_bis=?, dauer=?, verlängert_am=? WHERE appnr=?",
|
||||
(newDate, dauerapp, today, app_id),
|
||||
)
|
||||
else:
|
||||
self.query_db(
|
||||
"UPDATE semesterapparat SET verlängerung_bis=? WHERE appnr=?",
|
||||
(newDate, app_id),
|
||||
"UPDATE semesterapparat SET verlängerung_bis=?, verlängerung_am=? WHERE appnr=?",
|
||||
(newDate, today, app_id),
|
||||
)
|
||||
|
||||
def getApparatId(self, apparat_name) -> Optional[int]:
|
||||
@@ -872,7 +864,7 @@ class Database:
|
||||
# self.getProfId(apparat.profname)
|
||||
ic(prof_id)
|
||||
query = f"INSERT OR IGNORE INTO semesterapparat (appnr, name, erstellsemester, dauer, prof_id, fach,deletion_status,konto) VALUES ('{apparat.appnr}', '{apparat.appname}', '{apparat.semester}', '{apparat.dauerapp}', {prof_id}, '{apparat.app_fach}', '{0}', '{SEMAP_MEDIA_ACCOUNTS[apparat.appnr]}')"
|
||||
self.logger.log_info(query)
|
||||
log.info(query)
|
||||
self.query_db(query)
|
||||
return None
|
||||
def getApparatsByProf(self, prof_id: Union[str, int]) -> list[tuple]:
|
||||
@@ -884,9 +876,14 @@ class Database:
|
||||
Returns:
|
||||
list[tuple]: a list of tuples containing the apparats
|
||||
"""
|
||||
return self.query_db(
|
||||
data = self.query_db(
|
||||
"SELECT * FROM semesterapparat WHERE prof_id=?", (prof_id,)
|
||||
)
|
||||
ret = []
|
||||
for i in data:
|
||||
print(i)
|
||||
ret.append(Apparat().from_tuple(i))
|
||||
return ret
|
||||
|
||||
def getApparatsBySemester(self, semester: str) -> dict[list]:
|
||||
"""get all apparats based on the semester
|
||||
@@ -960,16 +957,17 @@ class Database:
|
||||
self.close_connection(conn)
|
||||
return ret
|
||||
|
||||
def deleteApparat(self, app_id: Union[str, int], semester: str):
|
||||
def deleteApparat(self, app_id: Union[str, int]):
|
||||
"""Delete an apparat from the database
|
||||
|
||||
Args:
|
||||
app_id (Union[str, int]): the id of the apparat
|
||||
semester (str): the semester the apparat should be deleted from
|
||||
"""
|
||||
today = datetime.datetime.now().strftime("%Y-%m-%d")
|
||||
self.query_db(
|
||||
"UPDATE semesterapparat SET deletion_status=1, deleted_date=? WHERE appnr=?",
|
||||
(semester, app_id),
|
||||
(today, app_id),
|
||||
)
|
||||
|
||||
def isEternal(self, id):
|
||||
@@ -1017,7 +1015,7 @@ class Database:
|
||||
apparat_data.apparat_adis_id,
|
||||
apparat_data.appnr,
|
||||
)
|
||||
self.logger.log_info(f"Updating apparat with query {query} and params {params}")
|
||||
log.info(f"Updating apparat with query {query} and params {params}")
|
||||
self.query_db(query, params)
|
||||
|
||||
def checkApparatExists(self, app_name: str):
|
||||
@@ -1082,7 +1080,7 @@ class Database:
|
||||
result_a = tuple(result_a)
|
||||
result[result.index(orig_value)] = result_a
|
||||
self.close_connection(conn)
|
||||
self.logger.log_info(f"Query result: {result}")
|
||||
log.info(f"Query result: {result}")
|
||||
return result
|
||||
|
||||
if "deletable" in kwargs.keys():
|
||||
@@ -1418,7 +1416,7 @@ class Database:
|
||||
"SELECT fileblob FROM elsa_files WHERE filename=?", (filename,), one=True
|
||||
)[0]
|
||||
# print(blob)
|
||||
tempdir = database.tempdir
|
||||
tempdir = self.database.tempdir
|
||||
tempdir = tempdir.replace("~", str(Path.home()))
|
||||
tempdir_path = Path(tempdir)
|
||||
if not os.path.exists(tempdir_path):
|
||||
@@ -1430,7 +1428,7 @@ class Database:
|
||||
# print("file created")
|
||||
return file.name
|
||||
|
||||
def getElsaApparats(self):
|
||||
def getElsaApparats(self) -> ELSA:
|
||||
"""Get all the ELSA apparats in the database
|
||||
|
||||
Returns:
|
||||
@@ -1438,20 +1436,17 @@ class Database:
|
||||
"""
|
||||
return self.query_db("SELECT * FROM elsa")
|
||||
|
||||
def getElsaId(self, prof, semester, date):
|
||||
def getElsaId(self, prof_id, semester, date):
|
||||
"""get the id of an ELSA apparat based on the professor, semester and date
|
||||
|
||||
Args:
|
||||
prof (str): the name of the professor
|
||||
prof_id (int): the id of the professor
|
||||
semester (str): the semester
|
||||
date (str): the date of the apparat
|
||||
|
||||
Returns:
|
||||
int: the id of the ELSA apparat
|
||||
"""
|
||||
prof_id = self.getElsaProfId(prof)
|
||||
if prof_id is None:
|
||||
return None
|
||||
|
||||
data = self.query_db(
|
||||
"SELECT id FROM elsa WHERE prof_id=? AND semester=? AND date=?",
|
||||
@@ -1491,7 +1486,7 @@ class Database:
|
||||
title = profdata.title #profdata["title"]
|
||||
|
||||
query = f"INSERT INTO prof (fname, lname, fullname, mail, telnr,titel) VALUES ('{fname}','{lname}','{fullname}','{mail}','{telnr}','{title}')"
|
||||
self.logger.log_info(query)
|
||||
log.info(query)
|
||||
cursor.execute(query)
|
||||
|
||||
conn.commit()
|
||||
@@ -1534,7 +1529,7 @@ class Database:
|
||||
else:
|
||||
fullname = profdata["profname"]
|
||||
query = f"SELECT id FROM prof WHERE fullname = '{fullname}'"
|
||||
self.logger.log_info(query)
|
||||
log.info(query)
|
||||
|
||||
cursor.execute(query)
|
||||
result = cursor.fetchone()
|
||||
@@ -1551,7 +1546,7 @@ class Database:
|
||||
conn = self.connect()
|
||||
cursor = conn.cursor()
|
||||
query = f"SELECT * FROM prof WHERE fullname = '{fullname}'"
|
||||
self.logger.log_info(query)
|
||||
log.info(query)
|
||||
|
||||
result = cursor.execute(query).fetchone()
|
||||
if result:
|
||||
|
||||
@@ -74,16 +74,12 @@ CREATE_TABLE_SUBJECTS = """CREATE TABLE subjects (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
|
||||
name TEXT NOT NULL UNIQUE
|
||||
)"""
|
||||
CREATE_ELSA_PROF_TABLE = """CREATE TABLE elsa_prof (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
|
||||
fullname TEXT NOT NULL UNIQUE
|
||||
)"""
|
||||
|
||||
CREATE_ELSA_TABLE = """CREATE TABLE elsa (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
|
||||
date TEXT NOT NULL,
|
||||
semester TEXT NOT NULL,
|
||||
prof_id INTEGER NOT NULL,
|
||||
FOREIGN KEY (prof_id) REFERENCES elsa_prof (id)
|
||||
prof_id INTEGER NOT NULL
|
||||
)"""
|
||||
CREATE_ELSA_FILES_TABLE = """CREATE TABLE elsa_files (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
|
||||
|
||||
@@ -14,7 +14,7 @@ def generateSemesterByDate(next:bool = False):
|
||||
if month >= 4 and month <= 9:
|
||||
return "SoSe " + str(currentYear)
|
||||
else:
|
||||
if month == any([10, 11, 12]):
|
||||
if month in [10, 11, 12]:
|
||||
return f"WiSe {currentYear}/{currentYear+1}"
|
||||
else:
|
||||
return f"WiSe {currentYear-1}/{currentYear}"
|
||||
@@ -43,4 +43,4 @@ def generateSemesterByOffset(offset):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(generateSemesterByDate(next=True))
|
||||
print(generateSemesterByDate())
|
||||
Reference in New Issue
Block a user