diff --git a/src/tagger.py b/src/tagger.py index cd5bba8..56a96c8 100644 --- a/src/tagger.py +++ b/src/tagger.py @@ -1,36 +1,59 @@ +import os from komgapi import komgapi as komga from komconfig import KomConfig from .api.anilistapi import anilistAPI -from .logger import log -import os -from logic.metadataHandlers import handleAnilist +from src.logic.metadataHandlers import handleAnilist from .similarity import similarity from .utils import write - +from komcache import KomCache +import pprint +from alive_progress import alive_it SKIPS = [] +import loguru +import sys + +log = loguru.logger +log.remove() +log.add("logs/tagger.log", rotation="1 week", retention="1 month") class Tagger: def __init__(self): self.config = KomConfig() self.komga = komga( - url=self.config.general.komga.url, - username=self.config.general.komga.user, - password=self.config.general.komga.password, + url=self.config.komga.url, + username=self.config.komga.user, + password=self.config.komga.password, timeout=60, ) self.mode = None + self.verbose = False + self.cache = None + self.populate_cache = False + if self.verbose: + log.add(sys.stdout) + if self.config.komtagger.check_cache: + log.info("Using cache for fallback - currently only in failed tagging") + self.cache = KomCache() + self.populate_cache = True + + if self.populate_cache: + log.info("Populating cache with series names") + self.populate_cache_data() + + # if verbose remove log level filter self.api: anilistAPI = anilistAPI() # create failed and success files if not os.path.exists("failed.txt"): - with open("failed.txt", "w") as f: + with open("failed_series.txt", "w") as f: f.write("") if not os.path.exists("success.txt"): - with open("success.txt", "w") as f: + with open("success_series.txt", "w") as f: f.write("") + self.cache = KomCache() log.info("Tagger initialized") @@ -44,9 +67,27 @@ class Tagger: self.mode = "merge" log.info("Mode set to merge") - def get_and_update_manga(self, komga_id, mangadex_id): + def populate_cache_data(self): + log.info("Populating cache with series names") + series = self.komga.series_controller.getAllSeries() + bar = alive_it(series, title="Populating cache with series names") + for serie in bar: + self.cache.insert( + "INSERT IGNORE INTO komtagger (series_id, title, status) VALUES (:series_id, :title, :status)", + { + "series_id": serie.id, + "title": serie.name, + "status": serie.metadata.status if serie.metadata else None, + }, + ) + log.info("Cache populated with series data") + + def get_and_update_manga(self, komga_id: str, mangadex_id: int): api_metadata = self.api.get_metadata(mangadex_id) komga_metadata = self.komga.series_controller.getSeries(komga_id) + if api_metadata is None: + log.error(f"No metadata found for {komga_metadata.name}") + return converted_api_metadata = handleAnilist(api_metadata) newMetadata = { @@ -80,11 +121,26 @@ class Tagger: "totalBookCount": converted_api_metadata.totalBookCount, "alternateTitles": converted_api_metadata.alternateTitles, } + # replace True with "true", False with "false" and None with "null". Also set the dict to use " instead of ' for the keys + # and values + # for key, value in newMetadata.items(): + # if isinstance(value, bool): + # newMetadata[key] = str(value).lower() + # elif value is None: + # newMetadata[key] = "null" + + log.info(f"Updating {komga_metadata.name}") + + # newMetadata = json.dumps(newMetadata) + log.debug(f"new metadata: {newMetadata}") if self.mode == "overwrite": log.debug(f"Overwriting metadata for {komga_metadata.name}") # log.debug(newMetadata) - self.komga.series_controller.patchMetadata(komga_id, newMetadata) + + data = self.komga.series_controller.patchMetadata(komga_id, newMetadata) + if data is not None: + log.error(data) elif self.mode == "merge": komga_tags = komga_metadata.metadata.tags newMetadata["tags"] = list(set(komga_tags + newMetadata["tags"])) @@ -104,6 +160,7 @@ class Tagger: def tag(self): komgaseries = self.komga.series_controller.getAllSeries() + log.info(f"Found {len(komgaseries)} series to tag") if not os.path.exists("success.txt"): with open("success.txt", "w") as f: f.write("") @@ -115,8 +172,14 @@ class Tagger: done.extend(failed) - for series in komgaseries: + def finalize_bar(bar): + bar.title("Completed tagging") + + bar = alive_it(komgaseries, title="Tagging series", finalize=finalize_bar) + + for series in bar: seriesTitle = series.name.strip() + if "Omnibus" in seriesTitle: seriesTitle = seriesTitle.replace("Omnibus", "").strip() @@ -148,6 +211,9 @@ class Tagger: if result == 1: log.success(f"Tagged {seriesTitle} successfully") write(seriesTitle, "success") + break + else: + write(seriesTitle, "failed") # for alt in alt_titles: # # alt is a dict, lang: title # title = alt.get(list(alt.keys())[0]) @@ -162,7 +228,7 @@ class Tagger: # else: # break - def get_metadata(self, mtitle: str, id=None): + def get_metadata(self, mtitle: str, id: int = 0): mangadex_titles = self.api.getSeries(mtitle) if mangadex_titles is None or len(mangadex_titles) == 0: log.error(f"No api titles found for {mtitle}") @@ -202,33 +268,163 @@ class Tagger: else: return 1 - # def tag_series(self, series_id): - # series = self.komga.series_controller.getSeries(series_id) - # log.info(f"Tagging {series.name}") - # title = translate(series.name) - # mangadex_titles = self.mangadex.get_series(title) - # if len(mangadex_titles) == 0: - # log.warning(f"No mangadex titles found for {series.name}") - # print(len(mangadex_titles)) - # for result in mangadex_titles: - # if similarity(series.name, result.name) > 0.8: - # title = result.name - # alt_titles = result.alternate_names - # if series.name.strip() == title.strip(): - # self.get_and_update_manga(series.id, result.series_id) - # log.info(f"Tagged {series.name} successfully") - # else: - # log.info( - # f"No direct match, searching in alternate Titles {series.name}" - # ) - # found = False - # for alt in alt_titles: - # # alt is a dict, lang: title - # title = alt.get(list(alt.keys())[0]) - # if series.name.strip() == title.strip(): - # self.get_and_update_manga(series.id, result.series_id) - # log.info(f"Tagged {series.name} successfully") - # found = True - # break - # if found: - # break + def tag_series(self, series_ids: list[str] = [""], series_names: list[str] = [""]): + if series_names != [""]: + series_ids = [] + for serie_name in series_names: + series_data = self.komga.series_controller.getAllSeries( + body={ + "condition": { + "anyOf": [ + { + "title": { + "operator": "contains", + "value": f"{serie_name}", + } + }, + ] + } + } + ) + series_ids.append(series_data[0].id) + + for series_id in series_ids: + series = self.komga.series_controller.getSeries(series_id) + log.info(f"Tagging {series.name}") + # title = translate(series.name) + mangadex_titles = self.api.search_manga(series.name) + if len(mangadex_titles) == 0: + log.warning(f"No mangadex titles found for {series.name}") + for result in mangadex_titles: + if similarity(series.name.lower(), result.title.english.lower()) > 0.8: + title = result.title + alt_titles = result.synonyms + if series.name.strip() == title.english.strip(): + self.get_and_update_manga(series.id, result.id) + log.info(f"Tagged {series.name} successfully") + break + else: + log.info( + f"No direct match, searching in alternate Titles {series.name}" + ) + found = False + for alt in alt_titles: + # alt is a dict, lang: title + title = alt # .get(list(alt.keys())[0]) + if series.name.strip() == title.strip(): + self.get_and_update_manga(series.id, result.id) + log.info(f"Tagged {series.name} successfully") + found = True + break + if found: + break + else: + log.info( + f"No match found for {series.name} and {result.title.english}" + ) + + def tag_manual(self, series_name: str, anilist_id: int): + self.mode = "overwrite" + series = self.komga.series_controller.getAllSeries( + body={ + "condition": { + "anyOf": [ + { + "title": { + "operator": "contains", + "value": f"{series_name}", + } + }, + ] + } + } + ) + if not series: + log.error(f"No series found with name {series_name}") + cached_id = self.cache.query( + "SELECT series_id FROM komtagger WHERE title = :title", + {"title": series_name}, + ) + if cached_id: + series_id = cached_id[0][0] + log.info(f"Using cached series ID {series_id} for {series_name}") + series = self.komga.series_controller.getSeries(series_id) + else: + log.error(f"No cached series ID found for {series_name}") + return + + series = series[0] + log.info(f"Tagging {series.name}, {series.id} with Anilist ID {anilist_id}") + + metadata = self.api.get_metadata(anilist_id) + log.debug(f"Metadata for Anilist ID {anilist_id}: {metadata}") + if metadata is None: + log.error(f"No metadata found for Anilist ID {anilist_id}") + return + converted_metadata = handleAnilist(metadata) + print("-" * 20) + pprint.pprint(converted_metadata.print) + print("-" * 20) + matching = int( + input( + f"Does the converted metadata match the series {series.name} (1 for yes, 0 for no)? " + ).strip() + or "1" + ) + if matching == 1: + self.get_and_update_manga(series.id, anilist_id) + write(series.name, "success") + else: + log.warning(f"Metadata does not match for {series.name}. Exiting.") + + def show_failed(self, any): # type: ignore + with open("failed.txt", "r") as f: + failed = f.readlines() + failed = list(set(failed)) + with open("failed.txt", "w") as f: + f.write("\n".join(failed)) + + if not failed: + log.info("No failed items found") + return + log.info(f"Found {len(failed)} failed items") + print(f"Found {len(failed)} failed items:") + anilist_id = None + for item in failed: + item = item.strip() + print(f"- {item}") + anilist_results = self.api.search_manga(item) + if not anilist_results: + anilist_id = input( + f"Enter Anilist ID for {item} (or press enter to skip): " + ) + if anilist_id.strip() == "": + log.info(f"Skipping {item.strip()}") + continue + try: + anilist_id = int(anilist_id.strip()) + except ValueError: + log.error(f"Invalid Anilist ID {anilist_id}, skipping") + continue + else: + data = {} + for index, content in enumerate(anilist_results): + data[index] = f"{content.id} - {content.title}" + print("Found the following Anilist results:") + for index, content in data.items(): + print(f"{index}: {content}") + print( + "Enter the index of the result you want to use (or press enter to skip): " + ) + index = input().strip() + if index.strip() == "": + log.info(f"Skipping {item.strip()}") + continue + try: + index = int(index.strip()) + anilist_id = anilist_results[index].id + except (ValueError, IndexError): + log.error(f"Invalid index {index}, skipping") + continue + log.info(f"Using Anilist ID {anilist_id} for {item.strip()}") + self.tag_manual(item, anilist_id)