Files
SemesterapparatsManager/src/backend/database.py

2009 lines
71 KiB
Python

import datetime
import json
import os
import re
import sqlite3 as sql
import tempfile
from dataclasses import asdict
from pathlib import Path
from string import ascii_lowercase as lower
from string import digits, punctuation
from typing import Any, List, Optional, Tuple, Union
from src import DATABASE_DIR, settings
from src.backend.db import (
CREATE_ELSA_FILES_TABLE,
CREATE_ELSA_MEDIA_TABLE,
CREATE_ELSA_TABLE,
CREATE_TABLE_APPARAT,
CREATE_TABLE_FILES,
CREATE_TABLE_MEDIA,
CREATE_TABLE_MESSAGES,
CREATE_TABLE_NEWEDITIONS,
CREATE_TABLE_PROF,
CREATE_TABLE_SUBJECTS,
CREATE_TABLE_USER,
)
from src.errors import AppPresentError, NoResultError
from src.logic import ELSA, Apparat, ApparatData, BookData, Prof
from src.logic.constants import SEMAP_MEDIA_ACCOUNTS
from src.logic.semester import Semester
from src.shared.logging import log
from src.utils.blob import create_blob
ascii_lowercase = lower + digits + punctuation
# get the line that called the function
class Database:
"""
Initialize the database and create the tables if they do not exist.
"""
def __init__(self, db_path: Union[Path, None] = None):
"""
Default constructor for the database class
Args:
db_path (str, optional): Optional Path for testing / specific purposes. Defaults to None.
"""
if db_path is None:
if settings.database.path is not None:
self.db_path = Path(
settings.database.path.expanduser(), settings.database.name
)
else:
self.db_path = None
# self.db_path = self.db_path.replace("~", str(Path.home()))
else:
self.db_path = db_path
log.debug(f"Database path: {self.db_path}")
self.db_initialized = False
self.startup_check()
def startup_check(self):
# check existence of all tables. if any is missing, recreate the table
if not self.db_initialized:
self.initializeDatabase()
tables = self.get_db_contents()
tables = [t[1] for t in tables] if tables is not None else []
required_tables = [
"semesterapparat",
"messages",
"media",
"files",
"prof",
"user",
"subjects",
"elsa",
"elsa_files",
"elsa_media",
"neweditions",
]
for table in required_tables:
if table not in tables:
log.critical(f"Table {table} is missing, recreating...")
self.create_table(table)
def create_table(self, table_name: str):
match table_name:
case "semesterapparat":
query = CREATE_TABLE_APPARAT
case "messages":
query = CREATE_TABLE_MESSAGES
case "media":
query = CREATE_TABLE_MEDIA
case "files":
query = CREATE_TABLE_FILES
case "prof":
query = CREATE_TABLE_PROF
case "user":
query = CREATE_TABLE_USER
case "subjects":
query = CREATE_TABLE_SUBJECTS
case "elsa":
query = CREATE_ELSA_TABLE
case "elsa_files":
query = CREATE_ELSA_FILES_TABLE
case "elsa_media":
query = CREATE_ELSA_MEDIA_TABLE
case "neweditions":
query = CREATE_TABLE_NEWEDITIONS
case _:
log.error(f"Table {table_name} is not a valid table name")
self.query_db(query)
def initializeDatabase(self):
if not self.db_initialized:
self.checkDatabaseStatus()
self.db_initialized = True
# run migrations after initial creation to bring schema up-to-date
try:
if self.db_path is not None:
self.run_migrations()
except Exception as e:
log.error(f"Error while running migrations: {e}")
# --- Migration helpers integrated into Database ---
def _ensure_migrations_table(self, conn: sql.Connection) -> None:
cursor = conn.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS schema_migrations (
id TEXT PRIMARY KEY,
applied_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.commit()
def _applied_migrations(self, conn: sql.Connection) -> List[str]:
cursor = conn.cursor()
cursor.execute("SELECT id FROM schema_migrations ORDER BY id")
rows = cursor.fetchall()
return [r[0] for r in rows]
def _apply_sql_file(self, conn: sql.Connection, path: Path) -> None:
log.info(f"Applying migration {path.name}")
sql_text = path.read_text(encoding="utf-8")
cursor = conn.cursor()
cursor.executescript(sql_text)
cursor.execute(
"INSERT OR REPLACE INTO schema_migrations (id) VALUES (?)", (path.name,)
)
conn.commit()
def run_migrations(self) -> None:
"""Apply unapplied .sql migrations from src/backend/migrations using this Database's connection."""
migrations_dir = Path(__file__).parent / "migrations"
if not migrations_dir.exists():
log.debug("Migrations directory does not exist, skipping migrations")
return
conn = self.connect()
try:
self._ensure_migrations_table(conn)
applied = set(self._applied_migrations(conn))
migration_files = sorted(
[p for p in migrations_dir.iterdir() if p.suffix == ".sql"]
)
for m in migration_files:
if m.name in applied:
log.debug(f"Skipping already applied migration {m.name}")
continue
self._apply_sql_file(conn, m)
finally:
conn.close()
# --- end migration helpers ---
def overwritePath(self, new_db_path: str):
log.debug("got new path, overwriting")
self.db_path = Path(new_db_path)
def checkDatabaseStatus(self):
path = settings.database.path
if path is None:
path = Path(DATABASE_DIR)
# path = path.replace("~", str(Path.home()))
# path = os.path.abspath(path)
if not os.path.exists(path):
# create path
# log.debug(path)
os.makedirs(path)
if self.get_db_contents() == []:
log.critical("Database does not exist, creating tables")
log.critical(f"Path: {path}")
self.create_tables()
self.insertSubjects()
def getElsaMediaID(self, work_author: str, signature: str, pages: str):
query = (
"SELECT id FROM elsa_media WHERE work_author=? AND signature=? AND pages=?"
)
params = (work_author, signature, pages)
result = self.query_db(query, params, one=True)
if result is None:
return NoResultError(
f"work_author: {work_author}, signature: {signature}, pages: {pages}"
).__str__()
return result[0]
def getElsaMediaType(self, id):
query = "SELECT type FROM elsa_media WHERE id=?"
return self.query_db(query, (id,), one=True)[0]
def get_db_contents(self) -> Union[List[Tuple[Any]], None]:
"""
Get the contents of the
Returns:
Union[List[Tuple], None]: _description_
"""
try:
with sql.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM sqlite_master WHERE type='table'")
return cursor.fetchall()
except sql.OperationalError:
return None
def connect(self) -> sql.Connection:
"""
Connect to the database
Returns:
sql.Connection: The active connection to the database
"""
conn = sql.connect(self.db_path)
# Fast pragmas suitable for a desktop app DB
conn.execute("PRAGMA journal_mode=WAL;")
conn.execute("PRAGMA synchronous=NORMAL;")
conn.execute("PRAGMA temp_store=MEMORY;")
conn.execute("PRAGMA mmap_size=134217728;") # 128MB
return conn
def close_connection(self, conn: sql.Connection):
"""
closes the connection to the database
Args:
----
- conn (sql.Connection): the connection to be closed
"""
conn.close()
def create_tables(self):
"""
Create the tables in the database
"""
# Bootstrapping of tables is handled via migrations. Run migrations instead
# of executing the hard-coded DDL here. Migrations are idempotent and
# contain the CREATE TABLE IF NOT EXISTS statements.
self.run_migrations()
def insertInto(self, query: str, params: Tuple) -> None:
"""
Insert sent data into the database
Args:
query (str): The query to be executed
params (Tuple): the parameters to be inserted into the database
"""
conn = self.connect()
cursor = conn.cursor()
log.debug(f"Inserting into DB: {query}")
cursor.execute(query, params)
conn.commit()
self.close_connection(conn)
def getWebADISAuth(self) -> Tuple[str, str]:
"""
Get the WebADIS authentication data from the database
Returns:
Tuple[str, str]: The username and password for WebADIS
"""
result = self.query_db(
"SELECT username, password FROM webadis_login WHERE effective_range='SAP'",
one=True,
)
if result is None:
return ("", "")
return (result[0], result[1])
@log.catch
def query_db(
self,
query: str,
args: Tuple[Any] = (), # type:ignore
one: bool = False, # type:ignore
) -> Union[Tuple[Any, Any], List[Tuple[Any, Any]]]:
"""
Query the Database for the sent query.
Args:
query (str): The query to be executed
args (Tuple, optional): The arguments for the query. Defaults to ().
one (bool, optional): Return the first result only. Defaults to False.
Returns:
Union[Tuple | List[Tuple]]: Returns the result of the query
"""
conn = self.connect()
cursor = conn.cursor()
logs_query = query
logs_args = args
# if "fileblob" in query:
# # set fileblob arg in logger to "too long"
# logs_query = query
# fileblob_location = query.find("fileblob")
# # remove fileblob from query
# logs_query = query[:fileblob_location] + "fileblob = too long"
log_message = f"Querying database with query {logs_query}, args: {logs_args}"
# if "INSERT" in query:
# log_message = f"Querying database with query {query}"
if "INTO user" in query:
log_message = f"Querying database with query {query}"
# log.debug(f"DB Query: {log_message}")
log.debug(log_message)
try:
cursor.execute(query, args)
rv = cursor.fetchall()
conn.commit()
self.close_connection(conn)
except sql.OperationalError as e:
log.error(f"Error in query: {e}")
return None
return (rv[0] if rv else None) if one else rv
# Books
def addBookToDatabase(
self, bookdata: BookData, app_id: Union[str, int], prof_id: Union[str, int]
):
"""
Add books to the database. Both app_id and prof_id are required to add the book to the database, as the app_id and prof_id are used to select the books later on.
Args:
bookdata (BookData): The metadata of the book to be added
app_id (str): The apparat id where the book should be added to
prof_id (str): The id of the professor where the book should be added to.
"""
log.info(f"Adding book {bookdata.signature} to database")
if app_id is None or prof_id is None:
raise ValueError("Apparate ID or Prof ID is None")
conn = self.connect()
cursor = conn.cursor()
t_query = (
f"SELECT bookdata FROM media WHERE app_id={app_id} AND prof_id={prof_id}"
)
log.debug(t_query)
# # log.debug(t_query)
result = cursor.execute(t_query).fetchall()
result = [BookData().from_string(i[0]) for i in result]
if bookdata in result:
# log.debug("Bookdata already in database")
# check if the book was deleted in the apparat
query = (
"SELECT deleted FROM media WHERE app_id=? AND prof_id=? AND bookdata=?"
)
params = (app_id, prof_id, json.dumps(asdict(bookdata), ensure_ascii=False))
result = cursor.execute(query, params).fetchone()
if result[0] == 1:
# log.debug("Book was deleted, updating bookdata")
query = "UPDATE media SET deleted=0 WHERE app_id=? AND prof_id=? AND bookdata=?"
params = (
app_id,
prof_id,
json.dumps(asdict(bookdata), ensure_ascii=False),
)
cursor.execute(query, params)
conn.commit()
return
query = (
"INSERT INTO media (bookdata, app_id, prof_id,deleted) VALUES (?, ?, ?,?)"
)
converted = json.dumps(asdict(bookdata), ensure_ascii=False)
params = (converted, app_id, prof_id, 0)
cursor.execute(query, params)
logMessage = f"Added book with signature {bookdata.signature} to database, data: {converted}"
log.info(logMessage)
conn.commit()
self.close_connection(conn)
def getBookIdBasedOnSignature(
self, app_id: Union[str, int], prof_id: Union[str, int], signature: str
) -> int:
"""
Get a book id based on the signature of the book.
Args:
app_id (str): The apparat id the book should be associated with
prof_id (str): The professor id the book should be associated with
signature (str): The signature of the book
Returns:
int: The id of the book
"""
result = self.query_db(
"SELECT bookdata, id FROM media WHERE app_id=? AND prof_id=?",
(app_id, prof_id),
)
books = [(BookData().from_string(i[0]), i[1]) for i in result]
book = [i for i in books if i[0].signature == signature][0][1]
return book
def getBookBasedOnSignature(
self, app_id: Union[str, int], prof_id: Union[str, int], signature: str
) -> BookData:
"""
Get the book based on the signature of the book.
Args:
app_id (str): The apparat id the book should be associated with
prof_id (str): The professor id the book should be associated with
signature (str): The signature of the book
Returns:
BookData: The total metadata of the book wrapped in a BookData object
"""
result = self.query_db(
"SELECT bookdata FROM media WHERE app_id=? AND prof_id=?", (app_id, prof_id)
)
books: list[BookData] = [BookData().from_string(i[0]) for i in result]
book = [i for i in books if i.signature == signature][0]
return book
def getLastBookId(self) -> int:
"""
Get the last book id in the database
Returns:
int: ID of the last book in the database
"""
return self.query_db("SELECT id FROM media ORDER BY id DESC", one=True)[0]
def searchBook(
self, data: dict[str, str]
) -> Optional[list[tuple["BookData", int, int]]]:
"""
Search a book in the database using regex against signature/title.
Args:
data: may contain:
- "signature": regex to match against BookData.signature
- "title": regex to match against BookData.title
Returns:
list of (BookData, app_id, prof_id) tuples, or None if invalid args
"""
# Determine mode (kept compatible with your original logic)
mode = 0
if len(data) == 1 and "signature" in data:
mode = 1
elif len(data) == 1 and "title" in data:
mode = 2
elif len(data) == 2 and "signature" in data and "title" in data:
mode = 3
else:
return None
def _compile(expr: str) -> re.Pattern:
try:
return re.compile(expr, re.IGNORECASE | re.UNICODE)
except re.error:
# If user provided a broken regex, treat it as a literal
return re.compile(re.escape(expr), re.IGNORECASE | re.UNICODE)
sig_re = _compile(data["signature"]) if mode in (1, 3) else None
title_re = _compile(data["title"]) if mode in (2, 3) else None
# Fetch candidates once
rows = self.query_db("SELECT * FROM media WHERE deleted=0")
results: list[tuple["BookData", int, int]] = []
for row in rows:
bookdata = BookData().from_string(
row[1]
) # assumes row[1] is the serialized bookdata
app_id = row[2]
prof_id = row[3]
sig_val = bookdata.signature
title_val = bookdata.title
if mode == 1:
if sig_re.search(sig_val):
results.append((bookdata, app_id, prof_id))
elif mode == 2:
if title_re.search(title_val):
results.append((bookdata, app_id, prof_id))
else: # mode == 3
if sig_re.search(sig_val) and title_re.search(title_val):
results.append((bookdata, app_id, prof_id))
return results
def setAvailability(self, book_id: str, available: str):
"""
Set the availability of a book in the database
Args:
book_id (str): The id of the book
available (str): The availability of the book
"""
self.query_db("UPDATE media SET available=? WHERE id=?", (available, book_id))
def getBookId(
self, bookdata: BookData, app_id: Union[str, int], prof_id: Union[str, int]
) -> int:
"""
Get the id of a book based on the metadata of the book
Args:
bookdata (BookData): The wrapped metadata of the book
app_id (str): The apparat id the book should be associated with
prof_id (str): The professor id the book should be associated with
Returns:
int: ID of the book
"""
result = self.query_db(
"SELECT id FROM media WHERE bookdata=? AND app_id=? AND prof_id=?",
(bookdata.to_dict, app_id, prof_id),
one=True,
)
return result[0]
def getBook(self, book_id: int) -> BookData:
"""
Get the book based on the id in the database
Args:
book_id (int): The id of the book
Returns:
BookData: The metadata of the book wrapped in a BookData object
"""
return BookData().from_string(
self.query_db(
"SELECT bookdata FROM media WHERE id=?", (book_id,), one=True
)[0]
)
def getBooks(
self, app_id: Union[str, int], prof_id: Union[str, int], deleted: int = 0
) -> list[dict[str, Union[BookData, int]]]:
"""
Get the Books based on the apparat id and the professor id
Args:
app_id (str): The ID of the apparat
prof_id (str): The ID of the professor
deleted (int, optional): The state of the book. Set to 1 to include deleted ones. Defaults to 0.
Returns:
list[dict[int, BookData, int]]: A list of dictionaries containing the id, the metadata of the book and the availability of the book
"""
qdata = self.query_db(
f"SELECT id,bookdata,available FROM media WHERE (app_id={app_id} AND prof_id={prof_id}) AND (deleted={deleted if deleted == 0 else '1 OR deleted=0'})"
)
ret_result = []
if qdata is None:
return []
for result_a in qdata:
data: dict[str, Any] = {"id": int, "bookdata": BookData, "available": int}
data["id"] = result_a[0]
data["bookdata"] = BookData().from_string(result_a[1])
data["available"] = result_a[2]
ret_result.append(data)
return ret_result
def getAllBooks(self) -> list[dict[str, Union[int, BookData]]]:
"""
Get all books in the database that are not set as deleted
Returns
-------
list[dict[str, Union[int, BookData]]]
A list of dictionaries containing the id and the metadata of the book
"""
# return all books in the database
qdata = self.query_db("SELECT id,bookdata FROM media WHERE deleted=0")
ret_result: list[dict[str, Any]] = []
if qdata is None:
return []
for result_a in qdata:
data: dict[str, Any] = {"id": int, "bookdata": BookData}
data["id"] = result_a[0]
data["bookdata"] = BookData().from_string(result_a[1])
ret_result.append(data)
return ret_result
def getApparatNrByBookId(self, book_id):
appNr = self.query_db(
"SELECT appnr FROM semesterapparat WHERE id IN (SELECT app_id FROM media WHERE id=?)",
(book_id,),
one=True,
)
return appNr[0] if appNr else None
def getBooksByProfId(
self, prof_id: int, deleted: int = 0
) -> list[dict[str, Union[int, BookData]]]:
"""
Get the Books based on the professor id
Parameters
----------
prof_id : int
The ID of the professor
deleted : int, optional
If set to 1, it will include deleted books, by default 0
Returns
-------
list[dict[str, Union[int, BookData]]]
A list of dictionaries containing the id, the metadata of the book and the availability of the book
"""
qdata = self.query_db(
f"SELECT id,bookdata,available FROM media WHERE prof_id={prof_id} AND (deleted={deleted if deleted == 0 else '1 OR deleted=0'})"
)
ret_result = []
if qdata is None:
return []
for result_a in qdata:
data: dict[str, Any] = {"id": int, "bookdata": BookData, "available": int}
data["id"] = result_a[0]
data["bookdata"] = BookData().from_string(result_a[1])
data["available"] = result_a[2]
ret_result.append(data)
return ret_result
def updateBookdata(self, book_id: int, bookdata: BookData):
"""
Update the bookdata in the database
Args:
book_id (str): The id of the book
bookdata (BookData): The new metadata of the book
"""
query = "UPDATE media SET bookdata= ? WHERE id=?"
book = bookdata.to_dict
self.query_db(query, (book, book_id))
def deleteBook(self, book_id: int):
"""
Delete a book from the database
Args:
book_id (str): ID of the book
"""
self.query_db("UPDATE media SET deleted=1 WHERE id=?", (book_id,))
def deleteBooks(self, ids: list[int]):
"""
Delete multiple books from the database
Args:
ids (list[int]): A list of book ids to be deleted
"""
query = f"UPDATE media SET deleted=1 WHERE id IN ({','.join(['?'] * len(ids))})"
self.query_db(query, tuple(ids))
# File Interactions
def getBlob(self, filename: str, app_id: Union[str, int]) -> bytes:
"""
Get a blob from the database
Args:
filename (str): The name of the file
app_id (str): ID of the apparat
Returns:
bytes: The file stored in
"""
return self.query_db(
"SELECT fileblob FROM files WHERE filename=? AND app_id=?",
(filename, app_id),
one=True,
)[0]
def insertFile(
self, file: list[dict], app_id: Union[str, int], prof_id: Union[str, int]
):
"""Instert a list of files into the database
Args:
file (list[dict]): a list containing all the files to be inserted
Structured: [{"name": "filename", "path": "path", "type": "filetype"}]
app_id (int): the id of the apparat
prof_id (str): the id of the professor
"""
for f in file:
filename = f["name"]
path = f["path"]
filetyp = f["type"]
if path == "Database":
continue
blob = create_blob(path)
query = "INSERT OR IGNORE INTO files (filename, fileblob, app_id, filetyp,prof_id) VALUES (?, ?, ?, ?,?)"
self.query_db(query, (filename, blob, app_id, filetyp, prof_id))
def recreateFile(
self, filename: str, app_id: Union[str, int], filetype: str
) -> str:
"""Recreate a file from the database
Args:
filename (str): the name of the file
app_id (Union[str,int]): the id of the apparat
filetype (str): the extension of the file to be created
Returns:
str: The filename of the recreated file
"""
blob = self.getBlob(filename, app_id)
log.debug(blob)
tempdir = settings.database.temp.expanduser()
if not tempdir.exists():
tempdir.mkdir(parents=True, exist_ok=True)
file = tempfile.NamedTemporaryFile(
delete=False, dir=tempdir, mode="wb", suffix=f".{filetype}"
)
file.write(blob)
# log.debug("file created")
return file.name
def getFiles(self, app_id: Union[str, int], prof_id: int) -> list[tuple]:
"""Get all the files associated with the apparat and the professor
Args:
app_id (Union[str,int]): The id of the apparat
prof_id (Union[str,int]): the id of the professor
Returns:
list[tuple]: a list of tuples containing the filename and the filetype for the corresponding apparat and professor
"""
return self.query_db(
"SELECT filename, filetyp FROM files WHERE app_id=? AND prof_id=?",
(app_id, prof_id),
)
def getSemesters(self) -> list[str]:
"""Return all the unique semesters in the database
Returns:
list: a list of strings containing the semesters
"""
data = self.query_db("SELECT DISTINCT erstellsemester FROM semesterapparat")
return [i[0] for i in data]
def insertSubjects(self):
# log.debug("Inserting subjects")
subjects = [
"Biologie",
"Chemie",
"Deutsch",
"Englisch",
"Erziehungswissenschaft",
"Französisch",
"Geographie",
"Geschichte",
"Gesundheitspädagogik",
"Haushalt / Textil",
"Kunst",
"Mathematik / Informatik",
"Medien in der Bildung",
"Musik",
"Philosophie",
"Physik",
"Politikwissenschaft",
"Prorektorat Lehre und Studium",
"Psychologie",
"Soziologie",
"Sport",
"Technik",
"Theologie",
"Wirtschaftslehre",
]
conn = self.connect()
cursor = conn.cursor()
for subject in subjects:
cursor.execute("INSERT INTO subjects (name) VALUES (?)", (subject,))
conn.commit()
self.close_connection(conn)
def getSubjects(self):
"""Get all the subjects in the database
Returns:
list[tuple]: a list of tuples containing the subjects
"""
return self.query_db("SELECT * FROM subjects")
# Messages
def addMessage(
self, messages: list[dict[str, Any]], user: str, app_id: Union[str, int]
):
"""add a Message to the database
Args:
messages (list[dict[str, Any]]): the messages to be added
user (str): the user who added the messages
app_id (Union[str,int]): the id of the apparat
"""
def __getUserId(user: str):
return self.query_db(
"SELECT id FROM user WHERE username=?", (user,), one=True
)[0]
user_id = __getUserId(user)
for message in messages:
self.query_db(
"INSERT INTO messages (message, user_id, remind_at,appnr) VALUES (?,?,?,?)",
(message["message"], user_id, message["remind_at"], app_id),
)
def getAllMessages(self) -> list[dict[str, str, str, str]]:
"""Get all the messages in the database
Returns:
list[dict[str, str, str, str]]: a list of dictionaries containing the message, the user who added the message, the apparat id and the id of the message
"""
def __get_user_name(user_id: int):
return self.query_db(
"SELECT username FROM user WHERE id=?", (user_id,), one=True
)[0]
messages = self.query_db("SELECT * FROM messages")
ret = [
{
"message": i[2],
"user": __get_user_name(i[4]),
"appnr": i[5],
"id": i[0],
"remind_at": i[3],
}
for i in messages
]
return ret
def getMessages(self, date: str) -> list[dict[str, str]]:
"""Get all the messages for a specific date
Args:
date (str): a date.datetime object formatted as a string in the format "YYYY-MM-DD"
Returns:
list[dict[str, str]]: a list of dictionaries containing the message, the user who added the message, the apparat id and the id of the message
"""
def __get_user_name(user_id: int):
return self.query_db(
"SELECT username FROM user WHERE id=?", (user_id,), one=True
)[0]
messages = self.query_db("SELECT * FROM messages WHERE remind_at=?", (date,))
ret = [
{"message": i[2], "user": __get_user_name(i[4]), "appnr": i[5], "id": i[0]}
for i in messages
]
return ret
def deleteMessage(self, message_id: int):
"""Delete a message from the database
Args:
message_id (str): the id of the message
"""
log.debug(f"Deleting message with id {message_id}")
self.query_db("DELETE FROM messages WHERE id=?", (message_id,))
# Prof data
def getProfNameById(self, prof_id: Union[str, int], add_title: bool = False) -> str:
"""Get a professor name based on the id
Args:
prof_id (Union[str,int]): The id of the professor
add_title (bool, optional): wether to add the title or no. Defaults to False.
Returns:
str: The name of the professor
"""
prof = self.query_db(
"SELECT fullname FROM prof WHERE id=?", (prof_id,), one=True
)
if add_title:
return f"{self.getTitleById(prof_id)}{prof[0]}"
else:
return prof[0]
def getProfMailById(self, prof_id: Union[str, int]) -> str:
"""get the mail of a professor based on the id
Args:
prof_id (Union[str,int]): the id of the professor
Returns:
str: the mail of the professor
"""
mail = self.query_db("SELECT mail FROM prof WHERE id=?", (prof_id,), one=True)[
0
]
return mail if mail is not None else ""
def getTitleById(self, prof_id: Union[str, int]) -> str:
"""get the title of a professor based on the id
Args:
prof_id (Union[str,int]): the id of the professor
Returns:
str: the title of the professor, with an added whitespace at the end, if no title is present, an empty string is returned
"""
title = self.query_db(
"SELECT titel FROM prof WHERE id=?", (prof_id,), one=True
)[0]
return f"{title} " if title is not None else ""
def getSpecificProfData(
self, prof_id: Union[str, int], fields: List[str]
) -> tuple[Any, ...]:
"""A customisable function to get specific data of a professor based on the id
Args:
prof_id (Union[str,int]): the id of the professor
fields (List[str]): a list of fields to be returned
Returns:
tuple: a tuple containing the requested data
"""
query = "SELECT "
for field in fields:
query += f"{field},"
query = query[:-1]
query += " FROM prof WHERE id=?"
return self.query_db(query, (prof_id,), one=True)[0]
def getProfById(self, prof_id: Union[str, int]) -> Prof:
"""Get a professor based on the id
Args:
prof_id (Union[str,int]): the id of the professor
Returns:
Prof: a Prof object containing the data of the professor
"""
data = self.query_db("SELECT * FROM prof WHERE id=?", (prof_id,), one=True)
return Prof().from_tuple(data)
def getProfData(self, profname: str):
"""Get mail, telephone number and title of a professor based on the name
Args:
profname (str): name of the professor
Returns:
tuple: the mail, telephone number and title of the professor
"""
data = self.query_db(
"SELECT * FROM prof WHERE fullname=?",
(profname.replace(",", ""),),
one=True,
)
person = Prof()
return person.from_tuple(data)
def getProf(self, id) -> Prof:
"""Get a professor based on the id
Args:
id ([type]): the id of the professor
Returns:
Prof: a Prof object containing the data of the professor
"""
data = self.query_db("SELECT * FROM prof WHERE id=?", (id,), one=True)
return Prof().from_tuple(data)
def getProfs(self) -> list[Prof]:
"""Return all the professors in the database
Returns:
list[tuple]: a list containing all the professors in individual tuples
tuple: (id, titel, fname, lname, fullname, mail, telnr)
"""
profs = self.query_db("SELECT * FROM prof")
return [Prof().from_tuple(prof) for prof in profs]
# Apparat
def getAllAparats(self, deleted: int = 0) -> list[Apparat]:
"""Get all the apparats in the database
Args:
deleted (int, optional): Switch the result to use . Defaults to 0.
Returns:
list[tuple]: a list of tuples containing the apparats
"""
apparats = self.query_db(
"SELECT * FROM semesterapparat WHERE deletion_status=?", (deleted,)
)
ret: list[Apparat] = []
for apparat in apparats:
ret.append(Apparat().from_tuple(apparat))
return ret
def getApparatData(self, appnr, appname) -> ApparatData:
"""Get the Apparat data based on the apparat number and the name
Args:
appnr (str): the apparat number
appname (str): the name of the apparat
Raises:
NoResultError: an error is raised if no result is found
Returns:
ApparatData: the appended data of the apparat wrapped in an ApparatData object
"""
result = self.query_db(
"SELECT * FROM semesterapparat WHERE appnr=? AND name=?",
(appnr, appname),
one=True,
)
if result is None:
raise NoResultError("No result found")
apparat = ApparatData()
apparat.apparat.id = result[0]
apparat.apparat.name = result[1]
apparat.apparat.appnr = result[4]
apparat.apparat.eternal = True if result[7] == 1 else False
apparat.prof = self.getProfData(self.getProfNameById(result[2]))
apparat.prof.fullname = self.getProfNameById(result[2])
apparat.apparat.prof_id = result[2]
apparat.apparat.subject = result[3]
apparat.apparat.created_semester = result[5]
apparat.apparat.extend_until = result[8]
apparat.apparat.deleted = result[9]
apparat.apparat.apparat_id_adis = result[11]
apparat.apparat.prof_id_adis = result[12]
apparat.apparat.konto = result[13]
return apparat
def getUnavailableApparatNumbers(self) -> List[int]:
"""Get a list of all the apparat numbers in the database that are currently in use
Returns:
List[int]: the list of used apparat numbers
"""
numbers = self.query_db(
"SELECT appnr FROM semesterapparat WHERE deletion_status=0"
)
numbers = [i[0] for i in numbers]
numbers.sort()
log.info(f"Currently used apparat numbers: {numbers}")
return numbers
def setNewSemesterDate(self, app_id: Union[str, int], newDate, dauerapp=False):
"""Set the new semester date for an apparat
Args:
app_id (Union[str,int]): the id of the apparat
newDate (str): the new date
dauerapp (bool, optional): if the apparat was changed to dauerapparat. Defaults to False.
"""
# today as yyyy-mm-dd
today = datetime.datetime.now().strftime("%Y-%m-%d")
if dauerapp:
self.query_db(
"UPDATE semesterapparat SET verlängerung_bis=?, dauer=?, verlängert_am=? WHERE appnr=?",
(newDate, dauerapp, today, app_id),
)
else:
self.query_db(
"UPDATE semesterapparat SET verlängerung_bis=?, verlängert_am=? WHERE appnr=?",
(newDate, today, app_id),
)
def getId(self, apparat_name) -> Optional[int]:
"""get the id of an apparat based on the name
Args:
apparat_name (str): the name of the apparat e.g. "Semesterapparat 1"
Returns:
Optional[int]: the id of the apparat, if the apparat is not found, None is returned
"""
data = self.query_db(
"SELECT id FROM semesterapparat WHERE name=?", (apparat_name,), one=True
)
if data is None:
return None
else:
return data[0]
def getApparatId(self, apparat_name) -> Optional[int]:
"""get the id of an apparat based on the name
Args:
apparat_name (str): the name of the apparat e.g. "Semesterapparat 1"
Returns:
Optional[int]: the id of the apparat, if the apparat is not found, None is returned
"""
data = self.query_db(
"SELECT appnr FROM semesterapparat WHERE name=?", (apparat_name,), one=True
)
if data is None:
return None
else:
return data[0]
def createApparat(self, apparat: ApparatData) -> int:
"""create the apparat in the database
Args:
apparat (ApparatData): the wrapped metadata of the apparat
Raises:
AppPresentError: an error describing that the apparats chosen id is already present in the database
Returns:
Optional[int]: the id of the apparat
"""
log.debug(apparat)
app = apparat.apparat
prof = apparat.prof
present_prof = self.getProfByName(prof.name())
prof_id = present_prof.id
log.debug(present_prof)
app_id = self.getApparatId(app.name)
if app_id:
return AppPresentError(app_id)
if not prof_id:
log.debug("prof id not present, creating prof with data", prof)
prof_id = self.createProf(prof)
log.debug(prof_id)
query = f"INSERT OR IGNORE INTO semesterapparat (appnr, name, erstellsemester, dauer, prof_id, fach,deletion_status,konto) VALUES ('{app.appnr}', '{app.name}', '{app.created_semester}', '{app.eternal}', {prof_id}, '{app.subject}', '{0}', '{SEMAP_MEDIA_ACCOUNTS[app.appnr]}')"
log.debug(query)
self.query_db(query)
return None
def getApparatsByProf(self, prof_id: Union[str, int]) -> list[Apparat]:
"""Get all apparats based on the professor id
Args:
prof_id (Union[str,int]): the id of the professor
Returns:
list[tuple]: a list of tuples containing the apparats
"""
data = self.query_db(
"SELECT * FROM semesterapparat WHERE prof_id=?", (prof_id,)
)
ret = []
for i in data:
log.debug(i)
ret.append(Apparat().from_tuple(i))
return ret
def getApparatsBySemester(self, semester: str) -> dict[list]:
"""get all apparats based on the semester
Args:
semester (str): the selected semester
Returns:
dict[list]: a list off all created and deleted apparats for the selected semester
"""
data = self.query_db(
"SELECT name, prof_id FROM semesterapparat WHERE erstellsemester=?",
(semester,),
)
conn = self.connect()
cursor = conn.cursor()
c_tmp = []
for i in data:
c_tmp.append((i[0], self.getProfNameById(i[1])))
query = (
f"SELECT name,prof_id FROM semesterapparat WHERE deleted_date='{semester}'"
)
result = cursor.execute(query).fetchall()
d_tmp = []
for i in result:
d_tmp.append((i[0], self.getProfNameById(i[1])))
# group the apparats by prof
c_ret = {}
for i in c_tmp:
if i[1] not in c_ret.keys():
c_ret[i[1]] = [i[0]]
else:
c_ret[i[1]].append(i[0])
d_ret = {}
for i in d_tmp:
if i[1] not in d_ret.keys():
d_ret[i[1]] = [i[0]]
else:
d_ret[i[1]].append(i[0])
self.close_connection(conn)
return {"created": c_ret, "deleted": d_ret}
def getApparatCountBySemester(self) -> tuple[list[str], list[int]]:
"""get a list of all apparats created and deleted by semester
Returns:
tuple[list[str],list[int]]: a tuple containing two lists, the first list contains the semesters, the second list contains the amount of apparats created and deleted for the corresponding semester
"""
conn = self.connect()
cursor = conn.cursor()
semesters = self.getSemesters()
created = []
deleted = []
for semester in semesters:
query = f"SELECT COUNT(*) FROM semesterapparat WHERE erstellsemester='{semester}'"
result = cursor.execute(query).fetchone()
created.append(result[0])
query = f"SELECT COUNT(*) FROM semesterapparat WHERE deletion_status=1 AND deleted_date='{semester}'"
result = cursor.execute(query).fetchone()
deleted.append(result[0])
# store data in a tuple
ret = []
for sem in semesters:
e_tuple = (
sem,
created[semesters.index(sem)],
deleted[semesters.index(sem)],
)
ret.append(e_tuple)
self.close_connection(conn)
return ret
def deleteApparat(self, apparat: Apparat, semester: str):
"""Delete an apparat from the database
Args:
apparat: (Apparat): the apparat to be deleted
semester (str): the semester the apparat should be deleted from
"""
apparat_nr = apparat.appnr
app_id = self.getId(apparat.name)
self.query_db(
"UPDATE semesterapparat SET deletion_status=1, deleted_date=? WHERE appnr=? AND name=?",
(semester, apparat_nr, apparat.name),
)
# delete all books associated with the app_id
# print(apparat_nr, app_id)
self.query_db("UPDATE media SET deleted=1 WHERE app_id=?", (app_id,))
def isEternal(self, id):
"""check if the apparat is eternal (dauerapparat)
Args:
id (int): the id of the apparat to be checked
Returns:
int: the state of the apparat
"""
return self.query_db(
"SELECT dauer FROM semesterapparat WHERE appnr=?", (id,), one=True
)
def getApparatName(self, app_id: Union[str, int], prof_id: Union[str, int]):
"""get the name of the apparat based on the id
Args:
app_id (Union[str,int]): the id of the apparat
prof_id (Union[str,int]): the id of the professor
Returns:
str: the name of the apparat
"""
result = self.query_db(
"SELECT name FROM semesterapparat WHERE appnr=? AND prof_id=?",
(app_id, prof_id),
one=True,
)
if result:
return result[0]
return None
def updateApparat(self, apparat_data: ApparatData):
"""Update an apparat in the database
Args:
apparat_data (ApparatData): the new metadata of the apparat
"""
query = "UPDATE semesterapparat SET name = ?, fach = ?, dauer = ?, prof_id = ?, prof_id_adis = ?, apparat_id_adis = ? WHERE appnr = ?"
params = (
apparat_data.apparat.name,
apparat_data.apparat.subject,
apparat_data.apparat.eternal,
self.getProfData(apparat_data.prof.fullname).id,
apparat_data.apparat.prof_id_adis,
apparat_data.apparat.apparat_id_adis,
apparat_data.apparat.appnr,
)
log.debug(f"Updating apparat with query {query} and params {params}")
self.query_db(query, params)
def checkApparatExists(self, app_name: str):
"""check if the apparat is already present in the database based on the name
Args:
apparat_name (str): the name of the apparat
Returns:
bool: True if the apparat is present, False if not
"""
return (
True
if self.query_db(
"SELECT appnr FROM semesterapparat WHERE name=?",
(app_name,),
one=True,
)
else False
)
def checkApparatExistsByNr(self, app_nr: Union[str, int]) -> bool:
"""a check to see if the apparat is already present in the database, based on the nr. This query will exclude deleted apparats
Args:
app_nr (Union[str, int]): the id of the apparat
Returns:
bool: True if the apparat is present, False if not
"""
return (
True
if self.query_db(
"SELECT id FROM semesterapparat WHERE appnr=? and deletion_status=0",
(app_nr,),
one=True,
)
else False
)
# Statistics
def statistic_request(self, **kwargs: Any):
"""Take n amount of kwargs and return the result of the query"""
def __query(query):
"""execute the query and return the result
Args:
query (str): the constructed query
Returns:
list: the result of the query
"""
log.debug(f"Query: {query}")
conn = self.connect()
cursor = conn.cursor()
result = cursor.execute(query).fetchall()
for result_a in result:
orig_value = result_a
prof_name = self.getProfNameById(result_a[2])
# replace the prof_id with the prof_name
result_a = list(result_a)
result_a[2] = prof_name
result_a = tuple(result_a)
result[result.index(orig_value)] = result_a
self.close_connection(conn)
log.debug(f"Query result: {result}")
return result
if "deletable" in kwargs.keys():
query = f"""SELECT * FROM semesterapparat
WHERE deletion_status=0 AND dauer=0 AND
(
(erstellsemester!='{kwargs["deletesemester"]}' AND verlängerung_bis IS NULL) OR
(erstellsemester!='{kwargs["deletesemester"]}' AND verlängerung_bis!='{kwargs["deletesemester"]}' AND verlängerung_bis!='{Semester().next}')
)"""
return __query(query)
if "dauer" in kwargs.keys():
kwargs["dauer"] = kwargs["dauer"].replace("Ja", "1").replace("Nein", "0")
query = "SELECT * FROM semesterapparat WHERE "
for key, value in kwargs.items() if kwargs.items() is not None else {}:
# log.debug(key, value)
query += f"{key}='{value}' AND "
# log.debug(query)
# remove deletesemester part from normal query, as this will be added to the database upon deleting the apparat
if "deletesemester" in kwargs.keys():
query = query.replace(
f"deletesemester='{kwargs['deletesemester']}' AND ", ""
)
if "endsemester" in kwargs.keys():
if "erstellsemester" in kwargs.keys():
query = query.replace(f"endsemester='{kwargs['endsemester']}' AND ", "")
query = query.replace(
f"erstellsemester='{kwargs['erstellsemester']} AND ", "xyz"
)
else:
query = query.replace(
f"endsemester='{kwargs['endsemester']}' AND ", "xyz"
)
# log.debug("replaced")
query = query.replace(
"xyz",
f"(erstellsemester='{kwargs['endsemester']}' OR verlängerung_bis='{kwargs['endsemester']}') AND ",
)
# remove all x="" parts from the query where x is a key in kwargs
log.info(f"Query before: {query}")
query = query.strip()
query = query[:-4]
log.info(f"Query after: {query}")
# check if query ends with lowercase letter or a '. if not, remove last symbol and try again
while query[-1] not in ascii_lowercase and query[-1] != "'":
query = query[:-1]
query = query.strip()
# log.debug(query)
res = __query(query)
# log.debug(res)
return res
# Admin data
def getUser(self):
"""Get a single user from the database"""
return self.query_db("SELECT * FROM user", one=True)
def getUsers(self) -> list[tuple]:
"""Return a list of tuples of all the users in the database"""
return self.query_db("SELECT * FROM user")
def login(self, user, hashed_password):
"""try to login the user.
The salt for the user will be requested from the database and then added to the hashed password. The password will then be compared to the password in the database
Args:
user (str): username that tries to login
hashed_password (str): the password the user tries to login with
Returns:
bool: True if the login was successful, False if not
"""
try:
salt = self.query_db(
"SELECT salt FROM user WHERE username=?", (user,), one=True
)[0]
if salt is None:
return False
except TypeError:
return False
hashed_password = salt + hashed_password
password = self.query_db(
"SELECT password FROM user WHERE username=?", (user,), one=True
)[0]
if password == hashed_password:
return True
else:
return False
def changePassword(self, user, new_password):
"""change the password of a user.
The password will be added with the salt and then committed to the database
Args:
user (str): username
new_password (str): the hashed password
"""
salt = self.query_db(
"SELECT salt FROM user WHERE username=?", (user,), one=True
)[0]
new_password = salt + new_password
self.query_db(
"UPDATE user SET password=? WHERE username=?", (new_password, user)
)
def getRole(self, user: str) -> str:
"""get the role of the user
Args:
user (str): username
Returns:
str: the name of the role
"""
return self.query_db(
"SELECT role FROM user WHERE username=?", (user,), one=True
)[0]
def getRoles(self) -> list[tuple]:
"""get all the roles in the database
Returns:
list[str]: a list of all the roles
"""
roles = self.query_db("SELECT role FROM user")
return [i[0] for i in roles]
def checkUsername(self, user) -> bool:
"""a check to see if the username is already present in the database
Args:
user (str): the username
Returns:
bool: True if the username is present, False if not
"""
data = self.query_db(
"SELECT username FROM user WHERE username=?", (user,), one=True
)
return True if data is not None else False
def createUser(self, user, password, role, salt):
"""create an user from the AdminCommands class.
Args:
user (str): the username of the user
password (str): a hashed password
role (str): the role of the user
salt (str): a salt for the password
"""
self.query_db(
"INSERT OR IGNORE INTO user (username, password, role, salt) VALUES (?,?,?,?)",
(user, password, role, salt),
)
# check if user was created
return (
self.query_db(
"SELECT username FROM user WHERE username=?", (user,), one=True
)
is not None
)
def deleteUser(self, user):
"""delete an unser
Args:
user (str): username of the user
"""
self.query_db("DELETE FROM user WHERE username=?", (user,))
def updateUser(self, username, data: dict[str, str]):
"""changge the data of a user
Args:
username (str): the username of the user
data (dict[str, str]): the data to be changed
"""
conn = self.connect()
cursor = conn.cursor()
query = "UPDATE user SET "
for key, value in data.items():
if key == "username":
continue
query += f"{key}='{value}',"
query = query[:-1]
query += " WHERE username=?"
params = (username,)
cursor.execute(query, params)
conn.commit()
self.close_connection(conn)
def getFacultyMember(self, name: str) -> tuple:
"""get a faculty member based on the name
Args:
name (str): the name to be searched for
Returns:
tuple: a tuple containing the data of the faculty member
"""
return self.query_db(
"SELECT titel, fname,lname,mail,telnr,fullname FROM prof WHERE fullname=?",
(name,),
one=True,
)
def updateFacultyMember(self, data: dict, oldlname: str, oldfname: str):
"""update the data of a faculty member
Args:
data (dict): a dictionary containing the data to be updated
oldlname (str): the old last name of the faculty member
oldfname (str): the old first name of the faculty member
"""
placeholders = ", ".join([f"{i}=:{i} " for i in data.keys()])
query = f"UPDATE prof SET {placeholders} WHERE lname = :oldlname AND fname = :oldfname"
data["oldlname"] = oldlname
data["oldfname"] = oldfname
self.query_db(query, data)
def getFacultyMembers(self):
"""get a list of all faculty members
Returns:
list[tuple]: a list of tuples containing the faculty members
"""
return self.query_db("SELECT titel, fname,lname,mail,telnr,fullname FROM prof")
def restoreApparat(self, app_id: Union[str, int], app_name: str):
"""restore an apparat from the database
Args:
app_id (Union[str, int]): the id of the apparat
"""
return self.query_db(
"UPDATE semesterapparat SET deletion_status=0, deleted_date=NULL WHERE appnr=? and name=?",
(app_id, app_name),
)
# ELSA
def createElsaApparat(self, date, prof_id, semester) -> int:
"""create a new apparat in the database for the ELSA system
Args:
date (str): the name of the apparat
prof_id (int): the id of the professor
semester (str): the semester the apparat is created in
Returns:
int: the id of the apparat
"""
self.query_db(
"INSERT OR IGNORE INTO elsa (date, prof_id, semester) VALUES (?,?,?)",
(date, prof_id, semester),
)
# get the id of the apparat
apparat_id = self.query_db(
"SELECT id FROM elsa WHERE date=? AND prof_id=? AND semester=?",
(date, prof_id, semester),
one=True,
)[0]
return apparat_id
def updateElsaApparat(self, date, prof_id, semester, elsa_id):
"""update an ELSA apparat in the database
Args:
date (str): the name of the apparat
prof_id (int): the id of the professor
semester (str): the semester the apparat is created in
elsa_id (int): the id of the ELSA apparat
"""
self.query_db(
"UPDATE elsa SET date=?, prof_id=?, semester=? WHERE id=?",
(date, prof_id, semester, elsa_id),
)
def addElsaMedia(self, data: dict, elsa_id: int):
"""add a media to the ELSA system
Args:
data (dict): a dictionary containing the data of the media,
elsa_id (int): the id of the ELSA apparat
"""
headers = []
entries = []
for key, value in data.items():
headers.append(key)
entries.append(value)
headers.append("elsa_id")
entries.append(elsa_id)
query = f"INSERT INTO elsa_media ({', '.join(headers)}) VALUES ({', '.join(['?' for i in range(len(headers))])})"
self.query_db(query, entries)
def getElsaMedia(self, elsa_id: int):
"""get all the media of an ELSA apparat
Args:
elsa_id (int): the id of the ELSA apparat
Returns:
list[tuple]: a list of tuples containing the media
"""
media = self.query_db("SELECT * FROM elsa_media WHERE elsa_id=?", (elsa_id,))
# convert the media to a list of dictionaries
ret = []
table_fields = self.query_db("PRAGMA table_info(elsa_media)")
for m in media:
tmp = {}
for i in range(len(m)):
tmp[table_fields[i][1]] = m[i]
ret.append(tmp)
return ret
def insertElsaFile(self, file: list[dict], elsa_id: int):
"""Instert a list of files into the ELSA system
Args:
file (list[dict]): a list containing all the files to be inserted
Structured: [{"name": "filename", "path": "path", "type": "filetype"}]
elsa_id (int): the id of the ELSA apparat
"""
for f in file:
filename = f["name"]
path = f["path"]
filetyp = f["type"]
if path == "Database":
continue
blob = create_blob(path)
query = "INSERT OR IGNORE INTO elsa_files (filename, fileblob, elsa_id, filetyp) VALUES (?, ?, ?, ?)"
self.query_db(query, (filename, blob, elsa_id, filetyp))
def recreateElsaFile(self, filename: str, filetype: str) -> str:
"""Recreate a file from the ELSA system
Args:
filename (str): the name of the file
elsa_id (int): the id of the ELSA apparat
filetype (str): the extension of the file to be created
Returns:
str: The filename of the recreated file
"""
blob = self.query_db(
"SELECT fileblob FROM elsa_files WHERE filename=?", (filename,), one=True
)[0]
# log.debug(blob)
tempdir = settings.database.temp.expanduser()
if not tempdir.exists():
tempdir.mkdir(parents=True, exist_ok=True)
file = tempfile.NamedTemporaryFile(
delete=False, dir=tempdir, mode="wb", suffix=f".{filetype}"
)
file.write(blob)
# log.debug("file created")
return file.name
def getElsaApparats(self) -> ELSA:
"""Get all the ELSA apparats in the database
Returns:
list[tuple]: a list of tuples containing the ELSA apparats
"""
return self.query_db(
"SELECT * FROM elsa ORDER BY substr(date, 7, 4) || '-' || substr(date, 4, 2) || '-' || substr(date, 1, 2)"
)
def getElsaId(self, prof_id: int, semester: str, date: str) -> int:
"""get the id of an ELSA apparat based on the professor, semester and date
Args:
prof_id (int): the id of the professor
semester (str): the semester
date (str): the date of the apparat
Returns:
int: the id of the ELSA apparat
"""
data = self.query_db(
"SELECT id FROM elsa WHERE prof_id=? AND semester=? AND date=?",
(prof_id, semester, date),
one=True,
)
if data is None:
return None
return data[0]
def getElsaFiles(self, elsa_id: int):
"""get all the files of an ELSA apparat
Args:
elsa_id (int): the id of the ELSA apparat
Returns:
list[tuple]: a list of tuples containing the files
"""
return self.query_db(
"SELECT filename, filetyp FROM elsa_files WHERE elsa_id=?", (elsa_id,)
)
###
def createProf(self, profdata: Prof):
log.debug(profdata)
conn = self.connect()
cursor = conn.cursor()
fname = profdata.firstname
lname = profdata.lastname
fullname = f"{lname} {fname}"
mail = profdata.mail
telnr = profdata.telnr
title = profdata.title
query = "INSERT INTO prof (fname, lname, fullname, mail, telnr, titel) VALUES (?,?,?,?,?,?)"
log.debug(query)
cursor.execute(query, (fname, lname, fullname, mail, telnr, title))
conn.commit()
conn.close()
return self.getProfId(profdata)
def getElsaProfId(self, profname):
query = f"SELECT id FROM elsa_prof WHERE fullname = '{profname}'"
data = self.query_db(query)
if data:
return data[0][0]
else:
return None
def getElsaProfs(self) -> list[str]:
query = "SELECT fullname FROM elsa_prof"
data = self.query_db(query)
if data:
return [i[0] for i in data]
else:
return []
def getProfId(self, profdata: dict[str, Any] | Prof):
"""Get the prof ID based on the profdata
Args:
profdata (dict | Prof): either a dictionary containing the prof data or a Prof object
Returns:
int | None: The id of the prof or None if not found
"""
conn = self.connect()
cursor = conn.cursor()
if isinstance(profdata, dict):
name = profdata["profname"]
if "," in name:
fname = name.split(", ")[1].strip()
lname = name.split(", ")[0].strip()
fullname = f"{lname} {fname}"
else:
fullname = profdata["profname"]
else:
fullname = profdata.name()
query = "SELECT id FROM prof WHERE fullname = ?"
log.debug(query)
cursor.execute(query, (fullname,))
result = cursor.fetchone()
if result:
return result[0]
else:
return None
def getProfByName(self, fullname):
"""Get all Data of the prof based on fullname
Args:
fullname (str): The full name of the prof
"""
conn = self.connect()
cursor = conn.cursor()
query = "SELECT * FROM prof WHERE fullname = ?"
log.debug(query)
result = cursor.execute(query, (fullname,)).fetchone()
if result:
return Prof().from_tuple(result)
else:
return Prof()
def getProfIDByApparat(self, apprarat_id: int) -> Optional[int]:
"""Get the prof id based on the semesterapparat id from the database
Args:
apprarat_id (int): Number of the apparat
Returns:
int | None: The id of the prof or None if not found
"""
query = "SELECT prof_id from semesterapparat WHERE appnr = ? and deletion_status = 0"
data = self.query_db(query, (apprarat_id,))
if data:
log.info("Prof ID: " + str(data[0][0]))
return data[0][0]
else:
return None
def copyBookToApparat(self, book_id: int, apparat: int):
# get book data
new_apparat_id = apparat
new_prof_id = self.getProfIDByApparat(new_apparat_id)
query = (
"INSERT INTO media (bookdata, app_id, prof_id, deleted, available, reservation) "
"SELECT bookdata, ?, ?, 0, available, reservation FROM media WHERE id = ?"
)
connection = self.connect()
cursor = connection.cursor()
cursor.execute(query, (new_apparat_id, new_prof_id, book_id))
connection.commit()
connection.close()
def moveBookToApparat(self, book_id: int, appratat: int):
"""Move the book to the new apparat
Args:
book_id (int): the ID of the book
appratat (int): the ID of the new apparat
"""
# get book data
query = "UPDATE media SET app_id = ? WHERE id = ?"
connection = self.connect()
cursor = connection.cursor()
cursor.execute(query, (appratat, book_id))
connection.commit()
connection.close()
def getApparatNameByAppNr(self, appnr: int):
query = (
"SELECT name FROM semesterapparat WHERE appnr = ? and deletion_status = 0"
)
data = self.query_db(query, (appnr,))
if data:
return data[0][0]
else:
return None
def fetch_one(self, query: str, args: tuple[Any, ...] = ()) -> tuple[Any, ...]:
connection = self.connect()
cursor = connection.cursor()
cursor.execute(query, args)
result = cursor.fetchone()
connection.close()
return result
def getBookIdByPPN(self, ppn: str) -> int:
query = "SELECT id FROM media WHERE bookdata LIKE ?"
data = self.query_db(query, (f"%{ppn}%",))
if data:
return data[0][0]
else:
return None
def getNewEditionsByApparat(self, apparat_id: int) -> list[BookData]:
"""Get all new editions for a specific apparat
Args:
apparat_id (int): the id of the apparat
Returns:
list[tuple]: A list of tuples containing the new editions data
"""
query = "SELECT * FROM neweditions WHERE for_apparat=? AND ordered=0"
results = self.query_db(query, (apparat_id,))
res = []
for result in results:
# keep only new edition payload; old edition can be reconstructed if needed
res.append(BookData().from_string(result[1]))
return res
def setOrdered(self, newBook_id: int):
query = "UPDATE neweditions SET ordered=1 WHERE id=?"
self.query_db(query, (newBook_id,))
def getBooksWithNewEditions(self, app_id) -> List[BookData]:
# select all bookdata from media, based on the old_edition_id in neweditions where for_apparat = app_id; also get the new_edition bookdata
query = "SELECT m.bookdata, new_bookdata FROM media m JOIN neweditions n ON m.id = n.old_edition_id WHERE n.for_apparat = ?"
results = self.query_db(query, (app_id,))
# store results in tuple old,new
res = []
for result in results:
oldedition = BookData().from_string(result[0])
newedition = BookData().from_string(result[1])
res.append((oldedition, newedition))
return res
def getNewEditionId(self, newBook: BookData):
query = "SELECT id FROM neweditions WHERE new_bookdata LIKE ?"
args = (
newBook.isbn[0] if newBook.isbn and len(newBook.isbn) > 0 else newBook.ppn
)
params = (f"%{args}%",)
data = self.query_db(query, params, one=True)
if data:
return data[0]
else:
return None
def insertNewEdition(self, newBook: BookData, oldBookId: int, for_apparat: int):
# check if new edition already in table, check based on newBook.ppn
check_query = "SELECT id FROM neweditions WHERE new_bookdata LIKE ?"
check_params = (f"%{newBook.ppn}%",)
data = self.query_db(check_query, check_params, one=True)
if data:
log.info("New edition already in table, skipping insert")
return
query = "INSERT INTO neweditions (new_bookdata, old_edition_id, for_apparat) VALUES (?,?,?)"
params = (newBook.to_dict, oldBookId, for_apparat)
self.query_db(query, params)