implement logic to use backup folder if origin is unreachable and once reachable, move data to origin

This commit is contained in:
WorldTeacher
2024-08-02 08:42:52 +02:00
parent 5e706022bc
commit 0fb525ad4e
2 changed files with 137 additions and 21 deletions

View File

@@ -9,6 +9,8 @@ class Backup:
self.source_path = config.database.path + "/" + config.database.name self.source_path = config.database.path + "/" + config.database.name
self.backup_path = config.database.backupLocation + "/" + config.database.name self.backup_path = config.database.backupLocation + "/" + config.database.name
self.backup = False self.backup = False
if not os.path.exists(config.database.backupLocation):
os.makedirs(config.database.backupLocation)
if config.database.do_backup == True: if config.database.do_backup == True:
self.checkpaths() self.checkpaths()
config.database.do_backup = self.backup config.database.do_backup = self.backup

View File

@@ -1,28 +1,102 @@
import sqlite3 as sql import sqlite3 as sql
import os import os
import shutil
from src import config from src import config
from pathlib import Path
from src.schemas import USERS, MEDIA, LOANS, User, Book, Loan from src.schemas import USERS, MEDIA, LOANS, User, Book, Loan
from src.utils import stringToDate, Log from src.utils import stringToDate, Log, debugMessage as dbg
from PyQt6 import QtCore
log = Log("Database") log = Log("Database")
FILE = config.database.name
class Database: class Database:
def __init__(self, db_path: str = None): def __init__(self, db_path: str = None):
""" '''
Default constructor for the database class Default constructor for the database class
Args: Args:
db_path (str, optional): Optional Path for testing / specific purposes. Defaults to None. db_path (str, optional): Optional Path for testing / specific purposes. Defaults to None.
""" '''
if db_path is None: if db_path is None:
self.db_path = config.database.path + "/" + config.database.name self.db_path = self.handle_folder_reachability(config.database.path, config.database.backupLocation)
else: else:
self.db_path = db_path self.db_path = db_path
if not os.path.exists(config.database.path): if not os.path.exists(config.database.path):
os.makedirs(config.database.path) try:
os.makedirs(config.database.path)
except FileNotFoundError:
print(self.db_path)
if not os.path.exists(config.database.backupLocation):
os.makedirs(config.database.backupLocation)
#if main path does not exist, try to create it. if that fails, use the backuplocation
print(self.db_path)
self.checkDatabaseStatus() self.checkDatabaseStatus()
def handle_folder_reachability(self, original_path, backup_path):
"""
Checks if the original folder is reachable. If not, creates a backup.
If the original folder becomes reachable again, restores the backup.
Args:
original_path (str): Path to the original folder.
backup_path (str): Path to the backup folder.
Returns:
str: Path to the current accessible folder.
"""
backup_file = os.path.join(backup_path, ".backup")
if not os.path.exists(original_path):
#original folder not reachable, use backup path and create .backup file
if not os.path.exists(backup_path):
os.makedirs(backup_path)
with open(backup_file, "w") as f:
f.write("")
# Create an empty backup file as a marker
return backup_path +"/" + FILE
else:
print("Original Path Exists, ")
# Original folder is reachable, check for backup
if os.path.exists(backup_file):
# Restore backup
shutil.rmtree(original_path) # Remove original folder to avoid conflicts
os.rename(backup_path, original_path)
# (backup_path, original_path)
#os.remove(backup_file)
#remove backup file from original path
os.remove(original_path + "/.backup")
os.makedirs(backup_path)
return original_path +"/" + FILE
def checkDatabasePath(self):
self.db_path = config.database.path + "/" + config.database.name
#if backup file in backup location, move database to main location, delete backup file
if os.path.exists(config.database.backupLocation + "/backup"):
if os.path.exists(self.db_path):
os.remove(self.db_path)
os.rename(f"{config.database.backupLocation}/{config.database.name}", self.db_path)
#remove backup file
os.remove(config.database.backupLocation + "/backup")
return self.db_path
else:
#keep using backup file
self.db_path = config.database.backupLocation + "/" + config.database.name
if not os.path.exists(config.database.path):
try:
os.makedirs(config.database.path)
except:
self.db_path = config.database.backupLocation + "/" + config.database.name
if not os.path.exists(config.database.backupLocation):
os.makedirs(config.database.backupLocation)
#create a backup file in the backup location
with open(f"{config.database.backupLocation}/backup.txt", "w") as f:
f.write("Backup File")
return self.db_path
def checkDatabaseStatus(self): def checkDatabaseStatus(self):
log.info("Checking Database Status") log.info("Checking Database Status")
if self.tableCheck() == []: if self.tableCheck() == []:
@@ -31,29 +105,29 @@ class Database:
# self.insertSubjects() # self.insertSubjects()
def connect(self) -> sql.Connection: def connect(self) -> sql.Connection:
""" '''
Connect to the database Connect to the database
Returns: Returns:
sql.Connection: The active connection to the database sql.Connection: The active connection to the database
""" '''
return sql.connect(self.db_path) return sql.connect(self.db_path)
def close_connection(self, conn: sql.Connection): def close_connection(self, conn: sql.Connection):
""" '''
closes the connection to the database closes the connection to the database
Args: Args:
---- ----
- conn (sql.Connection): the connection to be closed - conn (sql.Connection): the connection to be closed
""" '''
conn.close() conn.close()
def createDatabase(self): def createDatabase(self):
log.info("Creating Database") log.info("Creating Database")
# print("Creating Database") # print("Creating Database")
if not os.path.exists(config.database.path): if not os.path.exists(self.db_path):
os.makedirs(config.database.path) os.makedirs(self.db_path)
conn = self.connect() conn = self.connect()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute(USERS) cursor.execute(USERS)
@@ -76,12 +150,12 @@ class Database:
def tableCheck(self): def tableCheck(self):
# check if database has tables # check if database has tables
""" '''
Get the contents of the Get the contents of the
Returns: Returns:
Union[List[Tuple], None]: _description_ Union[List[Tuple], None]: Returns a list of tuples containing the table names or None if no tables are present
""" '''
try: try:
with sql.connect(self.db_path) as conn: with sql.connect(self.db_path) as conn:
cursor = conn.cursor() cursor = conn.cursor()
@@ -114,11 +188,15 @@ class Database:
log.debug(f"Inserting User {userno}, {username}, {usermail}") log.debug(f"Inserting User {userno}, {username}, {usermail}")
conn = self.connect() conn = self.connect()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute( try:
f"INSERT INTO users (id,username, usermail) VALUES ('{userno}', '{username}', '{usermail}' )" cursor.execute(
) f"INSERT INTO users (id,username, usermail) VALUES ('{userno}', '{username}', '{usermail}' )"
conn.commit() )
conn.commit()
except sql.IntegrityError:
return False
self.close_connection(conn) self.close_connection(conn)
return True
def getUser(self, userid) -> User: def getUser(self, userid) -> User:
conn = self.connect() conn = self.connect()
@@ -130,6 +208,16 @@ class Database:
log.info(f"Returning User {user}") log.info(f"Returning User {user}")
return user return user
def getUserId(self, username) -> User:
conn = self.connect()
cursor = conn.cursor()
cursor.execute(f"SELECT * FROM users WHERE username = '{username}'")
result = cursor.fetchone()
self.close_connection(conn)
user = User(id=result[0], username=result[1], email=result[2])
log.info(f"Returning User {user}")
return user
def updateUser(self, username, userno, usermail): def updateUser(self, username, userno, usermail):
log.debug(f"Updating User {userno}, {username}, {usermail}") log.debug(f"Updating User {userno}, {username}, {usermail}")
conn = self.connect() conn = self.connect()
@@ -138,9 +226,34 @@ class Database:
f"UPDATE users SET username = '{username}', usermail = '{usermail}' WHERE id = '{userno}'" f"UPDATE users SET username = '{username}', usermail = '{usermail}' WHERE id = '{userno}'"
) )
conn.commit() conn.commit()
self.close_connection(conn) self.close_connection(conn)
def setUserActiveDate(self, userid,date):
query = f"UPDATE users SET lastActive = '{date}' WHERE id = '{userid}'"
conn = self.connect()
cursor = conn.cursor()
cursor.execute(query)
conn.commit()
dbg(f"Setting User {userid} to active on {date}")
def renameInactiveUsers(self):
lastYear = QtCore.QDate.currentDate().addDays(int(f"-{config.inactive_user_deletion}")).toString("yyyy-MM-dd")
query = f"SELECT * FROM users WHERE lastActive < '{lastYear}'"
conn = self.connect()
cursor = conn.cursor()
result = cursor.execute(query).fetchall()
self.close_connection(conn)
for user in result:
self.updateUser("gelöscht", user[0], "gelöscht")
def deleteUser(self, userid):
log.debug(f"Deleting User {userid}")
conn = self.connect()
cursor = conn.cursor()
cursor.execute(f"UPDATE users SET username='gelöscht', usermail = 'gelöscht' WHERE id = '{userid}'")
conn.commit()
self.close_connection(conn)
def getActiveLoans(self, userid): def getActiveLoans(self, userid):
conn = self.connect() conn = self.connect()
cursor = conn.cursor() cursor = conn.cursor()
@@ -172,6 +285,7 @@ class Database:
loan[5], loan[5],
stringToDate(loan[6]), stringToDate(loan[6]),
self.getMedia(loan[2]), self.getMedia(loan[2]),
user_name=self.getUser(loan[1]).username,
) )
loan_data.append(l) loan_data.append(l)
return loan_data return loan_data