add logging, typechecking

add various functions
This commit is contained in:
2025-05-06 20:51:36 +02:00
parent a3a96b054e
commit 21a653d9f0
2 changed files with 194 additions and 47 deletions

View File

@@ -4,11 +4,24 @@ from komconfig import KomConfig
from pathlib import Path
import shutil
import subprocess
import jaro
import loguru
import sys
import time
from komgapi import komgapi
from komcache import KomCache
cfg = KomConfig()
log = loguru.logger
log.remove()
log.add("logs/utils.log", rotation="1 week", retention="1 month")
log.add(sys.stdout, level="INFO")
config = KomConfig()
komga = komgapi(cfg.komga.user, cfg.komga.password, cfg.komga.url)
def rename(folder: str = "/home/alexander/Downloads/torrents/manga/") -> None:
def rename(folder: Path = config.komgrabber.download_location) -> None:
"""Rename the files in a folder according to the template.
Template: [Name] v[nr] #[nr].ext (e.g. "The Flash v1 #1.cbz").
@@ -17,15 +30,12 @@ def rename(folder: str = "/home/alexander/Downloads/torrents/manga/") -> None:
- folder (str): the string to the folder
"""
# Get the files in the folder
if "~" in folder:
folder = os.path.expanduser(folder)
files = os.listdir(folder)
print(files)
for file in files:
if os.path.isdir(f"{folder}/{file}"):
rename(f"{folder}/{file}")
rename(Path(f"{folder}/{file}"))
if not file.endswith(".cbz"):
print(f"Skipping {file}, not a cbz file")
log.debug(f"Skipping {file}, not a cbz file")
continue
ext = file.split(".")[-1]
@@ -39,7 +49,6 @@ def rename(folder: str = "/home/alexander/Downloads/torrents/manga/") -> None:
title = file[:split_start].strip()
# add the volume number to the title as a suffix #nr
title = f"{title} {volume} #{volume.replace('v', '')}"
print(title)
# rename the file
os.rename(f"{folder}/{file}", f"{folder}/{title}.{ext}")
@@ -49,10 +58,10 @@ def rename_recursive(folder: str) -> None:
# get all directories in the folder and apply the rename function to them
for root, dirs, _files in os.walk(folder):
for dir in dirs:
rename(f"{root}/{dir}")
rename(Path(f"{root}/{dir}"))
def tag_folder(folder: Path = Path(config.komgrabber.download_location)) -> None:
def tag_folder(folder: Path = config.komgrabber.download_location) -> None:
"""
Recursively tags all the .cbz files in the folder using ComicTagger
@@ -70,39 +79,45 @@ def tag_folder(folder: Path = Path(config.komgrabber.download_location)) -> None
tag_folder(f"{folder}/{file}")
if not file.endswith(".cbz"):
continue
print("Trying to tag file", file)
log.info(f"Trying to tag file {file}")
subprocess.call(
f'comictagger -s -t cr -f -o "{folder}/{file}" --nosummary --overwrite {"-i" if config.komgrabber.tag_interactive else ""}',
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
def move(src, dest: Path = Path(config.komga.media_path)) -> None:
def move(src: Path, dest: Path = Path(config.komga.media_path)) -> None:
"""
Moves the files from the source folder to the destination folder.
If the folder already exists in the destination, only move the new files.
Parameters
----------
src : str
src : Path
The source folder
dest : Path, optional
The destination folder used by Komga, by default Path(config.komga.media_path)
"""
# Get the files in the folder
# +move the folders from src to disc, if folder already exists, only move new files
if "~" in src:
src = os.path.expanduser(src)
folders = os.listdir(src)
for folder in folders:
if not os.path.exists(f"{dest}/{folder}"):
print(f"Moving {folder} to {dest}")
shutil.move(f"{src}/{folder}", dest)
log.info(f"Moving {folder} to {dest}")
os.mkdir(f"{dest}/{folder}")
files = os.listdir(f"{src}/{folder}")
for file in files:
log.info(f"Moving {file} to {dest}/{folder}")
shutil.move(f"{src}/{folder}/{file}", f"{dest}/{folder}")
# shutil.move(f"{src}/{folder}", dest)
else:
files = os.listdir(f"{src}/{folder}")
for file in files:
if not os.path.exists(f"{dest}/{folder}/{file}"):
print(f"Moving {file} to {dest}/{folder}")
log.info(f"Moving {file} to {dest}/{folder}")
shutil.move(f"{src}/{folder}/{file}", f"{dest}/{folder}")
# Remove empty folders
remove_empty_folders(src)
@@ -122,13 +137,13 @@ def remove_empty_folders(src):
if os.path.isfile(f"{src}/{folder}"):
continue
if not os.listdir(f"{src}/{folder}"):
print(f"Removing {folder}")
log.info(f"Removing {folder}")
os.rmdir(f"{src}/{folder}")
else:
remove_empty_folders(f"{src}/{folder}")
def detect_chapters(src: Path = Path(config.komgrabber.download_location)) -> None:
def detect_chapters(src: Path = config.komgrabber.download_location) -> None:
"""
Detects and deletes any non-volume file in the source folder
@@ -137,8 +152,7 @@ def detect_chapters(src: Path = Path(config.komgrabber.download_location)) -> No
src : Path, optional
The Path to be checked, by default Path(config.komgrabber.download_location)
"""
if "~" in src:
src = os.path.expanduser(src)
for folder in os.listdir(src):
if os.path.isdir(f"{src}/{folder}"):
files = os.listdir(f"{src}/{folder}")
@@ -146,7 +160,126 @@ def detect_chapters(src: Path = Path(config.komgrabber.download_location)) -> No
# check for regex "v(d) #(d)" in the file name
regex = re.compile(r"^.* v(\d+) #(\d+(?:-\d+)?)\.cbz$")
if regex.search(file):
print(f"File {file} is a Volume")
log.debug(f"File {file} is a Volume")
else:
print(f"Deleting chapter {file}")
os.remove(f"{src}/{folder}/{file}")
log.debug(f"Deleting chapter {file}")
if os.path.isdir(f"{src}/{folder}/{file}"):
shutil.rmtree(f"{src}/{folder}/{file}")
else:
os.remove(f"{src}/{folder}/{file}")
def folder_similarity(folder1, folder2) -> float:
"""
Calculate the similarity between two folder names using Jaro-Winkler distance.
Args:
folder1 (str): First folder name
folder2 (str): Second folder name
Returns:
float: Similarity score between 0 and 1
"""
similarity = jaro.jaro_winkler_metric(folder1, folder2)
return similarity
def rename_folder(src=config.komgrabber.download_location, series=None) -> bool:
renamer_regex = r"(\s*\([^)]*\))+$"
for folder in os.listdir(src):
if os.path.isdir(f"{src}/{folder}"):
# remove the series name from the folder
new_folder = re.sub(renamer_regex, "", folder)
os.rename(f"{src}/{folder}", f"{src}/{new_folder}")
# rename the files in the folder
# rename(f"{src}/{new_folder}")
else:
new_folder = folder.split(".")[0]
new_folder = re.sub(renamer_regex, "", new_folder)
os.mkdir(f"{src}/{new_folder}")
os.rename(f"{src}/{folder}", f"{src}/{new_folder}/{folder}")
# get similarity
similarity = jaro.jaro_winkler_metric(series.name, new_folder)
log.info(f"Similarity between {series.name} and {new_folder} is {similarity}")
if similarity > 0.85:
log.info(f"Renaming {new_folder} to {series.name}")
os.rename(f"{src}/{new_folder}", f"{src}/{series.name}")
for file in os.listdir(f"{src}/{series.name}"):
os.rename(
f"{src}/{series.name}/{file}",
f"{src}/{series.name}/{file.replace(new_folder, series.name)}",
)
return True
else:
return False
def time_checker(timestamp1: float, timestamp2: float) -> int:
"""Calculate the difference in days between two timestamps.
Args:
timestamp1 (float): First timestamp in seconds since epoch
timestamp2 (float): Second timestamp in seconds since epoch
Returns:
int: Absolute difference in days between the timestamps
"""
difference = abs(timestamp1 - timestamp2)
days = int(difference / (24 * 60 * 60)) # Convert seconds to days
return days
def calculate_new_volumes(
present_volumes: list[int], new_volumes: list[int]
) -> list[int]:
if len(new_volumes) == 1:
if max(new_volumes) > max(present_volumes):
return new_volumes
else:
return []
else:
new_volumes = sorted(new_volumes)
new_volumes = [i for i in new_volumes if i > max(present_volumes)]
if len(new_volumes) == 0:
return []
else:
return new_volumes
def safe_remove_directory(
path: Path, max_retries: int = 3, retry_delay: float = 0.5
) -> bool:
"""Safely remove a directory and all its contents.
Args:
path (str): Path to the directory to remove
max_retries (int, optional): Number of deletion attempts. Defaults to 3.
retry_delay (float, optional): Delay between retries in seconds. Defaults to 0.5.
Returns:
bool: True if directory was removed successfully, False otherwise
"""
for attempt in range(max_retries):
try:
if os.path.exists(path):
shutil.rmtree(path, ignore_errors=False)
return True
except OSError as e:
if attempt == max_retries - 1: # Last attempt failed
log.error(
f"Failed to remove directory {path} after {max_retries} attempts: {e}"
)
return False
time.sleep(retry_delay)
return False
def get_series_update_date(series_name: str) -> str:
db = KomCache()
update_date = db.query(
query="SELECT last_update FROM komgrabber WHERE name = ? LIMIT 1",
args=(series_name,),
)
print(update_date)