Files
KomGrabber/src/logic/utils.py
2025-09-28 20:03:18 +02:00

466 lines
18 KiB
Python

import os
import re
import shutil
import subprocess
import sys
import time
from pathlib import Path
import jaro
import loguru
from komcache import KomCache
from komconfig import KomConfig
from komconfig.config import Library
from komgapi import komgapi
from komgapi.schemas import Series
cfg = KomConfig()
log = loguru.logger
log.remove()
log.add("logs/utils.log", rotation="1 week", retention="1 month")
log.add(sys.stdout, level="INFO")
config = KomConfig()
komga = komgapi(cfg.komga.user, cfg.komga.password, cfg.komga.url)
def rename(folder: Path = config.komgrabber.tag_location) -> None:
"""Rename the files in a folder according to the template.
Template: [Name] v[nr] #[nr].ext (e.g. "The Flash v1 #1.cbz").
Args:
----
- folder (str): the string to the folder
"""
# Get the files in the folder
files = os.listdir(folder)
for file in files:
if os.path.isdir(f"{folder}/{file}"):
rename(Path(f"{folder}/{file}"))
if not file.endswith(".cbz"):
log.debug(f"Skipping {file}, not a cbz file")
continue
ext = file.split(".")[-1]
match = re.search(r"v\d{2,4}", file)
if match:
split_start = match.start()
split_end = match.end()
# Split the filename between split_start and split_end
volume = file[split_start:split_end]
# Split the filename at the split index, but keep the "v" and digits in the title
title = file[:split_start].strip()
# add the volume number to the title as a suffix #nr
title = f"{title} {volume} #{volume.replace('v', '')}"
# rename the file
os.rename(f"{folder}/{file}", f"{folder}/{title}.{ext}")
# rename the folder
def rename_recursive(folder: str) -> None:
# get all directories in the folder and apply the rename function to them
for root, dirs, _files in os.walk(folder):
for dir in dirs:
rename(Path(f"{root}/{dir}"))
def tag_folder(folder: Path = config.komgrabber.tag_location) -> None:
"""
Recursively tags all the .cbz files in the folder using ComicTagger
Parameters
----------
folder : Path, optional
The path that will be used to tag, by default Path(config.komgrabber.tag_location)
"""
# Get the files in the folder
if "~" in str(folder):
folder = os.path.expanduser(folder)
files = os.listdir(folder)
for file in files:
if os.path.isdir(f"{folder}/{file}"):
tag_folder(f"{folder}/{file}")
if not file.endswith(".cbz"):
continue
log.info(f"Trying to tag file {file}")
subprocess.call(
f'comictagger -s -t cr -f -o "{folder}/{file}" --nosummary --overwrite {"-i" if config.komgrabber.tag_interactive else ""}',
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
def move(src: Path, library_path: str) -> None:
"""
Moves the files from the source folder to the destination folder.
If the folder already exists in the destination, only move the new files.
Parameters
----------
src : Path
The source folder
dest : str
The destination folder used by Komga for the library, set in config file, defaults to "Manga"
"""
# Get the files in the folder
# +move the folders from src to disc, if folder already exists, only move new files
dest = Path(config.komga.media_path, library_path)
folders = os.listdir(src)
for folder in folders:
if not os.path.exists(f"{dest}/{folder}"):
log.info(f"Moving {folder} to {dest}")
os.mkdir(f"{dest}/{folder}")
files = os.listdir(f"{src}/{folder}")
for file in files:
log.info(f"Moving {file} to {dest}/{folder}")
shutil.move(f"{src}/{folder}/{file}", f"{dest}/{folder}")
# shutil.move(f"{src}/{folder}", dest)
else:
files = os.listdir(f"{src}/{folder}")
for file in files:
if file.startswith("."):
log.debug(f"Skipping hidden file {file}")
continue
if not os.path.exists(f"{dest}/{folder}/{file}"):
log.info(f"Moving {file} to {dest}/{folder}")
shutil.move(f"{src}/{folder}/{file}", f"{dest}/{folder}")
# Remove empty folders
remove_empty_folders(src)
def remove_empty_folders(src: Path):
"""
Recursively removes empty folders in the source folder
Parameters
----------
src : Path, str
The source folder
"""
folders = os.listdir(src)
for folder in folders:
if os.path.isfile(f"{src}/{folder}"):
continue
if not os.listdir(f"{src}/{folder}"):
log.info(f"Removing {folder}")
os.rmdir(f"{src}/{folder}")
else:
newPath = Path(f"{src}/{folder}")
remove_empty_folders(newPath)
def detect_chapters(
src: Path = config.komgrabber.tag_location, valid_extension: str = "cbz|epub"
) -> None:
"""
Detects and deletes any non-volume file in the source folder
Parameters
----------
src : Path, optional
The Path to be checked, by default Path(config.komgrabber.tag_location)
"""
log.info(f"Checking {src} for chapters")
regex = re.compile(rf"^.* v(\d+) #(\d+(?:-\d+)?)\.({valid_extension})$")
for folder in os.listdir(src):
if os.path.isdir(f"{src}/{folder}"):
files = os.listdir(f"{src}/{folder}")
for file in files:
if os.path.isdir(f"{src}/{folder}/{file}"):
folder_files = os.listdir(f"{src}/{folder}/{file}")
for folder_file in folder_files:
# check for regex "v(d) #(d)" in the file name
if regex.search(folder_file):
log.debug(f"File {folder_file} is a Volume")
else:
log.info(f"Deleting chapter {folder_file}")
if os.path.isfile(f"{src}/{folder}/{file}/{folder_file}"):
os.remove(f"{src}/{folder}/{file}/{folder_file}")
else:
shutil.rmtree(f"{src}/{folder}/{file}/{folder_file}")
# check for regex "v(d) #(d)" in the file name
if regex.search(file):
log.debug(f"File {file} is a Volume")
else:
log.info(f"Deleting chapter {file}")
if os.path.isfile(f"{src}/{folder}/{file}"):
os.remove(f"{src}/{folder}/{file}")
else:
if os.path.isdir(f"{src}/{folder}/{file}"):
for subfile in os.listdir(f"{src}/{folder}/{file}"):
if regex.search(subfile):
log.debug(f"File {subfile} is a Volume")
else:
log.info(f"Deleting chapter {subfile}")
if os.path.isfile(
f"{src}/{folder}/{file}/{subfile}"
):
os.remove(f"{src}/{folder}/{file}/{subfile}")
else:
shutil.rmtree(
f"{src}/{folder}/{file}/{subfile}"
)
else:
shutil.rmtree(f"{src}/{folder}/{file}")
def folder_similarity(folder1: str, folder2: str) -> float:
"""
Calculate the similarity between two folder names using Jaro-Winkler distance.
Args:
folder1 (str): First folder name
folder2 (str): Second folder name
Returns:
float: Similarity score between 0 and 1
"""
similarity = jaro.jaro_winkler_metric(folder1, folder2)
return similarity
def rename_folder(
src: Path = config.komgrabber.tag_location, series: Series = None
) -> bool:
renamer_regex = r"(\s*\([^)]*\))+$"
for folder in os.listdir(src):
if os.path.isdir(f"{src}/{folder}"):
# remove the series name from the folder
new_folder = re.sub(renamer_regex, "", folder)
os.rename(f"{src}/{folder}", f"{src}/{new_folder}")
# rename the files in the folder
# rename(f"{src}/{new_folder}")
else:
new_folder = folder.split(".")[0]
new_folder = re.sub(renamer_regex, "", new_folder)
os.mkdir(f"{src}/{new_folder}")
os.rename(f"{src}/{folder}", f"{src}/{new_folder}/{folder}")
# get similarity
similarity = jaro.jaro_winkler_metric(series.name, new_folder)
log.info(f"Similarity between {series.name} and {new_folder} is {similarity}")
if similarity > 0.85:
log.info(f"Renaming {new_folder} to {series.name}")
os.rename(f"{src}/{new_folder}", f"{src}/{series.name}")
for file in os.listdir(f"{src}/{series.name}"):
os.rename(
f"{src}/{series.name}/{file}",
f"{src}/{series.name}/{file.replace(new_folder, series.name)}",
)
return True
else:
return False
def time_checker(timestamp1: float, timestamp2: float) -> int:
"""Calculate the difference in days between two timestamps.
Args:
timestamp1 (float): First timestamp in seconds since epoch
timestamp2 (float): Second timestamp in seconds since epoch
Returns:
int: Absolute difference in days between the timestamps
"""
difference = abs(timestamp1 - timestamp2)
days = int(difference / (24 * 60 * 60)) # Convert seconds to days
return days
def calculate_new_volumes(
present_volumes: list[int], new_volumes: list[int]
) -> list[int]:
if len(new_volumes) == 1:
if len(present_volumes) == 0:
return new_volumes
if max(new_volumes) > max(present_volumes):
# return any new volume that is not in present volumes
return [v for v in new_volumes if v not in present_volumes]
else:
return []
else:
if len(present_volumes) == 0:
return new_volumes
new_volumes = sorted(new_volumes)
new_volumes = [i for i in new_volumes if i > max(present_volumes)]
if len(new_volumes) == 0:
return []
else:
return new_volumes
def safe_remove_directory(
path: Path, max_retries: int = 3, retry_delay: float = 0.5
) -> bool:
"""Safely remove a directory and all its contents.
Args:
path (str): Path to the directory to remove
max_retries (int, optional): Number of deletion attempts. Defaults to 3.
retry_delay (float, optional): Delay between retries in seconds. Defaults to 0.5.
Returns:
bool: True if directory was removed successfully, False otherwise
"""
for attempt in range(max_retries):
try:
if os.path.exists(path):
shutil.rmtree(path, ignore_errors=False)
return True
except OSError as e:
if attempt == max_retries - 1: # Last attempt failed
log.error(
f"Failed to remove directory {path} after {max_retries} attempts: {e}"
)
return False
time.sleep(retry_delay)
return False
def get_series_update_date(series_name: str) -> str:
db = KomCache()
update_date = db.query(
query="SELECT last_update FROM komgrabber WHERE name = ? LIMIT 1",
args=(series_name,),
)
print(update_date)
def process_manga(download_path: Path, library: Library, serie: Series) -> None:
"""Process the downloaded manga: rename files, detect chapters, tag, rename folder, and move to library."""
rename(download_path)
if not config.komgrabber.get_chapters:
detect_chapters(download_path, "|".join(library.valid_extensions))
tag_folder(download_path)
if rename_folder(series=serie, src=download_path):
move(
download_path,
library.media_path,
)
def process_novel(
download_path: Path, library: Library, serie: Series, copy: bool = False
) -> None:
"""Process the downloaded novel: rename files, tag, rename folder, and move to library."""
# rename the folder to the series name
folder = os.listdir(download_path)[0]
series_name = serie.name
# remove all files that are not valid extensions
valid_extensions = library.valid_extensions
# flatten subfolders and subsubfolders
for root, dirs, files in os.walk(f"{download_path}/{folder}"):
for dir in dirs:
for file in os.listdir(f"{root}/{dir}"):
if file.startswith("."):
log.debug(f"Skipping hidden file {file}")
continue
log.info(f"Moving {file} to {download_path}/{folder}")
shutil.move(f"{root}/{dir}/{file}", f"{download_path}/{folder}")
os.rmdir(f"{root}/{dir}")
# removing invalid extensions
for file in os.listdir(f"{download_path}/{folder}"):
if not any(file.endswith(ext) for ext in valid_extensions):
log.info(f"Removing {file} as it is not a valid extension")
if os.path.isfile(f"{download_path}/{folder}/{file}"):
os.remove(f"{download_path}/{folder}/{file}")
else:
shutil.rmtree(f"{download_path}/{folder}/{file}")
# rename files to remove all [] and text within
for file in os.listdir(f"{download_path}/{folder}"):
filename = file.split(".")[0]
if f"{series_name} - Volume" in filename:
log.debug(f"Skipping {file}, already renamed")
continue
# extract the volume number, may be a float, either v1, v1.5, v01, v01.5, vol.1, vol.01, vol.1.5, vol.01.5, Vol.1, Vol.01, Vol.1.5, Vol.01.5, Volume 1, Volume 01, Volume 1.5, Volume 01.5
regex_volume_pattern = r"(v|vol\.|Vol\.|Volume\s)(\d+(\.\d+)?)"
match = re.search(regex_volume_pattern, file, re.IGNORECASE)
# from the match, get the volume number
volume = match.group(2) if match else None
# rename the file to series name v(volume).ext
ext = file.split(".")[-1]
# if volume is not null and less than 10, pad with a 0
if volume and float(volume) < 10:
volume = f"0{volume}"
if volume and "00" in volume:
volume = volume.replace("00", "0")
fixed = (
f"{series_name} - Volume {volume}.{ext}"
if volume
else f"{series_name}.{ext}"
)
log.debug(f"Renaming {file} to {fixed}")
os.rename(
f"{download_path}/{folder}/{file}", f"{download_path}/{folder}/{fixed}"
)
# flatten subfolders
os.rename(f"{download_path}/{folder}", f"{download_path}/{series_name}")
dest = Path(config.komga.media_path, library.media_path)
folders = os.listdir(download_path)
log.info(f"Moving {folders} to {dest}")
for folder in folders:
log.info(f"Processing folder {folder}")
time.sleep(1)
if not os.path.exists(f"{dest}/{folder}"):
log.info(f"Moving {folder} to {dest}")
os.mkdir(f"{dest}/{folder}")
files = os.listdir(f"{download_path}/{folder}")
for file in files:
time.sleep(1)
log.debug(f"Moving {file} to {dest}/{folder}")
if copy:
# copy file to komgrabber tag location
copy_location = config.komgrabber.copy_location
if not os.path.exists(f"{copy_location}"):
os.mkdir(f"{copy_location}")
shutil.copy(
f"{download_path}/{folder}/{file}",
f"{copy_location}/{file}",
)
log.debug(
f"Copied from {download_path}/{folder}/{file} to {copy_location}/{file}"
)
shutil.move(f"{download_path}/{folder}/{file}", f"{dest}/{folder}")
# shutil.move(f"{src}/{folder}", dest)
else:
files = os.listdir(f"{download_path}/{folder}")
for file in files:
time.sleep(1)
log.debug(f"Processing file {file}")
if file.startswith("."):
log.debug(f"Skipping hidden file {file}")
continue
if not os.path.exists(f"{dest}/{folder}/{file}"):
log.debug(f"Moving {file} to {dest}/{folder}")
if copy:
# copy file to komgrabber tag location
copy_location = config.komgrabber.copy_location
if not os.path.exists(f"{copy_location}/{folder}"):
os.mkdir(f"{copy_location}")
shutil.copy(
f"{download_path}/{folder}/{file}",
f"{copy_location}/{file}",
)
log.debug(
f"Copied from {download_path}/{folder}/{file} to {copy_location}/{file}"
)
shutil.move(f"{download_path}/{folder}/{file}", f"{dest}/{folder}")
log.info("Finished moving files, removing empty folders")
remove_empty_folders(download_path)
if __name__ == "__main__":
print(folder_similarity("Dr. STONE (2018-2023) (Digital) (1r0n)", "Dr. STONE"))