From 21a653d9f09f2b79d6f356a0c9a93a3fda68e818 Mon Sep 17 00:00:00 2001 From: WorldTeacher Date: Tue, 6 May 2025 20:51:36 +0200 Subject: [PATCH] add logging, typechecking add various functions --- src/logic/download.py | 60 ++++++++------ src/logic/utils.py | 181 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 194 insertions(+), 47 deletions(-) diff --git a/src/logic/download.py b/src/logic/download.py index 533fb60..4eb964f 100644 --- a/src/logic/download.py +++ b/src/logic/download.py @@ -3,16 +3,23 @@ import os import time import bencodepy from .utils import rename -from aria2p import Client +from aria2p import Client, API +from pathlib import Path +import loguru +import sys -from aria2p import API +log = loguru.logger +log.remove() +log.add("application.log", rotation="1 week", retention="1 month") +# log.add(sys.stdout, level='INFO') class Download: """Download a file from a url and start the download using aria2""" - def __init__(self, download_location) -> None: - self.download_location = download_location + def __init__(self, download_location: Path) -> None: + # self.download_location needs to be a string + self.download_location = download_location.__str__() self.filename = None self.torrent_file = None self.progress = 0 @@ -28,12 +35,12 @@ class Download: ) self.api.set_global_options({"dir": self.download_location}) if not self.aria2_running: - print("Aria2 is not running") + log.error("Aria2 is not running") sys.exit() def check_aria2(self): # check if aria2 is running - if os.system("ps -A | grep aria2c") == 0: + if os.system("ps -A | grep aria2c > /dev/null 2>&1") == 0: return True else: return False @@ -47,16 +54,20 @@ class Download: self.progress = current_progress return current_progress - def get_file(self, url, series_name=None): + def get_file(self, url): # get the file name from the url # use wget to download the file to the download location name = url.split("/")[-1] - dl_url = f"{self.download_location}{name}" - while self.get_filename(dl_url) is None: - if not os.path.exists(dl_url): - os.system(f"wget -P {self.download_location} {url}") - filename = self.get_filename(dl_url) - self.torrent_file = url.split("/")[-1] + dl_url = self.download_location + # while self.get_filename(dl_url) is None: + # if not os.path.exists(dl_url): + + # call os.system(f"wget -P {dl_url} {url}"), but suppress output + os.system(f"wget -P {dl_url} {url} > /dev/null 2>&1") + while not os.path.exists(dl_url): + time.sleep(1) + filename = self.get_filename(Path(dl_url, name)) + self.torrent_file = name self.filename = filename return filename @@ -65,12 +76,15 @@ class Download: file for file in os.listdir(self.download_location) if ".torrent" in file ] for file in tr_files: - os.remove(f"{self.download_location}{file}") + if os.path.isdir(Path(self.download_location, file)): + os.rmdir(Path(self.download_location, file)) + else: + os.remove(Path(self.download_location, file)) def add_torrent(self, torr_name): try: - self.api.add_torrent(f"{self.download_location}{torr_name}") - print("Torrent added") + self.api.add_torrent(f"{self.download_location}/{torr_name}") + log.info("Torrent added") except Exception as e: print(f"Error adding torrent: {e}") return False @@ -78,16 +92,16 @@ class Download: def rename_download(self): filename = self.filename.replace(".aria2", "") foldername = filename.replace(".cbz", "") if ".cbz" in filename else filename - print(f"Filename: {filename}") - print(f"Foldername: {foldername}") - if not os.path.exists(f"{self.download_location}{foldername}"): - os.mkdir(f"{self.download_location}{foldername}") + log.info(f"Filename: {filename}") + log.info(f"Foldername: {foldername}") + if not os.path.exists(Path(self.download_location, foldername)): + os.mkdir(Path(self.download_location, foldername)) os.rename( - f"{self.download_location}{filename}", - f"{self.download_location}{foldername}/{filename}", + Path(self.download_location, filename), + Path(self.download_location, foldername, filename), ) # rename the file - rename(f"{self.download_location}{foldername}") + rename(Path(self.download_location, foldername)) def get_filename(self, torrent_file): try: diff --git a/src/logic/utils.py b/src/logic/utils.py index 6b2f648..6f5a354 100644 --- a/src/logic/utils.py +++ b/src/logic/utils.py @@ -4,11 +4,24 @@ from komconfig import KomConfig from pathlib import Path import shutil import subprocess +import jaro +import loguru +import sys +import time +from komgapi import komgapi +from komcache import KomCache +cfg = KomConfig() + +log = loguru.logger +log.remove() +log.add("logs/utils.log", rotation="1 week", retention="1 month") +log.add(sys.stdout, level="INFO") config = KomConfig() +komga = komgapi(cfg.komga.user, cfg.komga.password, cfg.komga.url) -def rename(folder: str = "/home/alexander/Downloads/torrents/manga/") -> None: +def rename(folder: Path = config.komgrabber.download_location) -> None: """Rename the files in a folder according to the template. Template: [Name] v[nr] #[nr].ext (e.g. "The Flash v1 #1.cbz"). @@ -17,15 +30,12 @@ def rename(folder: str = "/home/alexander/Downloads/torrents/manga/") -> None: - folder (str): the string to the folder """ # Get the files in the folder - if "~" in folder: - folder = os.path.expanduser(folder) files = os.listdir(folder) - print(files) for file in files: if os.path.isdir(f"{folder}/{file}"): - rename(f"{folder}/{file}") + rename(Path(f"{folder}/{file}")) if not file.endswith(".cbz"): - print(f"Skipping {file}, not a cbz file") + log.debug(f"Skipping {file}, not a cbz file") continue ext = file.split(".")[-1] @@ -39,7 +49,6 @@ def rename(folder: str = "/home/alexander/Downloads/torrents/manga/") -> None: title = file[:split_start].strip() # add the volume number to the title as a suffix #nr title = f"{title} {volume} #{volume.replace('v', '')}" - print(title) # rename the file os.rename(f"{folder}/{file}", f"{folder}/{title}.{ext}") @@ -49,10 +58,10 @@ def rename_recursive(folder: str) -> None: # get all directories in the folder and apply the rename function to them for root, dirs, _files in os.walk(folder): for dir in dirs: - rename(f"{root}/{dir}") + rename(Path(f"{root}/{dir}")) -def tag_folder(folder: Path = Path(config.komgrabber.download_location)) -> None: +def tag_folder(folder: Path = config.komgrabber.download_location) -> None: """ Recursively tags all the .cbz files in the folder using ComicTagger @@ -70,39 +79,45 @@ def tag_folder(folder: Path = Path(config.komgrabber.download_location)) -> None tag_folder(f"{folder}/{file}") if not file.endswith(".cbz"): continue - print("Trying to tag file", file) + log.info(f"Trying to tag file {file}") subprocess.call( f'comictagger -s -t cr -f -o "{folder}/{file}" --nosummary --overwrite {"-i" if config.komgrabber.tag_interactive else ""}', shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, ) -def move(src, dest: Path = Path(config.komga.media_path)) -> None: +def move(src: Path, dest: Path = Path(config.komga.media_path)) -> None: """ Moves the files from the source folder to the destination folder. If the folder already exists in the destination, only move the new files. Parameters ---------- - src : str + src : Path The source folder dest : Path, optional The destination folder used by Komga, by default Path(config.komga.media_path) """ # Get the files in the folder # +move the folders from src to disc, if folder already exists, only move new files - if "~" in src: - src = os.path.expanduser(src) + folders = os.listdir(src) for folder in folders: if not os.path.exists(f"{dest}/{folder}"): - print(f"Moving {folder} to {dest}") - shutil.move(f"{src}/{folder}", dest) + log.info(f"Moving {folder} to {dest}") + os.mkdir(f"{dest}/{folder}") + files = os.listdir(f"{src}/{folder}") + for file in files: + log.info(f"Moving {file} to {dest}/{folder}") + shutil.move(f"{src}/{folder}/{file}", f"{dest}/{folder}") + # shutil.move(f"{src}/{folder}", dest) else: files = os.listdir(f"{src}/{folder}") for file in files: if not os.path.exists(f"{dest}/{folder}/{file}"): - print(f"Moving {file} to {dest}/{folder}") + log.info(f"Moving {file} to {dest}/{folder}") shutil.move(f"{src}/{folder}/{file}", f"{dest}/{folder}") # Remove empty folders remove_empty_folders(src) @@ -122,13 +137,13 @@ def remove_empty_folders(src): if os.path.isfile(f"{src}/{folder}"): continue if not os.listdir(f"{src}/{folder}"): - print(f"Removing {folder}") + log.info(f"Removing {folder}") os.rmdir(f"{src}/{folder}") else: remove_empty_folders(f"{src}/{folder}") -def detect_chapters(src: Path = Path(config.komgrabber.download_location)) -> None: +def detect_chapters(src: Path = config.komgrabber.download_location) -> None: """ Detects and deletes any non-volume file in the source folder @@ -137,8 +152,7 @@ def detect_chapters(src: Path = Path(config.komgrabber.download_location)) -> No src : Path, optional The Path to be checked, by default Path(config.komgrabber.download_location) """ - if "~" in src: - src = os.path.expanduser(src) + for folder in os.listdir(src): if os.path.isdir(f"{src}/{folder}"): files = os.listdir(f"{src}/{folder}") @@ -146,7 +160,126 @@ def detect_chapters(src: Path = Path(config.komgrabber.download_location)) -> No # check for regex "v(d) #(d)" in the file name regex = re.compile(r"^.* v(\d+) #(\d+(?:-\d+)?)\.cbz$") if regex.search(file): - print(f"File {file} is a Volume") + log.debug(f"File {file} is a Volume") else: - print(f"Deleting chapter {file}") - os.remove(f"{src}/{folder}/{file}") + log.debug(f"Deleting chapter {file}") + if os.path.isdir(f"{src}/{folder}/{file}"): + shutil.rmtree(f"{src}/{folder}/{file}") + else: + os.remove(f"{src}/{folder}/{file}") + + +def folder_similarity(folder1, folder2) -> float: + """ + Calculate the similarity between two folder names using Jaro-Winkler distance. + + Args: + folder1 (str): First folder name + folder2 (str): Second folder name + + Returns: + float: Similarity score between 0 and 1 + """ + similarity = jaro.jaro_winkler_metric(folder1, folder2) + return similarity + + +def rename_folder(src=config.komgrabber.download_location, series=None) -> bool: + renamer_regex = r"(\s*\([^)]*\))+$" + for folder in os.listdir(src): + if os.path.isdir(f"{src}/{folder}"): + # remove the series name from the folder + new_folder = re.sub(renamer_regex, "", folder) + os.rename(f"{src}/{folder}", f"{src}/{new_folder}") + # rename the files in the folder + # rename(f"{src}/{new_folder}") + else: + new_folder = folder.split(".")[0] + new_folder = re.sub(renamer_regex, "", new_folder) + os.mkdir(f"{src}/{new_folder}") + os.rename(f"{src}/{folder}", f"{src}/{new_folder}/{folder}") + + # get similarity + similarity = jaro.jaro_winkler_metric(series.name, new_folder) + log.info(f"Similarity between {series.name} and {new_folder} is {similarity}") + if similarity > 0.85: + log.info(f"Renaming {new_folder} to {series.name}") + os.rename(f"{src}/{new_folder}", f"{src}/{series.name}") + for file in os.listdir(f"{src}/{series.name}"): + os.rename( + f"{src}/{series.name}/{file}", + f"{src}/{series.name}/{file.replace(new_folder, series.name)}", + ) + + return True + else: + return False + + +def time_checker(timestamp1: float, timestamp2: float) -> int: + """Calculate the difference in days between two timestamps. + + Args: + timestamp1 (float): First timestamp in seconds since epoch + timestamp2 (float): Second timestamp in seconds since epoch + + Returns: + int: Absolute difference in days between the timestamps + """ + difference = abs(timestamp1 - timestamp2) + days = int(difference / (24 * 60 * 60)) # Convert seconds to days + return days + + +def calculate_new_volumes( + present_volumes: list[int], new_volumes: list[int] +) -> list[int]: + if len(new_volumes) == 1: + if max(new_volumes) > max(present_volumes): + return new_volumes + else: + return [] + else: + new_volumes = sorted(new_volumes) + new_volumes = [i for i in new_volumes if i > max(present_volumes)] + if len(new_volumes) == 0: + return [] + else: + return new_volumes + + +def safe_remove_directory( + path: Path, max_retries: int = 3, retry_delay: float = 0.5 +) -> bool: + """Safely remove a directory and all its contents. + + Args: + path (str): Path to the directory to remove + max_retries (int, optional): Number of deletion attempts. Defaults to 3. + retry_delay (float, optional): Delay between retries in seconds. Defaults to 0.5. + + Returns: + bool: True if directory was removed successfully, False otherwise + """ + for attempt in range(max_retries): + try: + if os.path.exists(path): + shutil.rmtree(path, ignore_errors=False) + return True + except OSError as e: + if attempt == max_retries - 1: # Last attempt failed + log.error( + f"Failed to remove directory {path} after {max_retries} attempts: {e}" + ) + return False + time.sleep(retry_delay) + return False + + +def get_series_update_date(series_name: str) -> str: + db = KomCache() + update_date = db.query( + query="SELECT last_update FROM komgrabber WHERE name = ? LIMIT 1", + args=(series_name,), + ) + print(update_date)