diff --git a/src/logic/database.py b/src/logic/database.py index 1bdc54c..066ddde 100644 --- a/src/logic/database.py +++ b/src/logic/database.py @@ -1,38 +1,224 @@ -import sqlite3 +import sqlite3 as sql import os from src import config from pathlib import Path -from src.logic import USERS, MEDIA, LOANS +from src.schemas import USERS, MEDIA, LOANS, User, Book + + class Database: - def __init__(self) -> None: - self.db_name = config.database.name - self.db_location = config.database.path - self.db_path = Path(self.db_location).joinpath(self.db_name) - print(self.db_path) - self.backup_path = config.database.backupLocation - try: - self.connection = sqlite3.connect(self.db_path) - except sqlite3.OperationalError as e: - print(e) + def __init__(self, db_path: str = None): + """ + Default constructor for the database class + + Args: + db_path (str, optional): Optional Path for testing / specific purposes. Defaults to None. + """ + if db_path is None: + self.db_path = config.database.path + "/" + config.database.name + else: + self.db_path = db_path + if not os.path.exists(config.database.path): + os.makedirs(config.database.path) + + self.checkDatabaseStatus() + + def checkDatabaseStatus(self): + if self.tableCheck() == []: + # self.logger.log_critical("Database does not exist, creating tables") self.createDatabase() - pass - + # self.insertSubjects() + + def connect(self) -> sql.Connection: + """ + Connect to the database + + Returns: + sql.Connection: The active connection to the database + """ + return sql.connect(self.db_path) + + def close_connection(self, conn: sql.Connection): + """ + closes the connection to the database + + Args: + ---- + - conn (sql.Connection): the connection to be closed + """ + conn.close() + def createDatabase(self): print("Creating Database") - if not os.path.exists(self.db_location): - os.makedirs(self.db_location) - self.connection = sqlite3.connect(self.db_path) - self.cursor = self.connection.cursor() - self.cursor.execute(USERS) - self.cursor.execute(MEDIA) - self.cursor.execute(LOANS) - self.connection.commit() - def tableCheck(self): - pass - - def test(self): - print("Test") - def checkUserExists(self, **kwargs): - pass + if not os.path.exists(config.database.path): + os.makedirs(config.database.path) + conn = self.connect() + cursor = conn.cursor() + cursor.execute(USERS) + cursor.execute(MEDIA) + cursor.execute(LOANS) + conn.commit() + self.close_connection(conn) - \ No newline at end of file + def tableCheck(self): + # check if database has tables + """ + Get the contents of the + + Returns: + Union[List[Tuple], None]: _description_ + """ + try: + with sql.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT * FROM sqlite_master WHERE type='table'") + return cursor.fetchall() + except sql.OperationalError: + return None + + def query(self, query): + conn = self.connect() + cursor = conn.cursor() + cursor.execute(query) + + def checkUserExists(self, key, value) -> list[User] | bool: + query = f"SELECT * FROM users WHERE {key} like '%{value}%'" + conn = self.connect() + cursor = conn.cursor() + try: + cursor.execute(query) + except sql.OperationalError: + return False + + users = cursor.fetchall() + for index, user in enumerate(users): + users[index] = User(id=user[0], username=user[1], email=user[2]) + self.close_connection(conn) + return users + + def insertUser(self, username, userno, usermail): + conn = self.connect() + cursor = conn.cursor() + cursor.execute( + f"INSERT INTO users (id,username, usermail) VALUES ('{userno}', '{username}', '{usermail}' )" + ) + conn.commit() + self.close_connection(conn) + + def getUser(self, userid) -> User: + conn = self.connect() + cursor = conn.cursor() + cursor.execute(f"SELECT * FROM users WHERE id = '{userid}'") + result = cursor.fetchone() + self.close_connection(conn) + user = User(id=result[0], username=result[1], email=result[2]) + return user + + def getActiveLoans(self, userid): + conn = self.connect() + cursor = conn.cursor() + try: + cursor.execute( + f"SELECT * FROM loans WHERE user_id = '{userid}' AND returned = 0" + ) + result = cursor.fetchall() + except sql.OperationalError: + result = [] + self.close_connection(conn) + return str(len(result)) + + def insertLoan(self, userid, mediaid, loandate, duedate): + query = f"INSERT INTO loans (user_id, media_id, loan_date, return_date) Values ('{userid}', '{mediaid}', '{loandate}', '{duedate}')" + conn = self.connect() + cursor = conn.cursor() + cursor.execute(query) + conn.commit() + self.close_connection(conn) + + def insertMedia(self, media): + query = f"INSERT OR IGNORE INTO media (ppn, title, signature, isbn) VALUES ('{media.ppn}', '{media.title}', '{media.signature}', '{media.isbn}')" # , '{media.link}' + conn = self.connect() + cursor = conn.cursor() + cursor.execute(query) + conn.commit() + self.close_connection(conn) + return cursor.lastrowid + + def getMedia(self, media_id): + query = f"SELECT * FROM media WHERE id = '{media_id}'" + conn = self.connect() + cursor = conn.cursor() + cursor.execute(query) + result = cursor.fetchone() + self.close_connection(conn) + res = Book( + database_id=result[0], + signature=result[1], + isbn=result[2], + ppn=result[3], + title=result[4], + ) + return res + + def checkMediaExists(self, media): + conn = self.connect() + cursor = conn.cursor() + query = f"SELECT id, signature FROM media WHERE ppn = '{media.ppn}' OR title = '{media.title}' OR isbn = '{media.isbn}' OR signature = '{media.signature}'" + cursor.execute(query) + result = cursor.fetchall() + self.close_connection(conn) + ress = [] + for res in result: + if res[1] == media.signature: + ress.append(res[0]) + return ress + return False + + def checkLoanState(self, book_id): + query = f"SELECT * FROM loans WHERE media_id = '{book_id}' AND returned = 0" + conn = self.connect() + cursor = conn.cursor() + cursor.execute(query) + result = cursor.fetchall() + self.close_connection(conn) + return result + + def returnMedia(self, media_id): + query = f"UPDATE loans SET returned = 1 WHERE media_id = '{media_id}'" + conn = self.connect() + cursor = conn.cursor() + cursor.execute(query) + conn.commit() + # return book + query = f"SELECT * FROM media WHERE id = '{media_id}'" + cursor.execute(query) + result = cursor.fetchone() + self.close_connection(conn) + return Book( + signature=result[1], + isbn=result[2], + ppn=result[3], + title=result[4], + database_id=result[0], + ) + + def getUserByLoan(self, book_id): + query = ( + f"SELECT user_id FROM loans WHERE media_id = '{book_id}' AND returned = 0" + ) + conn = self.connect() + cursor = conn.cursor() + cursor.execute(query) + result = cursor.fetchone() + self.close_connection(conn) + return self.getUser(result[0]) + + # + def selectClosestReturnDate(self, user_id): + query = f"SELECT return_date FROM loans WHERE user_id = '{user_id}' AND returned = 0 ORDER BY return_date ASC LIMIT 1" + conn = self.connect() + cursor = conn.cursor() + cursor.execute(query) + result = cursor.fetchone() + self.close_connection(conn) + if result is not None: + return result[0]