initial commit

This commit is contained in:
2025-02-27 22:41:38 +01:00
commit abb16938f8
59 changed files with 4169 additions and 0 deletions

234
.gitignore vendored Normal file
View File

@@ -0,0 +1,234 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# ---> Qt
# C++ objects and libs
*.slo
*.lo
*.o
*.a
*.la
*.lai
*.so
*.so.*
*.dll
*.dylib
# Qt-es
object_script.*.Release
object_script.*.Debug
*_plugin_import.cpp
/.qmake.cache
/.qmake.stash
*.pro.user
*.pro.user.*
*.qbs.user
*.qbs.user.*
*.moc
moc_*.cpp
moc_*.h
qrc_*.cpp
ui_*.h
*.qmlc
*.jsc
Makefile*
*build-*
*.qm
*.prl
# Qt unit tests
target_wrapper.*
# QtCreator
*.autosave
# QtCreator Qml
*.qmlproject.user
*.qmlproject.user.*
# QtCreator CMake
CMakeLists.txt.user*
# QtCreator 4.8< compilation database
compile_commands.json
# QtCreator local machine specific files for imported projects
*creator.user*
*_qmlcache.qrc
.history
depend
output/output/LOGtoJSON.exe
.pytest_cache
output
docs/
config.yaml
**/tempCodeRunnerFile.py
uv.lock
.history
.venv
venv
*.log

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 WorldTeacher
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

0
README.md Normal file
View File

60
args.py Normal file
View File

@@ -0,0 +1,60 @@
import argparse
# Default path from settings
DEFAULT_PATH = "/default/path"
# Define functions for each action
def action_one(path):
print(f"Performing Action One with path: {path}")
def action_two(path):
print(f"Performing Action Two with path: {path}")
def action_three(path):
print(f"Performing Action Three with path: {path}")
# Map actions to their corresponding functions
ACTIONS = {
"action_one": action_one,
"action_two": action_two,
"action_three": action_three,
}
def main():
# Set up argument parser
parser = argparse.ArgumentParser(
description="Perform actions in a specified order."
)
# Add a --path argument to overwrite the default path
parser.add_argument(
"--path",
type=str,
default=DEFAULT_PATH,
help="Path to use for actions (overwrites default path).",
)
# Add arguments for each action
parser.add_argument(
"actions",
nargs="+", # Allow one or more actions to be specified
choices=ACTIONS.keys(), # Restrict to valid actions
help="List of actions to perform in order. Choices: "
+ ", ".join(ACTIONS.keys()),
)
# Parse arguments
args = parser.parse_args()
# Execute actions in the specified order
for action in args.actions:
ACTIONS[action](args.path)
if __name__ == "__main__":
main()

110
cli.py Normal file
View File

@@ -0,0 +1,110 @@
from src.logic.cli import avail_check, main as cli_main
import os
import argparse
from src.logic.rename import rename
from src.logic.tag import tag_folder
from src.logic.move import move
from src.logic.detect_chapters import detect_chapters
from komconfig import KomConfig
cfg = KomConfig()
def grabber():
nyaa, komga = avail_check()
print(nyaa, komga)
if nyaa is True and komga is True:
cli_main()
# kill aria2c
os.system("killall aria2c")
else:
print("No connection established, quitting")
def main(run_args=None):
parser = argparse.ArgumentParser(
description="A script to call various functions related to Komga File Management."
)
help_texts = {
"search": "Starts a search for all ongoing series in Komga.",
"move": "Moves the downloaded files from the download path to the Komga library.",
"tag": "Tries to tag all files in the download dir using comictagger.",
"rename": "Renames the files based on the naming scheme [Title] v[number] #[number] to get best results with comictagger.",
"detect_chapters": "Detects the chapters in the downloaded folders and removes them.",
}
search_parser = parser.add_argument_group("Search")
search_parser.add_argument(
"-s",
"--search",
dest="actions",
action="append_const",
const="search",
help=help_texts["search"],
)
move_parser = parser.add_argument_group("Move")
move_parser.add_argument(
"-m",
"--move",
dest="actions",
action="append_const",
const="move",
help=help_texts["move"],
)
tag_parser = parser.add_argument_group("Tag")
tag_parser.add_argument(
"-t",
"--tag",
dest="actions",
action="append_const",
const="tag",
help=help_texts["tag"],
)
rename_parser = parser.add_argument_group("Rename")
rename_parser.add_argument(
"-r",
"--rename",
dest="actions",
action="append_const",
const="rename",
help=help_texts["rename"],
)
detect_chapters_parser = parser.add_argument_group("Detect Chapters")
detect_chapters_parser.add_argument(
"-d",
"--detect_chapters",
dest="actions",
action="append_const",
const="detect_chapters",
help=help_texts["detect_chapters"],
)
parser.add_argument(
"-p",
"--path",
type=str,
default=cfg.komgrabber.download_location,
help="Path to use for actions (overwrites default path).",
)
args = parser.parse_args()
# based on provided arguments, call the corresponding function
if args.actions is None:
parser.print_help()
return
for action in args.actions:
if action == "search":
grabber()
elif action == "move":
move(src=args.path, dest=cfg.komga.media_path)
elif action == "tag":
tag_folder(args.path)
elif action == "rename":
rename(args.path)
elif action == "detect_chapters":
detect_chapters(args.path)
if __name__ == "__main__":
main()

29
config.json Normal file
View File

@@ -0,0 +1,29 @@
{
"komga_server": "http://10.10.20.20:9001/",
"komga_auth": {
"username": "kirchneralexander020@gmail.com",
"password": "3gxjVNW2a@27#Ti"
},
"comics.org":{
"location":"src/databases/comics.db"
},
"comicvine":{
"api_key":"0d87c5060d8f5f8e5b7f153b367b8b7596be46f8",
"url":"https://comicvine.gamespot.com/api/"
},
"download_location": "/home/alexander/Downloads/torrents/Manga_test/",
"include_factors": ["digital", "Digital"],
"skip_factors": [
"(Digital-Compilation)",
"WN",
"Colored",
"EPUB",
"epub",
"Epub",
"PDF",
"pdf",
"Pdf",
"Crazyankan&Rippersanime"
],
"komga_path": "/mnt/Media/Manga/"
}

0
config/config.py Normal file
View File

24
pyproject.toml Normal file
View File

@@ -0,0 +1,24 @@
[project]
name = "komgrabber"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"aria2p>=0.12.1",
"bencodepy>=0.9.5",
"feedparser>=6.0.11",
"jaro-winkler>=2.0.3",
"komconfig",
"komgapi",
"komsuite-nyaapy",
"limit>=0.2.3",
"loguru>=0.7.3",
"natsort>=8.4.0",
"omegaconf>=2.3.0",
]
[tool.uv.sources]
komgapi = { workspace = true }
komsuite-nyaapy = { workspace = true }
komconfig = { workspace = true }

1
src/__init__.py Normal file
View File

@@ -0,0 +1 @@
__version__ = "0.0.1"

11
src/api_tests.py Normal file
View File

@@ -0,0 +1,11 @@
from src.data.comicvine import ComicVineAPI
# comicdata = comic.get_series("The Walking Dead")
comic = ComicVineAPI()
if __name__ == "__main__":
mangadata = comic.get_series("A Centaur's Life")
print(mangadata)
# print(type(mangadex.get_cover("8972b661-13b4-49da-a32b-1450cf9ac31d","f2bfcf6d-6535-4ee2-bc94-6de678a27399")))

9
src/aria.py Normal file
View File

@@ -0,0 +1,9 @@
import subprocess
def launch_aria2c():
subprocess.Popen(["aria2c", "--enable-rpc", "--rpc-listen-all"])
def kill_aria2c():
subprocess.Popen(["killall", "aria2c"])

102
src/cache_populate.py Normal file
View File

@@ -0,0 +1,102 @@
import json
from time import sleep
from jellyfish import levenshtein_distance as ld
from APIs import KomgaAPI, MangadexAPI
from APIs.cache import ListCache
config = json.load(open("config.json"))
ka = KomgaAPI(
"http://192.168.178.20:9001", auth=(config["username"], config["password"])
)
# print(ka.get_all_series())
lc = ListCache("mangacache.db")
# lc.populate_database()
# mapi=MangadexAPI()
# def test():
# pass
# def other():
# string='One Punch Man'
# test_dict={'id': '77dbfa29-8ef0-446d-82cc-1b4de878dd90', 'title': 'One Punch Man (Webcomic/Fan Colored)', 'alternate_titles': ['One Punch Man', 'Instant Death Cheat', "The Other World Doesn't Stand a Chance Against the Power of Instant Death"]}
# if string==test_dict["title"]:
# print("true")
# elif any(test_dict["alternate_titles"])==string:
# print("true")
# for title in test_dict["alternate_titles"]:
# print(jws(string,title),title)
# print("Levenshtein")
# for title in test_dict["alternate_titles"]:
# print(title)
# print(Levenshtein.distance(title.replace(" ",""), string.replace(" ","")))
def determine_mangadex_id(mangadex_result, series_title: str):
if isinstance(mangadex_result) == list:
for result in mangadex_result:
if result["title"] is None:
continue
if result["title"].lower() == series_title.lower():
return result["id"]
elif ld(result["title"].lower(), series_title.lower()) < 10:
return result["id"]
else:
for title in result["alternate_titles"]:
if ld(title.lower(), series_title.lower()) < 10:
return result["id"]
else:
if mangadex_result["title"].lower() == series_title.lower():
return mangadex_result["id"]
elif ld(mangadex_result["title"].lower(), series_title.lower()) < 10:
return mangadex_result["id"]
else:
for title in mangadex_result["alternate_titles"]:
if ld(title.lower(), series_title.lower()) < 10:
return mangadex_result["id"]
return None
def mangadex_test():
md = MangadexAPI()
series = ka.get_all_series()
for serie in series:
# print(serie)
# complete=ka.get_complete_status(serie[1])
series_title = serie[0]
print("title", series_title)
series_id = serie[1]
mangadex_result = md.search_manga_id(series_title)
# print("resulting:",mangadex_result)
if mangadex_result is not None:
mangadex_id = determine_mangadex_id(mangadex_result, series_title)
print(mangadex_id)
lc.update_database(series_id, mangadex_id)
print("updated,sleeping to prevent ratelimit")
sleep(2)
# lc.populate_database()
# resutl=MangadexAPI().get_manga("77dbfa29-8ef0-446d-82cc-1b4de878dd90")
# print(resutl)
# md=MangadexAPI()
# print(md.search_manga_id("My Instant Death Ability Is So Overpowered, No One in This Other World Stands a Chance Against Me! —AΩ—"))
# # print(MangadexAPI().search_manga_id("My Instant Death Ability Is So Overpowered, No One in This Other World Stands a Chance Against Me! —AΩ—"))
mangadex_test()
string1 = "Karakai Jouzu no Takagi-san Doujin"
s2_1 = "Karakai Jouzu no Takagi-san"
string2 = "Karakai Jouzu no Takagi-san"
print(ld(string1, string2))
print(ld(string1, s2_1))
print(
determine_mangadex_id(
MangadexAPI().search_manga_id("Karakai Jouzu no Takagi-san"),
"Karakai Jouzu no Takagi-san",
)
)

11
src/data/Feeds/base.py Normal file
View File

@@ -0,0 +1,11 @@
import feedparser as fp
class Feed:
def __init__(self) -> None:
pass
def parse_feed(self,feed_url)->list:
try:
dataset = fp.parse(feed_url)
return dataset.entries
except:
return None

65
src/data/Feeds/nyaasi.py Normal file
View File

@@ -0,0 +1,65 @@
# Nyaa.si will be parsed using feedparser, as they provide a RSS feed.
from komsuite_nyaapy import Nyaa
from natsort import natsorted
import re
class NyaaFeed(Nyaa):
def __init__(self) -> None:
super().__init__()
def list_volumes(self, inpt) -> list[int]:
volumes = []
inpt = inpt[0].replace("v", "")
if "-" in inpt:
start, end = inpt.split("-")
for i in range(int(start), int(end) + 1):
volumes.append(i)
else:
volumes.append(int(inpt))
return natsorted(volumes)
def search(self, title: str):
dataset = super().search(title, 3, 1)
datalist = []
if dataset is None:
return datalist
for entry in dataset:
data = {}
# print(entry)
# if not "digital" in entry.title.lower():
# # print("Skipping {}".format(entry.title))
# # log.log("Skipping {}".format(entry.title))
# continue
data["title"] = entry.name
data["volumes"] = re.findall(regex, data["title"])
try:
match = re.match(
r"^(.*?)\s(vol\.\s\d{2})|(v\d{2,3})", data["title"].lower()
)
if match:
data["title"] = match.group(1)
except AttributeError:
# chapter check
try:
match = re.findall(r"(?<!\d)\d{2,3}(?!\d)", data["title"])
print("Matched chapter: {}".format(match))
print("Found Chapters only, skipping")
continue
except AttributeError:
pass
data["volumes"] = (
self.list_volumes(data["volumes"]) if data["volumes"] else [0]
)
data["link"] = entry.link
data["seeders"] = entry.nyaa_seeders
data["filesize"] = entry.nyaa_size
# print(data)
datalist.append(data)
return datalist
if __name__ == "__main__":
print("This is a module, not a script")

0
src/data/__init__.py Normal file
View File

109
src/data/apibase.py Normal file
View File

@@ -0,0 +1,109 @@
import time
import requests
class API:
def __init__(self, url: str, username: str = None, password: str = None):
self.url = url
self.auth = (username, password)
self.connected = self.test_connection()
def test_connection(self) -> bool:
tries = 10
delay = 1
for _ in range(tries):
try:
requests.get(self.url)
return True
except:
print(f"Connection failed, retrying in {delay} seconds")
time.sleep(delay)
delay *= 2
return False
def get(self, url, options: dict = None) -> requests.models.Response | None:
"""Get the response from the api.
Args:
----
- url (str): the part after the default api url (e.g. api.mangadex.org/*manga*)
- options (dict, optional): Options available by the API. Defaults to None.
Returns:
-------
- Response: the response from the api formatted as a json
- None: if the response is not 200
"""
url = f"{self.url}/{url}"
# print(url)
result = requests.get(url, auth=self.auth, params=options if options else None)
if not result.status_code == 200:
print(result.text)
return None
if "thumbnail" in url:
return result.content
return result.json()
def put(self, url, data: dict = None) -> requests.models.Response | None:
"""put the data to the api
Args:
url (str): the part after the default api url (e.g. api.mangadex.org/*manga*)
data (dict, optional): the data to be put. Defaults to None.
Returns:
Response: the response from the api formatted as a json
None: if the response is not 200
"""
url = f"{self.url}/{url}"
result = requests.put(url, auth=self.auth, json=data if data else None)
if not result.status_code == 200:
print(result.text)
return None
return result.json()
def patch(self, url, data: dict) -> requests.models.Response | None:
"""patch the data to the api
Args:
url (str): the part after the default api url (e.g. api.mangadex.org/*manga*)
data (dict): the data to be patched
Returns:
Response: the response from the api formatted as a json
None: if the response is not 200
"""
url = f"{self.url}/{url}"
result = requests.patch(url, auth=self.auth, json=data if data else None)
if not result.status_code == 200:
print(result.text)
return None
return result.json()
def post(self, url, data: dict = None) -> requests.models.Response | None:
"""post the data to the api
Args:
url (str): the part after the default api url (e.g. api.mangadex.org/*manga*)
data (dict): the data to be posted
Returns:
Response: the response from the api formatted as a json
None: if the response is not 200
"""
url = f"{self.url}/{url}"
result = requests.post(url, auth=self.auth, json=data if data else None)
if not result.status_code == 200:
print(result.text)
return None
return result.json()
if __name__ == "__main__":
print("This is a module, not a script")
ap = API("https://api.mangadex.org")
res = ap.get(
"/manga?title=My Instant Death Ability Is So Overpowered, No One in This Other World Stands a Chance Against Me! —AΩ—"
)
print(res)

0
src/data/base.py Normal file
View File

164
src/data/cache.py Normal file
View File

@@ -0,0 +1,164 @@
import os
import sqlite3 as sql
from src.data.komga import KomgaAPI
template = "name,komga_id,complete,mangadex,anilist,comicvine,myanimelist,kitsu,lastcheck"
template_cache = "name, komga_id,cover,metadata,provider"
ka = KomgaAPI()
class ListCache:
def __init__(self, db_name):
if not os.path.exists(db_name):
self.create_database(db_name, template)
self.con = sql.connect(db_name)
self.cursor = self.con.cursor()
def create_database(self, db_name, template):
print("Creating database")
con = sql.connect(db_name)
cursor = con.cursor()
cursor.execute(
f"CREATE TABLE IF NOT EXISTS cache (id INTEGER PRIMARY KEY AUTOINCREMENT, {template} TEXT, UNIQUE(komga_id))"
)
cursor.execute(
f"CREATE TABLE IF NOT EXISTS series (id INTEGER PRIMARY KEY AUTOINCREMENT, {template_cache} TEXT)"
)
con.close()
con.close()
return self
def populate_database(self):
print("Populating database")
for series in ka.getAllSeries():
print(series)
name = series.name.replace("'", "''") # escape single quotes
komga_id = series.id.replace("'", "''") # escape single quotes
status = series.metadata.status.replace("'", "''") # escape single quotes
self.cursor.execute(
f"INSERT INTO cache (name, komga_id, complete) VALUES ('{name}', '{komga_id}', '{status}')"
)
self.con.commit()
def update_database(
self,
komga_id,
id_type,
id
):
local_vars = locals()
print(local_vars)
if id_type in local_vars:
local_vars[id_type] = id
print(local_vars)
print(f"Updating database with {id_type} id {id}")
# check if the id is already in the database
query = f"SELECT * FROM cache WHERE {id_type} = '{id}'"
self.cursor.execute(query)
result = self.cursor.fetchone()
if result:
print(f"ID {id} already in database")
print("Updating database")
query = f"UPDATE cache SET {id_type} = '{id}' WHERE komga_id = '{komga_id}'"
print(query)
try:
self.cursor.execute(query)
self.con.commit()
except sql.OperationalError:
print(
"Error updating database, possibly due to missing data, repopulate the database"
)
self.populate_database()
return 0
else:
print(f"ID {id} not in database")
print("Adding ID to database")
query = f"INSERT INTO cache ({id_type}) VALUES ('{id}') WHERE komga_id = '{komga_id}'"
print(query)
try:
self.cursor.execute(query)
self.con.commit()
except sql.OperationalError:
print(
"Error updating database, possibly due to missing data, repopulate the database"
)
self.populate_database()
return 0
def get_cached_ids(self):
self.cursor.execute("SELECT name,provider FROM series")
return self.cursor.fetchall()
def add_cached_entry(self, name, komga_id, cover, metadata,provider):
self.cursor.execute(
"INSERT INTO series (name, komga_id, cover, metadata,provider) VALUES (?, ?, ?, ?,?)",
(name, komga_id, cover, metadata,provider),
)
self.con.commit()
def get_cached_entry(self, komga_id,provider)->tuple:
try:
self.cursor.execute(f"SELECT metadata FROM series WHERE komga_id='{komga_id}' AND provider='{provider}'")
return self.cursor.fetchone()
except:
return 0
def delete_cached_entries(self):
self.cursor.execute("DELETE FROM series")
def get_series_by_id(self, search_id: str, search_field: str) -> tuple:
"""Search the database for a series by id.
Args:
----
search_id (str): the id to search for
search_field (str): the field to search in. Must be one of the following: komga_id, mangadex_id, anilist_id, comicvine
Returns:
-------
tuple: a tuple containing the series data
"""
self.cursor.execute(f"SELECT * FROM cache WHERE {search_field}='{search_id}'")
return self.cursor.fetchone()
def get_all_series(self, return_data: str = "*") -> list[tuple]:
self.cursor.execute(f"SELECT {return_data} FROM cache")
return self.cursor.fetchall()
def add_series(self, name: str, komga_id: str, complete: str = "ONGOING") -> None:
self.cursor.execute(
"INSERT INTO cache (name, komga_id, complete) VALUES (?, ?, ?)",
(name, komga_id, complete),
)
self.con.commit()
def query(self, query: str) -> list[tuple]:
self.cursor.execute(query)
return self.cursor.fetchall()
def query_all_missing_id_type(
self, id_type: str
) -> list[tuple[int, str, str, str, str, str, str]]:
"""Queryl all entries in the database that do not have the requested id type.
Args:
----
id_type (str): The string describing the id type. Can be mangadex, comicvine or anilist
Returns:
-------
list[str]: The result of the query
"""
query = f"SELECT * FROM cache WHERE {id_type}_id IS NULL"
self.cursor.execute(query)
return self.cursor.fetchall()
if __name__ == "__main__":
from komga import KomgaAPI
lc = ListCache("mangacache.db")
# lc.populate_

108
src/data/comicsorg.py Normal file
View File

@@ -0,0 +1,108 @@
import json
import sqlite3 as sql
from src.schema.brand import BrandMetadata
from src.schema.country import CountryMetadata
from src.schema.language import LanguageMetadata
from src.schema.publicationData import PublicationTypeMetadata
from src.schema.series import GenericSeries
from src.schema.comicdata import ComicsORGdata
from src.schema.issue import IssueMetadata
from src.schema.publisher import PublisherMetadata
from typing import Optional, Union
config = json.load(open("config.json"))
class ComicsORGDB:
name = "ComicsORG"
def __init__(self) -> None:
self.conn = sql.connect(config["comics.org"]["location"])
self.cursor = self.conn.cursor()
def get_series(self, title: str) -> Optional[list[GenericSeries]]:
"""TODO: implement this
Here, a conversion from the ComicsORGdata object to a GenericSeries object should be done.
The series will be displayed in a new gui to select the correct series.
"""
ret_lst = []
series = self.get_series_data(title)
for serie in series:
ret = GenericSeries()
ret.series_id = serie.id
ret.provider = "comics.org"
ret.name = serie.name
ret.alternate_names = None
ret.sort_name = serie.sort_name
ret.releaseDate = serie.year_began
ret.publisher = serie.publisher.name
ret.people = None
ret.description = serie.notes
ret.language = serie.language.name
ret.issues = serie.issue_count
ret.links = None
ret.cover = None
ret.tags = None
ret_lst.append(ret)
return ret_lst if ret_lst != [] else None
def get_series_data(self, series_name: str) -> Union[list[ComicsORGdata], list]:
"""Return a ComicsORGdata object for the specified series."""
query = "SELECT * FROM gcd_series WHERE name =?"
params = (series_name,)
self.cursor.execute(query, params)
series_data = self.cursor.fetchall()
results = []
for series in series_data:
tmp = ComicsORGdata(*series)
tmp.assign(first_issue=self.get_issue(tmp.first_issue_id))
tmp.assign(last_issue=self.get_issue(tmp.last_issue_id))
tmp.assign(publisher=self.get_publisher(tmp.publisher_id))
tmp.assign(country=self.get_country(tmp.country_id))
tmp.assign(language=self.get_language(tmp.language_id))
tmp.assign(
publication_type=self.get_publication_type(tmp.publication_type_id)
)
results.append(tmp)
return results
def get_issue(self, first_issue_id: int) -> IssueMetadata:
"""Return a ComicsORGdata object for the first issue of the specified series."""
self.cursor.execute(
f"SELECT * FROM gcd_issue WHERE id = {first_issue_id} ORDER BY sort_code ASC LIMIT 1"
)
issue_data = self.cursor.fetchone()
return IssueMetadata(*issue_data)
def get_publisher(self, publisher_id: int) -> PublisherMetadata:
"""Return a ComicsORGdata object for the specified publisher."""
self.cursor.execute(f"SELECT * FROM gcd_publisher WHERE id = {publisher_id}")
publisher_data = self.cursor.fetchone()
return PublisherMetadata(*publisher_data)
def get_language(self, language_id: int) -> LanguageMetadata:
"""Return a ComicsORGdata object for the specified language."""
self.cursor.execute(f"SELECT * FROM stddata_language WHERE id = {language_id}")
language_data = self.cursor.fetchone()
return LanguageMetadata(*language_data)
def get_publication_type(self, publication_type_id: int) -> PublicationTypeMetadata:
"""Return a ComicsORGdata object for the specified publication type."""
if publication_type_id is None:
return None
self.cursor.execute(
f"SELECT * FROM gcd_series_publication_type WHERE id = {publication_type_id}"
)
publication_type_data = self.cursor.fetchone()
return PublicationTypeMetadata(*publication_type_data)
def get_country(self, country_id):
"""Return a ComicsORGdata object for the specified country."""
self.cursor.execute(f"SELECT * FROM stddata_country WHERE id = {country_id}")
country_data = self.cursor.fetchone()
return CountryMetadata(*country_data)

149
src/data/comicvine.py Normal file
View File

@@ -0,0 +1,149 @@
from src import __version__
from typing import Any, List
import json
import platform
import requests
from src.errors import *
from src.schema.series import GenericSeries
from src.schema.person import PersonData
from enum import Enum
from limit import limit
MINUTE = 60
config = json.load(open("config.json"))
url = config["comicvine"]["url"]
api_key = config["comicvine"]["api_key"]
class COMICDATA(Enum):
SERIES = (4000, "volumes", List[GenericSeries])
@property
def prefix(self) -> int:
return self.value[0]
@property
def endpoint(self) -> str:
return self.value[1]
@property
def schema(self) -> Any:
return self.value[2]
c = COMICDATA.SERIES.endpoint
class ComicVineAPI:
name = "ComicVine"
def __init__(self):
self.url = url
self.api_key = api_key
self.timeout = 30
self.headers = {
"Accept": "application/json",
"User-Agent": f"KomGrabber/{__version__}/{platform.system()}: {platform.release()}",
}
@limit(20, MINUTE)
def get_data(
self,
url: str,
params: dict[str:str] | None = {"api_key": api_key, "format": "json"},
) -> dict[str:Any]:
if params is None:
params = {}
try:
response = requests.get(
url, params=params, headers=self.headers, timeout=self.timeout
)
response.raise_for_status()
status_code = response.status_code
if status_code == 200:
results = response.json()
return results["results"]
except ConnectionError as e:
msg = f"Unable to get access to `{url}`"
raise ConnectError(msg) from e
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
msg = "Invalid API Key"
raise LoginError(msg) from e
if e.response.status_code == 404:
msg = "Unknown endpoint"
raise AccessError(msg) from e
if e.response.status_code == 502:
msg = "Service error, retry again in 30s"
raise AccessError(msg) from e
raise ResponseError(e.response.json()["error"])
except json.JSONDecodeError as e:
msg = f"Unable to parse response from `{url}` as Json"
raise JSONError(msg) from e
except requests.exceptions.ReadTimeout as e:
raise TimeoutError("Request timed out") from e
def get_series(
self, title: str = None, id: str = None
) -> GenericSeries | List[GenericSeries]:
url = f"{self.url}{COMICDATA.SERIES.endpoint}"
if id:
url = f"{url}/{COMICDATA.SERIES.prefix}-{id}"
if title:
url = f"{url}/?filter=name:{title}"
params = {"api_key": self.api_key, "format": "json"}
print(url)
data = self.get_data(url, params)
ret = self.parseData(data)
return ret
def get_cover(self, link: str) -> str:
"""Take the link to the cover image and return the image as a byte string
Args:
link (str): Link to the image, ususally found in ["image"]["original_url"]
Returns:
str: Bytestring of the image
"""
return requests.get(link).content
def getPeople(self, link) -> List[PersonData]:
data = self.get_data(link)
people = []
for person in data["person_credits"]:
name = person["name"]
role = person["role"]
if "," in role:
roles = role.split(",")
for r in roles:
people.append(PersonData(name=name, role=r))
else:
people.append(PersonData(name=name, role=role))
return people
def parseData(self, data: dict[str, Any]) -> List[GenericSeries]:
ret = []
for series in data:
seriesData = GenericSeries()
seriesData.series_id = series["id"]
seriesData.provider = "ComicVine"
seriesData.name = series["name"]
seriesData.alternate_names = (
series["aliases"] if series["aliases"] is not None else []
)
seriesData.sort_name = series["name"]
seriesData.releaseDate = series["start_year"]
seriesData.publisher = series["publisher"]["name"]
seriesData.people = self.getPeople(series["first_issue"]["api_detail_url"])
seriesData.description = series["description"]
seriesData.issues = series["count_of_issues"]
seriesData.cover = self.get_cover(series["image"]["original_url"])
seriesData.language = "en"
seriesData.links = [{"comicvine": series["site_detail_url"]}]
ret.append(seriesData)
return ret

32
src/data/constants.py Normal file
View File

@@ -0,0 +1,32 @@
_constants_link = {
"al": "https://anilist.co/manga/()",
"ap": "https://www.anime-planet.com/manga/()",
"md": "https://mangadex.org/title/()",
"kt": "https://kitsu.io/manga/()",
"bw": "https://bookwalker.jp/()",
"mal": "https://myanimelist.net/manga/()",
"mu": "https://www.mangaupdates.com/series.html?id=()",
"nu": "https://www.novelupdates.com/series/()",
}
_constants_label = {
"al": "Anilist",
"ap": "Anime Planet",
"bw": "BookWalker",
"kt": "Kitsu",
"mu": "MangaUpdates",
"nu": "NovelUpdates",
"amz": "Amazon",
"cdj": "CDJapan",
"mal": "MyAnimeList",
"raw": "Raw",
"engtl": "Official English Translation",
"md": "MangaDex",
"ebj": "EBookJapan",
}
LINKS = _constants_link
LABELS = _constants_label
VALID_ROLES = ["author", "artist", "letterer", "editor", "publisher"]

205
src/data/komga.py Normal file
View File

@@ -0,0 +1,205 @@
import json
import requests
from komgapi import komgapi as KOMGAPI_REST
from src.schema.series import SeriesMetadata
from src.logs.log import Log
from komconfig import KomConfig
config = KomConfig()
class KomgaAPI(KOMGAPI_REST):
def __init__(self) -> None:
self.logger = Log("KomgaAPI")
url = config.komga.url
self.auth = config.komga_auth
super().__init__(
url=url,
username=self.auth[0],
password=self.auth[1],
timeout=100,
)
self.connected = self.test_connection(url)
if not self.connected:
print("Komga API not connected")
exit(1)
def test_connection(self, url) -> bool:
try:
response = requests.get(f"{url}/api/v1/series", auth=self.auth)
if response.status_code == 200:
return True
return False
except Exception as e:
print(e)
return False
def getSeriesStatus(self, series_id) -> str:
"""Request the state of the given series.
Args:
- series_id (str): the id of the series
Returns:
str: the status of the series
"""
response = self.series_controller.getSeries(series_id=series_id)
self.logger.log_debug(response)
return response.metadata.status
def getCompleteStatus(self, series_id) -> bool:
"""Request the state of the given series.
Args:
- series_id (str): the id of the series
Returns:
bool: wheter the series is complete or not
"""
response = self.series_controller.getSeries(series_id=series_id)
self.logger.log_debug(response)
curr_books = response.booksCount
total_books = response.metadata.totalBookCount
if total_books == "null" or total_books is None:
total_books = 0
if int(curr_books) == int(total_books):
return True
return False
def _get_series_id(self, title) -> str:
result = self.series_controller.getAllSeries()
for series in result:
# print(series["name"])
if series.name == title:
series_id = series.id
return series_id
return None
def lastModified(self, title: str) -> str:
"""Get the last modified date of a series."""
api_url = f"api/v1/series/{self._get_series_id(title)}"
result = self.series_controller.getSeries(api_url)
return result.lastModified.split("T")[0]
def getVolumes(
self,
series_id: str,
media_status: list[str] = None,
read_status: list[str] = None,
tag: list[str] = None,
unpaged: bool = True,
page_integer: int = None,
size: int = None,
sort: list[str] = None,
author: list[str] = None,
) -> list[int]:
"""Get a list of all volumes matching the given criteria.
Args:
series_id (str, optional): The series_id determined by a previous api request. Defaults to None.
media_status (list[str], optional): A List of applicable options "UNKNOWN", "ERROR", "READY", "UNSUPPORTED", "OUTDATED". Defaults to None.
read_status (list[str], optional): Options: "UNREAD", "READ", "IN_PROGRESS". Defaults to None.
tag (list[str], optional): Any tag from the database. Defaults to None.
deleted (bool, optional): wether the media was deleted. Defaults to False.
unpaged (bool, optional): if the result should stop after a page (def. to 20 results). Defaults to True.
page_integer (int, optional): integer of the resulting page. Defaults to None.
size (int, optional): idk. Defaults to None.
sort (list[str], optional): sort using property(asc|desc). Defaults to None.
author (list[str], optional): name,role. Defaults to None.
Returns:
list[int]: _description_
"""
volumes = []
# api_url=f'{self.url}/api/v1/series/{self._get_series_id(title)}/books'
result = self.series_controller.getSeriesBooks(
series_id,
media_status=media_status,
read_status=read_status,
tag=tag,
unpaged=unpaged,
page=page_integer,
size=size,
sort=sort,
)
if result is None:
return volumes
# print(result) #! uncomment for debugging
for volume in result:
name = volume.name
if "#" in name:
vol = name.split("#")[-1]
if " " in vol: # in case something is added after the volume number
vol = vol.split(" ")[0]
if "-" in vol:
numba = vol.split("-")
m_numba = max(numba)
vol = m_numba
volumes.append(int(float(vol)))
return volumes
def getReadCount(self, series_id: str) -> int:
"""Get the number of read volumes of a series."""
result = self.series_controller.getSeries(series_id)
return result.booksReadCount
def getUnreadCount(self, series_id: str) -> int:
"""Get the number of unread volumes of a series."""
result = self.series_controller.getSeries(series_id)
return result.booksUnreadCount
def getInProgressCount(self, series_id: str) -> int:
"""Get the number of volumes in progress of a series."""
result = self.series_controller.getSeries(series_id)
return result.booksInProgressCount
def getBooksMetadata(self, series_id: str) -> list:
"""Get the metadata of all books in a series."""
result = self.series_controller.getSeries(series_id)
return result.booksMetadata
def updateMetadata(self, series_id: str, metadata: SeriesMetadata) -> None:
metadata_komga = {
"status": metadata.status,
"statusLock": False,
"title": metadata.series_name,
"titleLock": metadata.series_name_lock,
"titleSort": metadata.series_name_sort,
"titleSortLock": metadata.series_name_sort_lock,
"summary": metadata.description,
"summaryLock": metadata.description_lock,
"publisher": metadata.publisher,
"publisherLock": metadata.publisher_lock,
"readingDirection": metadata.reading_direction,
"readingDirectionLock": metadata.reading_direction_lock,
"ageRating": metadata.ageRating,
"ageRatingLock": metadata.ageRating_lock,
"language": metadata.language,
"languageLock": metadata.language_lock,
"genres": metadata.genres,
"genresLock": metadata.genres_lock,
"tags": metadata.tags,
"tagsLock": metadata.tags_lock,
"totalBookCountLock": metadata.total_volumes_lock,
"linksLock": metadata.links_lock,
"alterateTitleLock": metadata.alternate_names_lock,
"links": metadata.links,
"alternateTitles": metadata.alternate_names,
"totalBookCount": metadata.total_volumes,
}
api_url = f"api/v1/series/{series_id}/metadata"
self.series_controller.patchMetadata(api_url, metadata_komga)
pass
if __name__ == "__main__":
from data import SeriesMetadata
api = KomgaAPI()
api.getSeriesBooks("095S763VH28SQ")
# print(api.get_all_series())

323
src/data/mangadex.py Normal file
View File

@@ -0,0 +1,323 @@
from src.data.apibase import API as _API
from src.data import constants
from src.schema.series import GenericSeries, SeriesMetadata
from src.schema.person import PersonData
import requests
# from jellyfish import jaro_similarity
#
# import thread signal emitter
class MangadexAPI(_API):
name = "MangaDex"
def __init__(self, username: str = None, password: str = None):
self.url = "https://api.mangadex.org/"
super().__init__(self.url, username, password)
def get_alternate_titles(self, titles: list[dict]) -> list:
"""Get the alternate titles from the mangadex api.
Args:
titles (list[dict]): a list of dictionaries containing the alternate titles
Returns:
list:a list of alternate titles
"""
titles = [title for title in titles if title.keys() == {"en"}]
lst = []
for title in titles:
lst.append(title["en"])
return lst
def search_manga_id(self, title: str, options: dict = None) -> dict:
"""Search for a manga id using the title.
Args:
title (str): the title of the manga
"""
title = title.replace(" ", "%20").replace("&", "%26")
data = self.get(f"/manga?title={title}&order[relevance]=desc", options=options)
result_list = []
# print(type(data))
# print(len(data["data"]))
for manga in data["data"]:
cleaned_data = {}
if options is not None:
for key in options.keys():
cleaned_data[key] = [key]
return_title = (
manga["attributes"]["title"]["en"]
if "en" in manga["attributes"]["title"].keys()
else None
)
alternative_titles = self.get_alternate_titles(
manga["attributes"]["altTitles"]
)
mangadex_id = manga["id"]
cleaned_data = {
"title": return_title,
"alternate_titles": alternative_titles,
"id": mangadex_id,
}
result_list.append(cleaned_data)
# print(manga, type(manga))
# cleaned_data["id"]=manga[0]["id"]
# if dict.keys(manga[0]["attributes"]["title"])=={"en"}:
# cleaned_data["title"]=manga[0]["attributes"]["title"]["en"]
if len(result_list) > 0:
return result_list
# return cleaned_data
def get_manga_id(self, title: str, options: dict = None) -> dict:
titles = self.search_manga_id(title=title, options=options)
if titles is not None:
for title in titles:
print(title)
return titles
def get_series(self, title) -> list[GenericSeries]:
"""Search the MangaDex API using the title and returl all assumed matches for the user to select from.
Args:
----
- title (str): The title of the series to search for.
Returns:
-------
- list[GenericSeries]: A list of GenericSeries objects containing the series information.
"""
def __publisher(links: dict) -> str:
if "engtl" in links.keys():
link = links["engtl"]
link = link.replace("www.", "")
publisher = link.split("//")[1].split(".")[0]
return publisher
else:
return (
links["raw"].split("//")[1].split(".")[0]
if "raw" in links.keys()
else None
)
def __people(relationships: list[dict]) -> list[PersonData]:
ret = []
for p in relationships:
if p["type"] in constants.VALID_ROLES:
ret.append(self.get_author(p["id"], p["type"]))
return ret
def __cover(relationships: list[dict]) -> str:
for r in relationships:
if r["type"] == "cover_art":
return r["id"]
return None
def __issues(status, lastVolume, lastChapter) -> str:
if status != "completed":
return status
elif lastVolume != "" and lastChapter != "":
return f"{lastVolume} ({lastChapter})"
else:
return lastVolume
def __get_tags(tags: list[dict], type) -> list:
ret = []
for t in tags:
if t["attributes"]["group"] == type:
ret.append(t["attributes"]["name"]["en"])
return ret
def __links(links: dict) -> list[dict]:
def __is_valid_link(link: str) -> bool:
if link.startswith("http") or link.startswith("https"):
return True
else:
return False
link_list = {}
for key in links.keys():
if __is_valid_link(links[key]):
link_list[key] = links[key]
else:
link_list[key] = constants.LINKS[key].replace("()", f"{links[key]}")
return link_list
response = self.get(f"manga?title={title}")
data = response["data"]
ret = []
for entry in data:
series_name = (
entry["attributes"]["title"]
if "en" or "ja" in entry["attributes"]["title"].keys()
else "No Eng or Jap title found"
)
gs = GenericSeries()
gs.series_id = entry["id"]
gs.provider = "Mangadex"
# set gs.name to value of key in series_name if it is a dict
gs.name = (
list(series_name.items())[0][1]
if isinstance(series_name, dict)
else series_name
)
gs.alternate_names = entry["attributes"]["altTitles"]
gs.sort_name = (
list(series_name.items())[0][1]
if isinstance(series_name, dict)
else None
)
gs.releaseDate = entry["attributes"]["year"]
gs.publisher = __publisher(entry["attributes"]["links"])
gs.people = __people(entry["relationships"])
gs.description = (
entry["attributes"]["description"]["en"]
if "en" in entry["attributes"]["description"].keys()
else None
)
gs.cover = self.get_cover(gs.series_id, __cover(entry["relationships"]))
gs.language = entry["attributes"]["originalLanguage"]
gs.issues = __issues(
status=entry["attributes"]["status"],
lastVolume=entry["attributes"]["lastVolume"],
lastChapter=entry["attributes"]["lastChapter"],
)
gs.links = __links(entry["attributes"]["links"])
gs.tags = __get_tags(entry["attributes"]["tags"], "theme")
gs.tags.append(entry["attributes"]["publicationDemographic"])
gs.genres = __get_tags(entry["attributes"]["tags"], "genre")
gs.rating = entry["attributes"]["contentRating"]
ret.append(gs)
return ret
def get_author(self, author_id: str, role: str = None) -> PersonData:
data = self.get(f"author/{author_id}")
pd = PersonData()
pd.name = data["data"]["attributes"]["name"]
pd.role = role if role is not None else data["data"]["attributes"]["role"]
return pd
def get_series_by_id(self, id: str) -> GenericSeries:
data = self.get(f"manga/{id}")
series_name = (
data["attributes"]["title"]
if "en" or "ja" in data["attributes"]["title"].keys()
else "No Eng or Jap title found"
)
gs = GenericSeries()
gs.series_id = data["id"]
gs.provider = "Mangadex"
# set gs.name to value of key in series_name if it is a dict
gs.name = (
list(series_name.items())[0][1]
if isinstance(series_name, dict)
else series_name
)
return gs
def get_metadata(self, id: str, lang: str) -> SeriesMetadata: #!Deprecate ?
def __create_links(links: dict) -> list[dict]:
def __is_valid_link(link: str) -> bool:
if link.startswith("http") or link.startswith("https"):
return True
else:
return False
link_list = []
for key in links.keys():
link_struct = {"label": None, "url": None}
if __is_valid_link(links[key]):
link_struct["label"] = constants.LABELS[key]
link_struct["url"] = links[key]
link_list.append(link_struct)
else:
link_struct["label"] = constants.LABELS[key]
link_struct["url"] = constants.LINKS[key].replace(
"()", f"{links[key]}"
)
link_list.append(link_struct)
return link_list
def __get_genres(genres: list[dict]) -> list:
genre_list = []
for t in genres:
if t["attributes"]["group"] == "genre":
genre_list.append(t["attributes"]["name"]["en"])
return genre_list
def __get_tags(tags: list[dict]) -> list:
tag_list = []
for t in tags:
if t["attributes"]["group"] != "genre":
tag_list.append(t["attributes"]["name"]["en"])
return tag_list
def __get_people(ppl: list[dict]) -> list[dict]:
VALID_ROLES = ["author", "artist", "letterer", "editor", "publisher"]
def __get_author(author_id: str) -> str:
data = self.get(f"author/{author_id}")
return data["data"]["attributes"]["name"]
ppl_list = []
for p in ppl:
if p["type"] not in VALID_ROLES:
continue
struct = {"name": None, "role": None}
struct["name"] = __get_author(p["id"])
struct["role"] = p["type"]
ppl_list.append(struct)
return ppl_list
data = self.get(f"manga/{id}")
# print(data)
# print("-------------------")
metadata = SeriesMetadata()
metadata.alternate_names = self.get_alternate_titles(
data["data"]["attributes"]["altTitles"]
)
metadata.series_type = data["data"]["type"]
metadata.description = (
data["data"]["attributes"]["description"][lang]
if lang in data["data"]["attributes"]["description"].keys()
else None
)
metadata.links = __create_links(data["data"]["attributes"]["links"])
metadata.status = data["data"]["attributes"]["status"]
metadata.genres = __get_genres(data["data"]["attributes"]["tags"])
metadata.tags = __get_tags(data["data"]["attributes"]["tags"])
metadata.authors = __get_people(data["data"]["relationships"])
return metadata
def get_cover(self, series_id: str, cover_id: str):
def __filename(cover_id: str):
result = self.get(f"cover/{cover_id}")
if result is not None:
return result["data"]["attributes"]["fileName"]
else:
return "ParseError"
url = "https://mangadex.org/covers/{}/{}".format(
series_id, __filename(cover_id)
)
if "ParseError" in url:
return None
ret = requests.get(url)
if ret.status_code == 200:
return ret.content
# return ret
# result = self.get(f"cover/{manga_id}")
# if result.status_code == 200:
# print(result.json())
if __name__ == "__main__":
md = MangadexAPI()
md.get_metadata("77dbfa29-8ef0-446d-82cc-1b4de878dd90")

0
src/data/metroncloud.py Normal file
View File

18
src/data/ttest.py Normal file
View File

@@ -0,0 +1,18 @@
import requests
from urllib.parse import urljoin
url = "http://comicvine.gamespot.com/api/"
params = { # CV uses volume to mean series
"api_key": "0d87c5060d8f5f8e5b7f153b367b8b7596be46f8",
"format": "json",
"resources": "volume",
"query": "86--EIGHTY-SIX",
"field_list": "volume,name,id,start_year,publisher,image,description,count_of_issues,aliases",
"page": 1,
"limit": 100,
}
qurl = urljoin(url, "search")
print(qurl)
data = requests.get(qurl, params=params,headers={"Accept": "application/json"})
print(data.content)

52
src/download.py Normal file
View File

@@ -0,0 +1,52 @@
import requests, os, sys
import threading
from aria2p import Client, API
class Download:
def __init__(self, settings:dict)->None:
self.settings = settings
self.download_queue = []
self.download_lock = threading.Lock()
self.download_thread = threading.Thread(target=self.download_worker)
self.download_thread.start()
self.aria2_running=self.check_aria2()
self.api = API(
client=Client(
host="http://localhost",
port=6800,
secret="",
timeout=60,
)
)
self.api.set_global_options({"dir": self.settings})
if not self.aria2_running:
print("Aria2 is not running")
sys.exit()
def check_aria2(self):
#check if aria2 is running
if os.system("ps -A | grep aria2c") == 0:
return True
else:
return False
def add_download(self, url:str, )->None:
self.download_lock.acquire()
self.download_queue.append(url)
self.download_lock.release()
def download_worker(self)->None:
while True:
if len(self.download_queue) > 0:
self.download_lock.acquire()
url, path = self.download_queue.pop(0)
self.download_lock.release()
self.download(url, path)
else:
pass
def download(self, url:str, path:str)->None:
#aria2 torrent download
if url.endswith(".torrent"):
self.api.add_torrent(url)

23
src/errors/__init__.py Normal file
View File

@@ -0,0 +1,23 @@
class ConnectError(Exception):
def __init__(self, message):
super().__init__(message)
class LoginError(Exception):
def __init__(self, message):
super().__init__(message)
class AccessError(Exception):
def __init__(self, message):
super().__init__(message)
class JSONError(Exception):
def __init__(self, message):
super().__init__(message)
class ResponseError(Exception):
def __init__(self, message):
super().__init__(message)
class ResultError(Exception):
def __init__(self, message):
super().__init__(message)

546
src/logic/cli.py Normal file
View File

@@ -0,0 +1,546 @@
import os
import re
import shutil
import subprocess
import time
import zipfile
import jaro
from src.data.komga import KomgaAPI
from komgapi.schemas.Series import Series
from src.data.mangadex import MangadexAPI
from src.data.cache import ListCache
from src.data.Feeds.nyaasi import Nyaa
from src.logic.download import Download
from komconfig import KomConfig
import loguru
import sys
config = KomConfig()
logs = loguru.logger
logs.remove()
logs.add("komgrabber.log", level="INFO")
Komga = KomgaAPI()
md = MangadexAPI()
LINE_CLEAR = "\x1b[2K" # <-- ANSI sequence
failed_items = []
class utils:
def __init__(self) -> None:
self.dl = Download("/home/alexander/Downloads/torrents/Manga_test/")
self.file = None
self.serie = ""
self.serie_id = ""
self.series_data: Series = None
self.volumes = []
self.download_path = config.komgrabber.download_location
if "~" in self.download_path:
self.download_path = os.path.expanduser(self.download_path)
# self.allSeries = Komga.getAllSeries()
pass
def download(self, feed_url: str):
def __chapter_check(title: str) -> bool:
if title.endswith(".cbz") or title.endswith(".cbr"):
if not re.search(r"(v\d{1,3}(-\d{1,3})?)|(Vol\. \d{1,3})", title):
return True
else:
return False
def __epub_check(title: str) -> bool:
if title.endswith(".epub"):
return True
else:
return False
file: str
file = self.dl.get_file(feed_url)
if __chapter_check(file):
print(f"Skipping {file}, reason: no volume number, likely a chapter")
return False
if __epub_check(file):
print(f"Skipping {file}, reason: epub file")
return False
self.file = file
print(f"Filename: {file}")
file_move = False
if file.endswith(".cbz") or file.endswith(".cbr"):
new_folder = f"{self.download_path}{self.serie}"
os.makedirs(new_folder, exist_ok=True)
file_move = True
state = self.dl.add_torrent(feed_url.split("/")[-1])
if state is False:
print("Error adding torrent")
return False
gid = self.dl.api.get_downloads()[0].gid
# check if the download is complete usin the gid
dl_complete = True
check_done = False
while not self.dl.api.get_downloads(gids=[gid])[0].seeder:
# while not self.dl.api.get_downloads()[0].seeder:
progress = self.dl.check_progress()
progress = "{:.2f}".format(progress)
eta = self.dl.api.get_downloads()[0].eta_string()
print(end=LINE_CLEAR)
print("Progress: ", progress, "ETA: ", eta, end="\r")
# if progress remains the same for 30 seconds, stop the download
progress = self.dl.check_progress()
time.sleep(30)
n_progress = self.dl.check_progress()
if not check_done:
local_files = os.listdir(f"{self.download_path}")
for f in local_files:
# print(f)
if os.path.isdir(f"{self.download_path}/{f}"):
local_files.extend(
[
f"{self.download_path}/{f}/{file}"
for file in os.listdir(f"{self.download_path}/{f}")
]
)
local_files = [
file
for file in local_files
if file.endswith(".cbz") or file.endswith(".cbr")
]
local_volumes = Komga.getVolumes(self.series_data.id)
# if not local_files:
# dl_complete=False
# break
local_files_volumes = []
for file in local_files:
vol_regex = r"(v\d{1,3}(-\d{1,3})?)|(Vol\. \d{1,3})"
# if the file does not match the naming convention, skip it
if re.search(vol_regex, file):
match = re.search(vol_regex, file)
if match:
vol = match.group(0).replace("v", "").replace("Vol. ", "")
if "-" in vol:
local_files_volumes.extend(
[int(volume) for volume in vol.split("-")]
)
continue
vol = int(vol)
local_files_volumes.append(vol)
print(f"Grabbed volumes: {local_files_volumes}")
print(f"Komga volumes: {local_volumes}")
if local_files_volumes == []:
pass
# check íf any local_file_volumes are not in local_volumes
if all([vol in local_volumes for vol in local_files_volumes]):
print("all volumes downloaded, stopping...")
dl_complete = False
break
else:
print("not all volumes downloaded, continuing...")
check_done = True
if progress == n_progress:
print("Progress has not changed for 30 seconds, stopping the download")
self.dl.api.get_downloads()[0].remove(force=True)
dl_complete = False
break
else:
pass
# stop the download, remove the torrent files
try:
self.dl.api.get_downloads()[0].remove(force=True)
except:
pass
self.dl.remove_torrents()
print(end=LINE_CLEAR)
print("Download complete")
# self.dl.download(feed_url, file_rename=True)
if not dl_complete:
# remove everything from the download folder
data = os.listdir(f"{self.download_path}")
for file in data:
try:
os.remove(f"{self.download_path}{file}")
except IsADirectoryError:
shutil.rmtree(f"{self.download_path}{file}")
if dl_complete is True:
# for dfile in os.listdir(f'{self.download_path}{file}'):
# if __chapter_check(dfile):
# os.remove(f'{self.download_path}{file}{dfile}')
try:
if file_move is True:
shutil.move(
f"{self.download_path}{file}",
f"{new_folder}/{file}",
)
except Exception as e:
print(e)
return False
return True
return False
def tag_files(self, folder: str, interactive: bool = False):
"""Tag all files in the specified folder.
Args:
----
- folder (str): the path to the folder containing the files to tag
- interactive (bool, optional): if set to True, the shell will pause and await user input instead of not writing data to file. Defaults to False.
"""
def is_valid_cbz(file_path) -> bool:
try:
with zipfile.ZipFile(file_path, "r") as cbz_file:
# Check if the file is a valid ZIP archive
if cbz_file.testzip() is not None:
return False
# Check if the CBZ file contains at least one image file
for file_info in cbz_file.infolist():
if (
not file_info.is_dir()
and file_info.filename.lower().endswith(
(".jpg", ".jpeg", ".png")
)
):
return True
return False
except (zipfile.BadZipFile, FileNotFoundError):
return False
for file in os.listdir(f"{folder}"):
print(f"Checking {file}")
# if file is a not cbz file, skip
if not file.endswith(".cbz"):
print(f"Skipping {file}")
continue
try:
# if not is_valid_cbz(f"{folder}/{file}"):
# print(f"removing {file}, not a valid cbz file")
# os.remove(f"{folder}/{file}")
# continue
print(f"Tagging {file}")
regex = r"v(\d{2,3}) #(\d{2,3})"
match = re.search(regex, file)
if not match:
print(f"Skipping {file}, no match")
os.remove(f"{folder}/{file}")
continue
if interactive:
subprocess.call(
f'comictagger -s -t cr -f -o "{folder}/{file}" --nosummary --overwrite -i',
shell=True,
)
subprocess.call(
f'comictagger -s -t cr -f -o "{folder}/{file}" --nosummary --overwrite',
shell=True,
)
print(f"Tagged {file}")
except Exception as e:
print(e)
continue
def rename_folder_and_files(self, file: str, komga_data, remove=False):
logs.info(f"Renaming {file}")
# rename the folder to the komga name
series_id = komga_data.id
series_name = komga_data.name
new_folder = f"{self.download_path}{series_name}"
try:
os.rename(f"{self.download_path}{file}", new_folder)
except Exception as e:
print(e)
try:
files = os.listdir(new_folder)
except NotADirectoryError:
return
volumes = []
for file in files:
if not (file.endswith(".cbz") or file.endswith(".cbr")):
print(f"Skipping {file}, not a comicarchive file")
continue
ext = file.split(".")[-1]
# match = re.search(r"v\d{2,4}(-\d{2,4})*", file)
match = re.search(r"v\d{2,4} ", file)
if match:
# print(match)
split_start = match.start()
split_end = match.end()
# Split the filename between split_start and split_end
volume = file[split_start:split_end]
# Split the filename at the split index, but keep the "v" and digits in the title
title = file[:split_start].strip()
# add the volume number to the title as a suffix #nr
title = f"{title} {volume} #{volume.replace('v', '')}".strip().replace(
" ", " "
)
# print(title)
# rename the file
os.rename(f"{new_folder}/{file}", f"{new_folder}/{title}.{ext}")
volumes.append(int(volume.replace("v", "")))
logs.info(f"Renamed {file} to {title}")
if remove:
print("removing files that are already in komga")
# search komga_name in series
# get all volumes of the serie
local_volumes = Komga.getVolumes(series_id=series_id)
# remove the volumes that are already in komga
self.remove_if_alr_in_db(local_volumes, volumes, series_name)
self.tag_files(new_folder)
def process_serie(self, data: Series):
"""Pprocess a single serie based on its title.
The process is as follows:
1. get all volumes of the serie from komga using the api
2. get all feed entries from nyaa.si using the api
3. compare the volumes from komga with the volumes from nyaa.si
3.1 if the volumes from nyaa.si are greater than the volumes from komga, add the entry to the download list.
Args:
----
- data (dict): a dict containing the title of the serie at ["title"] and the id of the serie at ["id"]
Returns:
-------
- list[dict]: a list of dictionaries containing the entries to download
"""
serie = data.name
series_id = data.id
vols = Komga.getVolumes(series_id=series_id, unpaged=True)
feed_titles = Nyaa.search(keyword=serie, category=3, subcategory=1)
print(feed_titles)
f_d = []
if feed_titles == []:
failed_items.append(serie)
added_max_vols = vols if vols else [0]
# #print(len(added_max_vols))
for entry in feed_titles:
if entry.seeders > 0:
if (
serie.lower() in entry.name.lower()
or jaro.jaro_metric(entry.name.lower(), serie.lower()) > 0.7
):
volumes = entry["volumes"]
if isinstance(volumes, list):
volumes = volumes[
::-1
] # reverse the list to get the highest volume number quickly
for vol in volumes:
if vol not in added_max_vols:
f_d.append(entry)
added_max_vols.append(vol)
break
return f_d
def media_grabber(self, serie: Series):
result = self.process_serie(serie)
logs.info(f"Found {len(result)} new volumes for {serie.name}")
logs.info(f"Data: {result}")
print(
f"current volumes: {Komga.getVolumes(series_id=serie.id, unpaged=True)}, new volumes: {result}"
)
# print(result)
if len(result) != 0:
for entry in result:
# print(entry["link"])
if self.download(entry["link"]) is True:
print("renaming...")
self.rename_folder_and_files(
self.file, komga_data=serie, remove=True
)
# self.move_to_komga(serie=entry)
print("done")
return True
else:
# remove the folder
try:
folders = os.listdir(self.download_path)
for folder in folders:
os.remove(f"{self.download_path}{folder}")
except Exception as e:
print(e)
return False
def remove_if_alr_in_db(
self, present_volumes: list, downloaded_volumes: list, folder: str
):
"""Delete any file from the folder that is already in the database, or does not conform to the naming convention.
Args:
----
present_volumes (list): a list of volumes that are already in the database, retrieved from komga api
downloaded_volumes (list): the list of volumes that are downloaded from the corresponding feed/api
folder (str): relative path to the folder containing the downloaded files
"""
print(f"present_volumes: {present_volumes}")
print(f"downloaded_volumes: {downloaded_volumes}")
content_folder = f"{self.download_path}{folder}"
content_files = [file for file in os.listdir(content_folder)]
print(f"content_files: {content_files}")
duplicates = [any(file in content_files for file in present_volumes)]
for file in os.listdir(content_folder):
if "#" not in file:
try:
os.remove(os.path.join(content_folder, file))
if IsADirectoryError:
shutil.rmtree(os.path.join(content_folder, file))
if FileNotFoundError:
continue
except Exception as e:
# print(e)
continue
# print(f"removed {file}, Reason: not a valid file")
content_files.remove(file)
for vol in present_volumes:
if vol < 10:
vol = f"0{vol}"
for file in content_files:
if str(vol) in file:
# print(f"removing {vol}")
try:
os.remove(os.path.join(content_folder, file))
except:
print(f"could not remove {vol}")
def move_to_komga(self, serie: tuple[str, str] = None):
komga_path = f"{config.komga.media_path}{self.serie}"
# print(f"komga_path: {komga_path}")
# print("moving to komga")
# move files to komga
for file in os.listdir(f"{self.download_path}{self.serie}"):
file_path = os.path.join(f"{self.download_path}{self.serie}", file)
final_path = os.path.join(komga_path, file)
shutil.move(file_path, final_path)
print(f"moved {file} to {komga_path}")
# delete empty folder
try:
os.rmdir(f"{self.download_path}{self.serie}")
logs.info(f"moved {self.serie} to komga")
except:
print(f"could not remove {self.serie}")
logs.error(f"could not remove {self.serie}")
return self
def search_for_new_volumes(self):
series = Komga.series_controller.getAllSeries(
body={
"condition": {
"seriesStatus": {
"operator": "is",
"value": "HIATUS",
"value": "ENDED",
}
}
}
)
shutil.rmtree(self.download_path)
os.mkdir(self.download_path)
for serie in series:
position = series.index(serie)
print("Working on serie", position, "of ", len(series))
logs.info(f"searching for new volumes for {serie.name}")
print(serie.name)
self.series_data = serie
self.serie = serie.name
self.serie_id = serie.id
if self.media_grabber(serie) is True:
self.move_to_komga(serie)
time.sleep(5)
# print("done", serie.name)
return self
def add_missing_to_db(self):
database_series = ListCache("mangacache.db").get_all_series("name")
database_series = [serie[0] for serie in database_series]
database_set = set(database_series)
# print(database_series)
komga_series = Komga.series_controller.getAllSeries()
db_added = []
for serie in komga_series:
if serie.id not in database_set:
# print(serie.id)
db_added.append(serie)
ListCache("mangacache.db").add_series(
serie.id, serie.name, serie.metadata.status
)
else:
print(f"{serie.id} already in db")
print("added to db:", len(db_added))
# print(f"{serie[1]} has status {komga_series}")
@DeprecationWarning
def get_md_metadata(self, id: str):
data = md.get_metadata(id, lang="en")
db_data = ListCache("mangacache.db").get_series_by_id(id, "mangadex_id")
def automated(self, series_data: tuple[str, str]):
"""_summary_.
Args:
----
series_data (list[tuple[str,str]]): _description_
"""
if self.media_grabber(series_data) is True:
self.move_to_komga(series_data)
time.sleep(5)
def parallel_execution(series: list[tuple[str, str, str]]):
"""_summary_.
Args:
----
series (list[tuple[str,str,str]]): _description_
"""
th = utils()
for serie in series:
th.automated(serie)
@DeprecationWarning
def update_state():
database_series = ListCache("mangacache.db").get_all_series()
database_series = [serie for serie in database_series if serie[3] != "ENDED"]
for serie in database_series:
komga_series = Komga.getSeriesStatus(serie)
if komga_series == "ONGOING":
continue
else:
ListCache("mangacache.db").update_database(
komga_id=serie[2], complete=komga_series
)
def avail_check():
komga_avail = True
return (True, komga_avail)
def main():
utils().search_for_new_volumes()
# update_state()
print("Failed series:\n", failed_items)
if __name__ == "__main__":
utils().search_for_new_volumes()

231
src/logic/constants.py Normal file
View File

@@ -0,0 +1,231 @@
LINK_TRANSFORM = {"al":"https://anilist.co/manga/",
"ap":"https://www.anime-planet.com/manga/",
"bw":"https://bookwalker.jp/",
"kt":"https://kitsu.io/manga/",
"mu":"https://www.mangaupdates.com/series.html?id=",
"mal":"https://myanimelist.net/manga/"}
LANG_CODES = [
"ab",
"aa",
"af",
"ak",
"sq",
"am",
"ar",
"an",
"hy",
"as",
"av",
"ae",
"ay",
"az",
"bm",
"ba",
"eu",
"be",
"bn",
"bi",
"bs",
"br",
"bg",
"my",
"ca",
"ch",
"ce",
"ny",
"zh",
"cu",
"cv",
"kw",
"co",
"cr",
"hr",
"cs",
"da",
"dv",
"nl",
"dz",
"en",
"eo",
"et",
"ee",
"fo",
"fj",
"fi",
"fr",
"fy",
"ff",
"gd",
"gl",
"lg",
"ka",
"de",
"el",
"kl",
"gn",
"gu",
"ht",
"ha",
"he",
"hz",
"hi",
"ho",
"hu",
"is",
"io",
"ig",
"id",
"ia",
"ie",
"iu",
"ik",
"ga",
"it",
"ja",
"jv",
"kn",
"kr",
"ks",
"kk",
"km",
"ki",
"rw",
"ky",
"kv",
"kg",
"ko",
"kj",
"ku",
"lo",
"la",
"lv",
"li",
"ln",
"lt",
"lu",
"lb",
"mk",
"mg",
"ms",
"ml",
"mt",
"gv",
"mi",
"mr",
"mh",
"mn",
"na",
"nv",
"nd",
"nr",
"ng",
"ne",
"no",
"nb",
"nn",
"ii",
"oc",
"oj",
"or",
"om",
"os",
"pi",
"ps",
"fa",
"pl",
"pt",
"pa",
"qu",
"ro",
"rm",
"rn",
"ru",
"se",
"sm",
"sg",
"sa",
"sc",
"sr",
"sn",
"sd",
"si",
"sk",
"sl",
"so",
"st",
"es",
"su",
"sw",
"ss",
"sv",
"tl",
"ty",
"tg",
"ta",
"tt",
"te",
"th",
"bo",
"ti",
"to",
"ts",
"tn",
"tr",
"tk",
"tw",
"ug",
"uk",
"ur",
"uz",
"ve",
"vi",
"vo",
"wa",
"cy",
"wo",
"xh",
"yi",
"yo",
"za",
"zu",
]
READING_DIRECTIONS = ["Left to Right", "Right to Left", "Vertical", "Webtoon"]
READING_DIRECTONS_KOMGA = ["LEFT_TO_RIGHT", "RIGHT_TO_LEFT", "VERTICAL", "WEBTOON"]
READING_DIR_TRANSLATION = {
READING_DIRECTIONS[0]: READING_DIRECTONS_KOMGA[0],
READING_DIRECTIONS[1]: READING_DIRECTONS_KOMGA[1],
READING_DIRECTIONS[2]: READING_DIRECTONS_KOMGA[2],
READING_DIRECTIONS[3]: READING_DIRECTONS_KOMGA[3],
}
METADATA_PROVIDERS = ["MangaDex", "ComicVine", "AniList", "MyAnimeList", "Comics.org"]
SERIES_STATUS = ["---", "Ongoing", "Ended", "Hiatus", "Abandoned"]
SERIES_STATUS_KOMGA = ["UNKNOWN", "ONGOING", "ENDED", "HIATUS", "ABANDONED"]
SERIES_STATUS_TRANSLATION = {
SERIES_STATUS[0]: SERIES_STATUS_KOMGA[0],
SERIES_STATUS[1]: SERIES_STATUS_KOMGA[1],
SERIES_STATUS[2]: SERIES_STATUS_KOMGA[2],
SERIES_STATUS[3]: SERIES_STATUS_KOMGA[3],
SERIES_STATUS[4]: SERIES_STATUS_KOMGA[4],
}
def translate_series_status(status: str) -> str:
if status in SERIES_STATUS_TRANSLATION.keys():
return SERIES_STATUS_TRANSLATION[status]
else:
#get the key from the value
for key, value in SERIES_STATUS_TRANSLATION.items():
if value == status:
return key
def translate_reading_direction(direction: str) -> str:
if direction in READING_DIR_TRANSLATION.keys():
return READING_DIR_TRANSLATION[direction]
else:
#get the key from the value
for key, value in READING_DIR_TRANSLATION.items():
if value == direction:
return key

0
src/logic/data.py Normal file
View File

View File

@@ -0,0 +1,16 @@
import os
import re
def detect_chapters(src: str = "/home/alexander/Downloads/torrents/manga/"):
for folder in os.listdir(src):
if os.path.isdir(f"{src}/{folder}"):
files = os.listdir(f"{src}/{folder}")
for file in files:
# check for regex "v(d) #(d)" in the file name
regex = re.compile(r"^.* v(\d+) #(\d+(?:-\d+)?)\.cbz$")
if regex.search(file):
print(f"File {file} is a Volume")
else:
print(f"Deleting chapter {file}")
os.remove(f"{src}/{folder}/{file}")

99
src/logic/download.py Normal file
View File

@@ -0,0 +1,99 @@
import sys
import os
import time
import bencodepy
from .rename import rename
from aria2p import Client
from aria2p import API
class Download:
""" Download a file from a url and start the download using aria2"""
def __init__(self, download_location) -> None:
self.download_location=download_location
self.filename=None
self.torrent_file=None
self.progress=0
self.canceled=False
self.aria2_running=self.check_aria2()
self.api = API(
client=Client(
host="http://localhost",
port=6800,
secret="",
timeout=60,
)
)
self.api.set_global_options({"dir": self.download_location})
if not self.aria2_running:
print("Aria2 is not running")
sys.exit()
def check_aria2(self):
#check if aria2 is running
if os.system("ps -A | grep aria2c") == 0:
return True
else:
return False
def check_progress(self):
try:
current_progress=self.api.get_downloads()[0].progress
except:
return self.progress+0.01
if current_progress > self.progress:
self.progress=current_progress
return current_progress
def get_file(self, url, series_name=None):
#get the file name from the url
#use wget to download the file to the download location
name=url.split('/')[-1]
dl_url=f'{self.download_location}{name}'
while self.get_filename(dl_url) is None:
if not os.path.exists(dl_url):
os.system(f'wget -P {self.download_location} {url}')
filename = self.get_filename(dl_url)
self.torrent_file=url.split('/')[-1]
self.filename=filename
return filename
def remove_torrents(self):
tr_files=[file for file in os.listdir(self.download_location) if ".torrent" in file]
for file in tr_files:
os.remove(f'{self.download_location}{file}')
def add_torrent(self, torr_name):
try:
self.api.add_torrent(f'{self.download_location}{torr_name}')
print("Torrent added")
except Exception as e:
print(f"Error adding torrent: {e}")
return False
def rename_download(self):
filename=self.filename.replace(".aria2", "")
foldername=filename.replace(".cbz", "") if ".cbz" in filename else filename
print(f'Filename: {filename}')
print(f'Foldername: {foldername}')
if not os.path.exists(f'{self.download_location}{foldername}'):
os.mkdir(f'{self.download_location}{foldername}')
os.rename(f'{self.download_location}{filename}', f'{self.download_location}{foldername}/{filename}')
#rename the file
rename(f'{self.download_location}{foldername}')
def get_filename(self, torrent_file):
try:
with open(torrent_file, 'rb') as f:
torrent = bencodepy.decode(f.read())
#self.filename=torrent[b'info'][b'name'].decode('utf-8')
return torrent[b'info'][b'name'].decode('utf-8')
except FileNotFoundError:
return None

53
src/logic/manual.py Normal file
View File

@@ -0,0 +1,53 @@
import os
import re
def _vol_list() -> list[int]:
path = "/home/alexander/Downloads/torrents/Manga_test/"
# get all files in the dir and subdirs
files = os.listdir(path)
for f in files:
print(f)
if os.path.isdir(f"{path}/{f}"):
print("is dir")
files.extend([f"{path}/{f}/{file}" for file in os.listdir(f"{path}/{f}")])
return files
def isdircheck(path: str) -> bool:
if os.path.isdir(path):
return True
else:
return False
def __chapter_check(title: str) -> bool:
if title.endswith(".cbz") or title.endswith(".cbr"):
if not re.search(r"(v\d{1,3}(-\d{1,3})?)|(Vol\. \d{1,3})", title):
return True
else:
return False
def check_folder(folder):
files = os.listdir(folder)
for file in files:
if os.path.isdir(f"{folder}/{file}"):
print(f"{file} is a dir")
check_folder(f"{folder}/{file}")
else:
print(f"{file} is a file")
if __chapter_check(file):
print(f"{file} is a chapter")
os.remove(f"{folder}/{file}")
else:
print(f"{file} is a volume")
if __name__ == "__main__":
# # print(__chapter_check("Even Given the Worthless 'Appraiser' Class, I'm Actually the Strongest v01-09+073 (2023) (Digital) (danke-Empire).cbz"))
# __vol_list()
# # print(isdircheck("/home/alexander/Downloads/torrents/Manga_test/Peter Grill and the Philosopher's Time (Digital)/"))
check_folder("/home/alexander/Downloads/torrents/Manga_test/")

42
src/logic/move.py Normal file
View File

@@ -0,0 +1,42 @@
import os
import shutil
def move(src, dest: str = "/mnt/Media/Manga"):
"""Move the files in a folder to another folder.
Args:
----
- dest (str): the string to the destination folder
"""
# Get the files in the folder
# +move the folders from src to disc, if folder already exists, only move new files
folders = os.listdir(src)
for folder in folders:
if not os.path.exists(f"{dest}/{folder}"):
print(f"Moving {folder} to {dest}")
shutil.move(f"{src}/{folder}", dest)
else:
files = os.listdir(f"{src}/{folder}")
for file in files:
if not os.path.exists(f"{dest}/{folder}/{file}"):
print(f"Moving {file} to {dest}/{folder}")
shutil.move(f"{src}/{folder}/{file}", f"{dest}/{folder}")
# Remove empty folders
remove_empty_folders(src)
def remove_empty_folders(src):
"""Remove empty folders from a directory.
Args:
----
- src (str): the string to the source folder
"""
folders = os.listdir(src)
for folder in folders:
if os.path.isfile(f"{src}/{folder}"):
continue
if not os.listdir(f"{src}/{folder}"):
print(f"Removing {folder}")
os.rmdir(f"{src}/{folder}")
else:
remove_empty_folders(f"{src}/{folder}")

45
src/logic/move_test.py Normal file
View File

@@ -0,0 +1,45 @@
import os
import re
folder_path = "/home/alexander/Downloads/torrents/Manga_test/"
def rename(folder):
"""Rename the files in a folder according to the template.
Template: [Name] v[nr] #[nr].ext (e.g. "The Flash v1 #1.cbz").
Args:
----
- folder (str): the string to the folder
"""
# Get the files in the folder
files = os.listdir(folder)
for file in files:
if not file.endswith(".cbz"):
print(f"Skipping {file}, not a cbz file")
continue
ext = file.split(".")[-1]
match = re.search(r"v\d{2,4} ", file)
if match:
print(match)
split_start = match.start()
split_end = match.end()
# Split the filename between split_start and split_end
volume = file[split_start:split_end]
# Split the filename at the split index, but keep the "v" and digits in the title
title = file[:split_start].strip()
# add the volume number to the title as a suffix #nr
title = f"{title} {volume} #{volume.replace('v','')}"
print(title)
# rename the file
os.rename(f"{folder}/{file}", f"{folder}/{title}.{ext}")
for folder in os.listdir(folder_path):
if os.path.isdir(f"{folder_path}/{folder}"):
rename(f"{folder_path}/{folder}")
print(f"Renamed {folder}")
else:
print(f"{folder} is not a folder")
continue

7
src/logic/pickles.py Normal file
View File

@@ -0,0 +1,7 @@
import pickle
def make_pickle(obj):
return pickle.dumps(obj)
def load_pickle(pickled_obj):
return pickle.loads(pickled_obj)

51
src/logic/rename.py Normal file
View File

@@ -0,0 +1,51 @@
# Rename the downloaded files according to the template
# Template: [Name] v[nr] #[nr].ext (e.g. "The Flash v1 #1.cbz")
import os
import re
def rename(folder: str = "/home/alexander/Downloads/torrents/manga/") -> None:
"""Rename the files in a folder according to the template.
Template: [Name] v[nr] #[nr].ext (e.g. "The Flash v1 #1.cbz").
Args:
----
- folder (str): the string to the folder
"""
# Get the files in the folder
files = os.listdir(folder)
print(files)
for file in files:
if os.path.isdir(f"{folder}/{file}"):
rename(f"{folder}/{file}")
if not file.endswith(".cbz"):
print(f"Skipping {file}, not a cbz file")
continue
ext = file.split(".")[-1]
match = re.search(r"v\d{2,4}", file)
if match:
split_start = match.start()
split_end = match.end()
# Split the filename between split_start and split_end
volume = file[split_start:split_end]
# Split the filename at the split index, but keep the "v" and digits in the title
title = file[:split_start].strip()
# add the volume number to the title as a suffix #nr
title = f"{title} {volume} #{volume.replace('v', '')}"
print(title)
# rename the file
os.rename(f"{folder}/{file}", f"{folder}/{title}.{ext}")
# rename the folder
def rename_recursive(folder: str) -> None:
# get all directories in the folder and apply the rename function to them
for root, dirs, _files in os.walk(folder):
for dir in dirs:
rename(f"{root}/{dir}")
if __name__ == "__main__":
rename_recursive("/home/alexander/Downloads/torrents/manga/")

27
src/logic/tag.py Normal file
View File

@@ -0,0 +1,27 @@
import os
from pathlib import Path
import subprocess
def tag_folder(
folder: Path = Path("/home/alexander/Downloads/torrents/manga/"),
) -> None:
"""Tag the files in a folder according to the template.
Template: [Name] v[nr] #[nr].ext (e.g. "The Flash v1 #1.cbz").
Args:
----
- folder (Path): the string to the folder
"""
# Get the files in the folder
files = os.listdir(folder)
for file in files:
if os.path.isdir(f"{folder}/{file}"):
tag_folder(f"{folder}/{file}")
if not file.endswith(".cbz"):
continue
print("Trying to tag file", file)
subprocess.call(
f'comictagger -s -t cr -f -o "{folder}/{file}" --nosummary --overwrite -i',
shell=True,
)

190
src/logic/testing.py Normal file
View File

@@ -0,0 +1,190 @@
import json
import os
import re
# import a library to open zip files
import zipfile
from jellyfish import jaro_similarity
from APIs import KomgaAPI, MangadexAPI, NyaaFeed
from APIs.cache import ListCache
Komga = KomgaAPI()
config = json.load(open("config.json"))
cat = NyaaFeed()
# with open("compare1.json") as f:
# data = json.load(f)
# with open("compare2.json") as f:
# data2 = json.load(f)
def compare(data1, data2):
# compare the two data sets and return a list of differences
differences = []
for key in data1:
if key in data2:
if data1[key] != data2[key]:
differences.append(key)
else:
differences.append(key)
return differences
# diffs=compare(data, data2)
# #get the differences from the first data set
# for diff in diffs:
# print(diff, data[diff])
# #get the differences from the second data set
# for diff in diffs:
# print(diff, data2[diff])
def check_presence_of_xml_file(filename: str):
with zipfile.ZipFile(filename, "r") as zip_ref:
return "ComicInfo.xml" in zip_ref.namelist()
def create_xml_file(filename: str):
with zipfile.ZipFile(filename, "r") as zip_ref:
return zip_ref.read("ComicInfo.xml")
def rename_files(komga_data: str):
"""Rename the files in the folder to the komga name with the volume number.
Args:
----
file (str): pth to the folder
komga_data (str): series name
"""
# rename the folder to the komga name
new_folder = f'{config["download_location"]}/{komga_data}'
try:
files = os.listdir(new_folder)
except FileNotFoundError:
return
for file in files:
if not (file.endswith(".cbz") or file.endswith(".cbr")):
print(f"Skipping {file}, not a cbz file")
continue
ext = file.split(".")[-1]
match = re.search(r"v\d{2,4}(-\d{2,4})*", file)
if match:
print(match.group(0))
vol = match.group(0).replace("v", "")
title = file.split(match.group(0))[0]
title = title.lstrip().rstrip()
new_filename = f"{title} v{vol} #{vol}.{ext}"
print(new_filename)
os.rename(f"{new_folder}/{file}", f"{new_folder}/{new_filename}")
# try:
# os.rename(f"{new_folder}/{file}", f"{new_folder}/{filename} v{vol} #{vol}.{ext}")
# except FileNotFoundError:
# print(f"File not found: {file}")
# split_index = match.start()
# #split the title after the split_index
# title=f"{file[:split_index]}{file[:split_index+len(match.group(0)):]}"
# print(title)
# volumes = match.group(0).split("-")
# volumes = [volume.replace("v", "") for volume in volumes]
# volume_data="-".join(volumes)
# # print(volume)
# # volumes.append(int(volume))
# # #add the volume number to the title as a suffix #nr
# title=f"{title} #{volume_data}"
# #rename the file
# os.rename(f"{new_folder}/{file}", f"{new_folder}/{title}.{ext}")
def __chapter_check(title: str) -> bool:
if title.endswith(".cbz") or title.endswith(".cbr"):
if not re.search(r"(v\d{1,3}(-\d{1,3})?)|(Vol\. \d{1,3})", title):
return True
else:
return False
def check_folder(folder):
files = os.listdir(folder)
for file in files:
if os.path.isdir(f"{folder}/{file}"):
print(f"{file} is a dir")
check_folder(f"{folder}/{file}")
else:
print(f"{file} is a file")
if __chapter_check(file):
print(f"{file} is a chapter")
os.remove(f"{folder}/{file}")
else:
print(f"{file} is a volume")
def add_ids():
def __determine_similarity(search_string: str, given_string: str) -> float:
return jaro_similarity(search_string, given_string)
database = ListCache("mangacache.db")
Ma = MangadexAPI()
result = database.query_all_missing_id_type("mangadex")
print(len(result))
max_sim = 0
manga_id = None
for series in result:
title = series[1]
mangadex_id = Ma.get_manga_id(title)
if type(mangadex_id) == tuple:
print("result is a tuple")
similarity = __determine_similarity(mangadex_id["title"], title)
if similarity > max_sim:
max_sim = similarity
manga_id = mangadex_id["id"]
for alt_title in mangadex_id["alternate_titles"]:
similarity = __determine_similarity(alt_title, title)
if similarity > max_sim:
max_sim = similarity
manga_id = mangadex_id["id"]
# print(mangadex_id)
elif type(mangadex_id) == list:
print("result is a list")
# print(mangadex_id)
for res_title in mangadex_id:
similarity = __determine_similarity(res_title["title"], title)
if similarity > max_sim:
max_sim = similarity
manga_id = res_title["id"]
for alt_title in res_title["alternate_titles"]:
similarity = __determine_similarity(alt_title, title)
if similarity > max_sim:
max_sim = similarity
manga_id = res_title["id"]
else:
print(mangadex_id)
print(manga_id)
if __name__ == "__main__":
# series_names=Komga.get_all_series()
# for series in series_names:
# print(series[0])
# rename_files(series[0])
folders = os.listdir(config["download_location"])
for folder in folders:
print(folder)
check_folder(f'{config["download_location"]}/{folder}')
rename_files(f'{config["download_location"]}/{folder}')
# rename_files(komga_data="Hell's Paradise - Jigokuraku")
# add_ids()

28
src/logic/threads.py Normal file
View File

@@ -0,0 +1,28 @@
from gui import SeriesSelectDialog
from PySide6 import QtWidgets, QtCore, QtGui
#import thread capabilties
from PySide6.QtCore import QThread, Signal
class SeriesThread(QThread):
"""Thread to get the series from the api.
"""
#signal to send the series data to the main thread
series = Signal(list)
def __init__(self, api, series_name) -> None:
super().__init__()
self.api = api
self.series_name = series_name
def run(self):
#display the SeriesSelectDialog
print("running thread")
dialog = QtWidgets.QDialog()
ui = SeriesSelectDialog(self.series_name, self.api)
ui.setupUi(dialog)
dialog.show()
dialog.exec()
#send the data back to the main thread
self.series.emit(ui.data)

3
src/logs/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
__all__ = ["Log"]
from .log import Log

23
src/logs/debug.py Normal file
View File

@@ -0,0 +1,23 @@
import logging
from icecream import ic
from .log import Log
log = Log("debug")
def debugMessage(message, debugLevel, *args, **kwargs):
ic(message)
if debugLevel == logging.DEBUG:
log.log_debug(message)
elif debugLevel == logging.INFO:
log.log_info(message)
elif debugLevel == logging.WARNING:
log.log_warning(message)
elif debugLevel == logging.ERROR:
log.log_error(message)
elif debugLevel == logging.CRITICAL:
log.log_critical(message)
else:
log.log_info(message)
ic("Invalid debug level, defaulting to INFO")
log.log_warning("Invalid debug level, defaulting to INFO")

54
src/logs/log.py Normal file
View File

@@ -0,0 +1,54 @@
import inspect
import logging
import os
if not os.path.exists("logs"):
os.mkdir("logs")
# Create a common file handler for all loggers
common_file_handler = logging.FileHandler("logs/application.log")
common_file_handler.setLevel(logging.DEBUG)
# Include function name in the formatter
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
common_file_handler.setFormatter(formatter)
class Log:
def __init__(self, logger_name):
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.DEBUG)
self.logger.addHandler(common_file_handler)
def log_info(self, message):
caller = inspect.stack()[1][3]
self.logger.info(f"{caller} - {message}")
def log_debug(self, message):
caller = inspect.stack()[1][3]
self.logger.debug(f"{caller} - {message}")
def log_warning(self, message):
caller = inspect.stack()[1][3]
self.logger.warning(f"{caller} - {message}")
def log_error(self, message):
caller = inspect.stack()[1][3]
self.logger.error(f"{caller} - {message}")
def log_critical(self, message):
caller = inspect.stack()[1][3]
self.logger.critical(f"{caller} - {message}")
def log_exception(self, message):
caller = inspect.stack()[1][3]
self.logger.exception(f"{caller} - {message}")
# Usage example:
if __name__ == "__main__":
logger1 = Log("Logger1")
logger2 = Log("Logger2")
logger1.log_info("This is an info message from Logger1")
logger1.log_debug("This is a debug message from Logger1")

0
src/schema/__init__.py Normal file
View File

4
src/schema/brand.py Normal file
View File

@@ -0,0 +1,4 @@
from dataclasses import dataclass
@dataclass
class BrandMetadata:
pass

71
src/schema/comicdata.py Normal file
View File

@@ -0,0 +1,71 @@
import copy
from dataclasses import dataclass
from typing import Any
from .issue import IssueMetadata
from .publisher import PublisherMetadata
from .country import CountryMetadata
from .language import LanguageMetadata
from .publicationData import PublicationTypeMetadata
@dataclass
class ComicsORGdata:
"""Metadata for a series retrieved from the comics.org database.
"""
id: int | None = None
name: str | None = None
sort_name: str | None = None
format: str | None = None
year_began: int | None = None
year_began_uncertain: bool | None = None
year_ended: int | None = None
year_ended_uncertain: bool | None = None
publication_dates: str | None = None
first_issue_id: int | None = None
last_issue_id: int | None = None
is_current: bool | None = None
publisher_id: int | None = None # based on id request from table
country_id: int | None = None # based on id request from table
language_id: int | None = None # based on id request from table
tracking_notes: str | None = None
notes: str | None = None
has_gallery: bool | None = None
issue_count: int | None = None
created: str | None = None
modified: str | None = None
deleted: str | None = None
has_indicia_frequency: bool | None = None
has_isbn: bool | None = None
has_barcode: bool | None = None
has_issue_title: bool | None = None
has_volume: bool | None = None
is_comics_publication: bool | None = None
color: bool | None = None
dimensions: str | None = None
paper_stock: str | None = None
binding: str | None = None
publishing_format: str | None = None
has_rating: bool | None = None
publication_type_id: int | None = None # based on id request from table
is_singleton: bool | None = None
has_about_comics: bool | None = None
has_indicia_printer: bool | None = None
has_publisher_code_number: bool | None = None
first_issue: IssueMetadata | None =None # based on id request from table
last_issue: IssueMetadata | None =None # based on id request from table
publisher: PublisherMetadata | None =None
country: CountryMetadata | None =None
language: LanguageMetadata | None =None
publication_type: PublicationTypeMetadata | None =None
def copy(self)->"ComicsORGdata":
return copy.deepcopy(self)
def replace(self, **kwargs:Any) -> "ComicsORGdata":
"""Return a new ComicsORGdata object replacing specified fields with new values.
"""
tmp = self.copy()
tmp.__dict__.update(kwargs)
return tmp
def assign(self, **kwargs:Any) -> None:
"""Assign new values to specified fields.
"""
self.__dict__.update(kwargs)
return self

7
src/schema/country.py Normal file
View File

@@ -0,0 +1,7 @@
from dataclasses import dataclass
@dataclass
class CountryMetadata:
id: int | None = None
code: str | None = None
name: str | None = None

47
src/schema/issue.py Normal file
View File

@@ -0,0 +1,47 @@
from dataclasses import dataclass
@dataclass
class IssueMetadata:
id: int | None = None
number: int | None = None
volume: int | None = None
no_volume: int | None = None
display_volume_with_number: bool | None = None
series_id: int | None = None
indicia_publisher_id: int | None = None
indicia_pub_not_printed: bool | None = None
brand_id: int | None = None
no_brand: bool | None = None
publication_date: str | None = None
key_date: str | None = None
sort_code: str | None = None
price: str | None = None
page_count: int | None = None
page_count_uncertain: bool | None = None
indicia_frequency: str | None = None
no_indicia_frequency: bool | None = None
editing: str | None = None
no_editing: bool | None = None
notes: str | None = None
created: str | None = None
modified: str | None = None
deleted: str | None = None
is_indexed: bool | None = None
isbn: str | None = None
valid_isbn: bool | None = None
no_isbn: bool | None = None
variant_of_id: int | None = None
variant_name: str | None = None
barcode: str | None = None
no_barcode: bool | None = None
title: str | None = None
no_title: bool | None = None
on_sale_date: str | None = None
on_sale_date_uncertain: bool | None = None
rating: str | None = None
no_rating: bool | None = None
volume_not_printed: bool | None = None
no_indicia_printer: bool | None = None
variant_cover_status: str | None = None

9
src/schema/language.py Normal file
View File

@@ -0,0 +1,9 @@
from dataclasses import dataclass
@dataclass
class LanguageMetadata:
id: int | None = None
code: str | None = None
name: str | None = None
native_name: str | None = None

14
src/schema/person.py Normal file
View File

@@ -0,0 +1,14 @@
from dataclasses import dataclass
@dataclass
class PersonData:
"""Metadata for a person.
"""
name: str | None = None
role: str | None = None
@property
def __dict__(self):
return {"name": self.name, "role": self.role}

View File

@@ -0,0 +1,7 @@
from dataclasses import dataclass
@dataclass
class PublicationTypeMetadata:
id: int | None = None
name: str | None = None
notes: str | None = None

23
src/schema/publisher.py Normal file
View File

@@ -0,0 +1,23 @@
from dataclasses import dataclass
@dataclass
class PublisherMetadata:
id: int | None = None
name: str | None = None
country_id: int | None = None
year_began: int | None = None
year_ended: int | None = None
notes: str | None = None
url: str | None = None
brand_count: int | None = None
indicia_publisher_count: int | None = None
series_count: int | None = None
created: str | None = None
modified: str | None = None
issue_count: int | None = None
deleted: str | None = None
year_began_uncertain: bool | None = None
year_ended_uncertain: bool | None = None
year_overall_began: int | None = None
year_overall_uncertain: int | None = None
year_overall_ended: int | None = None
year_overall_ended_uncertain: bool | None = None

132
src/schema/series.py Normal file
View File

@@ -0,0 +1,132 @@
from dataclasses import dataclass
import dataclasses
from src.schema.person import PersonData
from komgapi.schemas import Metadata
from typing import Any
import copy
@dataclass
class GenericSeries:
series_id: int | None = None
provider: str | None = None
name: str | None = None
alternate_names: list[dict[str, str]] = dataclasses.field(default_factory=list)
sort_name: str | None = None
releaseDate: str | None = None
publisher: str | None = None
people: list[PersonData] | None = None
description: str | None = None
cover: bytes | None = None
language: str | None = None
issues: str | None = None
links: list[dict[str, str]] = dataclasses.field(default_factory=list)
tags: list[str] | None = None
genres: list[str] | None = None
rating: str | None = None
@property
def copy(self) -> "GenericSeries":
return copy.deepcopy(self)
def replace(self, **kwargs: Any) -> "GenericSeries":
"""Return a new GenericSeries object replacing specified fields with new values."""
tmp = self.copy
tmp.__dict__.update(kwargs)
return tmp
@dataclass
class SeriesMetadata:
"""Metadata for a series."""
is_empty: bool = True
series_name: str | None = None
series_name_lock: bool = False
series_name_sort: str | None = None
series_name_sort_lock: bool = False
series_type: str | None = None
alternate_names: list[str] | None = None
alternate_names_lock: bool = False
publisher: str | None = None
publisher_lock: bool = False
description: str | None = None
description_lock: bool = False
tags: list[str] | None = None
tags_lock: bool = False
genres: list[str] | None = None
genres_lock: bool = False
links: list[dict[str, str]] = dataclasses.field(default_factory=list)
links_lock: bool = False
current_volumes: int | None = None
status: str | None = None
total_volumes: int | None = None
total_volumes_lock: bool = False
releaseDate: str | None = None
ageRating: str | None = None
ageRating_lock: bool = False
# authors: list[AuthorData] = dataclasses.field(default_factory=list)
authors: list[dict[str, str]] | None = None
language: str | None = None
language_lock: bool = False
reading_direction: str | None = None
reading_direction_lock: bool = False
cover: str | None = None
@property
def copy(self) -> "SeriesMetadata":
return copy.deepcopy(self)
def replace(self, **kwargs: Any) -> "SeriesMetadata":
"""Return a new SeriesMetadata object replacing specified fields with new values."""
tmp = self.copy
tmp.__dict__.update(kwargs)
return tmp
def overwrite(self, changed_metadata: "SeriesMetadata") -> None:
raise NotImplementedError
def add_data(metadata: "SeriesMetadata", field_name: str, data: str):
# Check if the field name is valid
field_names = [field.name for field in dataclasses.fields(metadata)]
if field_name not in field_names:
print(f"Error: {field_name} is not a valid field")
return
# Set the value of the field to the data
setattr(metadata, field_name, data)
def from_Metadata(self, metadata: Metadata):
self.is_empty
self.series_name
self.series_name_lock
self.series_name_sort
self.series_name_sort_lock
self.series_type
self.alternate_names
self.alternate_names_lock
self.publisher
self.publisher_lock
self.description
self.description_lock
self.tags
self.tags_lock
self.genres
self.genres_lock
self.links
self.links_lock
self.current_volumes
self.status
self.total_volumes
self.total_volumes_lock
self.releaseDate
self.ageRating
self.ageRating_lock
# authors: list[AuthorData] = dataclasses.field(default_factory=list)
self.authors
self.language
self.language_lock
self.reading_direction
self.reading_direction_lock
self.cover

260
src/schema/tmp Normal file
View File

@@ -0,0 +1,260 @@
import copy
import dataclasses
from dataclasses import dataclass
from typing import Any, TypedDict
import ast
@dataclass
class GenericSeries:
series_id: int | None = None
provider: str | None = None
name: str | None = None
alternate_names: list = dataclasses.field(default_factory=list)
sort_name: str | None = None
releaseDate: str | None = None
publisher: str | None = None
people : list[PersonData] | None = None
description: str | None = None
cover: bytes | None = None
language: str | None = None
issues : str | None = None
links: list[dict[str, str]] = dataclasses.field(default_factory=list)
tags: list[str] | None = None
genres : list[str] | None = None
rating: str | None = None
def copy(self)->"GenericSeries":
return copy.deepcopy(self)
def replace(self, **kwargs:Any) -> "GenericSeries":
"""Return a new GenericSeries object replacing specified fields with new values.
"""
tmp = self.copy()
tmp.__dict__.update(kwargs)
return tmp
@dataclass
class SeriesMetadata:
"""Metadata for a series.
"""
is_empty: bool = True
series_name: str | None = None
series_name_lock: bool = False
series_name_sort: str | None = None
series_name_sort_lock: bool = False
series_type: str | None = None
alternate_names: list[str] | None = None
alternate_names_lock: bool = False
publisher: str | None = None
publisher_lock: bool = False
description: str | None = None
description_lock: bool = False
tags: list[str] | None = None
tags_lock: bool = False
genres: list[str] | None = None
genres_lock: bool = False
links: list[dict[str, str]] = dataclasses.field(default_factory=list)
links_lock: bool = False
current_volumes: int | None = None
status: str | None = None
total_volumes: int | None = None
total_volumes_lock: bool = False
releaseDate: str | None = None
ageRating: str | None = None
ageRating_lock: bool = False
#authors: list[AuthorData] = dataclasses.field(default_factory=list)
authors: list[dict[str, str]] | None = None
language: str | None = None
language_lock: bool = False
reading_direction: str | None = None
reading_direction_lock: bool = False
cover: str | None = None
def copy(self)->"SeriesMetadata":
return copy.deepcopy(self)
def replace(self, **kwargs:Any) -> "SeriesMetadata":
"""Return a new SeriesMetadata object replacing specified fields with new values.
"""
tmp = self.copy()
tmp.__dict__.update(kwargs)
return tmp
def overwrite(self, changed_metadata: "SeriesMetadata") -> None:
raise NotImplementedError
def add_data(metadata: "SeriesMetadata", field_name: str, data: str):
# Check if the field name is valid
field_names = [field.name for field in dataclasses.fields(metadata)]
if field_name not in field_names:
print(f'Error: {field_name} is not a valid field')
return
# Set the value of the field to the data
setattr(metadata, field_name, data)
@dataclass
class IssueMetadata:
id: int | None = None
number: int | None = None
volume: int | None = None
no_volume: int | None = None
display_volume_with_number: bool | None = None
series_id: int | None = None
indicia_publisher_id: int | None = None
indicia_pub_not_printed: bool | None = None
brand_id: int | None = None
no_brand: bool | None = None
publication_date: str | None = None
key_date: str | None = None
sort_code: str | None = None
price: str | None = None
page_count: int | None = None
page_count_uncertain: bool | None = None
indicia_frequency: str | None = None
no_indicia_frequency: bool | None = None
editing: str | None = None
no_editing: bool | None = None
notes: str | None = None
created: str | None = None
modified: str | None = None
deleted: str | None = None
is_indexed: bool | None = None
isbn: str | None = None
valid_isbn: bool | None = None
no_isbn: bool | None = None
variant_of_id: int | None = None
variant_name: str | None = None
barcode: str | None = None
no_barcode: bool | None = None
title: str | None = None
no_title: bool | None = None
on_sale_date: str | None = None
on_sale_date_uncertain: bool | None = None
rating: str | None = None
no_rating: bool | None = None
volume_not_printed: bool | None = None
no_indicia_printer: bool | None = None
variant_cover_status: str | None = None
# def return_
#TODO: add the rest of the fields
@dataclass
class BrandMetadata:
pass
@dataclass
class PublisherMetadata:
id: int | None = None
name: str | None = None
country_id: int | None = None
year_began: int | None = None
year_ended: int | None = None
notes: str | None = None
url: str | None = None
brand_count: int | None = None
indicia_publisher_count: int | None = None
series_count: int | None = None
created: str | None = None
modified: str | None = None
issue_count: int | None = None
deleted: str | None = None
year_began_uncertain: bool | None = None
year_ended_uncertain: bool | None = None
year_overall_began: int | None = None
year_overall_uncertain: int | None = None
year_overall_ended: int | None = None
year_overall_ended_uncertain: bool | None = None
@dataclass
class CountryMetadata:
id: int | None = None
code: str | None = None
name: str | None = None
@dataclass
class LanguageMetadata:
id: int | None = None
code: str | None = None
name: str | None = None
native_name: str | None = None
@dataclass
class PublicationTypeMetadata:
id: int | None = None
name: str | None = None
notes: str | None = None
@dataclass
class ComicsORGdata:
"""Metadata for a series retrieved from the comics.org database.
"""
id: int | None = None
name: str | None = None
sort_name: str | None = None
format: str | None = None
year_began: int | None = None
year_began_uncertain: bool | None = None
year_ended: int | None = None
year_ended_uncertain: bool | None = None
publication_dates: str | None = None
first_issue_id: int | None = None
last_issue_id: int | None = None
is_current: bool | None = None
publisher_id: int | None = None # based on id request from table
country_id: int | None = None # based on id request from table
language_id: int | None = None # based on id request from table
tracking_notes: str | None = None
notes: str | None = None
has_gallery: bool | None = None
issue_count: int | None = None
created: str | None = None
modified: str | None = None
deleted: str | None = None
has_indicia_frequency: bool | None = None
has_isbn: bool | None = None
has_barcode: bool | None = None
has_issue_title: bool | None = None
has_volume: bool | None = None
is_comics_publication: bool | None = None
color: bool | None = None
dimensions: str | None = None
paper_stock: str | None = None
binding: str | None = None
publishing_format: str | None = None
has_rating: bool | None = None
publication_type_id: int | None = None # based on id request from table
is_singleton: bool | None = None
has_about_comics: bool | None = None
has_indicia_printer: bool | None = None
has_publisher_code_number: bool | None = None
first_issue: IssueMetadata | None =None # based on id request from table
last_issue: IssueMetadata | None =None # based on id request from table
publisher: PublisherMetadata | None =None
country: CountryMetadata | None =None
language: LanguageMetadata | None =None
publication_type: PublicationTypeMetadata | None =None
def copy(self)->"ComicsORGdata":
return copy.deepcopy(self)
def replace(self, **kwargs:Any) -> "ComicsORGdata":
"""Return a new ComicsORGdata object replacing specified fields with new values.
"""
tmp = self.copy()
tmp.__dict__.update(kwargs)
return tmp
def assign(self, **kwargs:Any) -> None:
"""Assign new values to specified fields.
"""
self.__dict__.update(kwargs)
return self

56
src/test.ipynb Normal file
View File

@@ -0,0 +1,56 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2\n"
]
}
],
"source": [
"numba = \"01-02\"\n",
"if \"-\" in numba:\n",
" numba = numba.split(\"-\")\n",
" m_numba = max(numba)\n",
" numba = m_numba\n",
"\n",
"print(int(float(numba)))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.3"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

66
src/test.py Normal file
View File

@@ -0,0 +1,66 @@
# import json
# import os
# import re
# from APIs import KomgaAPI, MangadexAPI, NyaaFeed
# from logs import Log
# config = json.load(open("config.json"))
# Nyaa = NyaaFeed()
# Komga = KomgaAPI()
# md = MangadexAPI()
# def remove_if_alr_in_db(
# self, present_volumes: list, downloaded_volumes: list, folder: str
# ):
# """Delete any file from the folder that is already in the database, or does not conform to the naming convention.
# Args:
# ----
# present_volumes (list): a list of volumes that are already in the database, retrieved from komga api
# downloaded_volumes (list): the list of volumes that are downloaded from the corresponding feed/api
# folder (str): relative path to the folder containing the downloaded files
# """
# # print(f'present_volumes: {present_volumes}')
# # print(f'downloaded_volumes: {downloaded_volumes}')
# # content_folder=f'{config["download_location"]}{folder}'
# # content_files=[file for file in os.listdir(content_folder)]
# # print(f'content_files: {content_files}')
# # print(f'dupe_files: {dupe_files}')
# # get all files that are in both lists
# dupe_files = [file for file in downloaded_volumes if file in present_volumes]
# folder_files = [file for file in os.listdir(folder)]
# for file in folder_files:
# n = re.search(r"#\d{1,4}", file)
# if n:
# vol_num = int(n.group()[1:])
# if vol_num in dupe_files:
# os.remove(f"{folder}/{file}")
# Log.info(
# f"Deleted {file} from {folder} as it is already in the database"
# )
# dupe_files.remove(vol_num)
# return dupe_files
# present_volumes = Komga.get_volumes("095S763VH28SQ")
# downloaded_volumes = [i for i in range(1, 104)]
# print(remove_if_alr_in_db(None, present_volumes, downloaded_volumes, ""))
# from PySide6 import QtCore, QtGui, QtWidgets
# from gui import SeriesSelectDialog
# from threads import SeriesThread
# app = QtWidgets.QApplication([])
# title = "86"
# api = "mangadex"
# dialog = SeriesSelectDialog(title, api)
# dialog.show()
# app.exec()
# #launch seriesthread
from APIs import ComicVineAPI
cv = ComicVineAPI()
print(cv.issue("4000-951282"))

178
src/testcode.ipynb Normal file

File diff suppressed because one or more lines are too long

13
testing.py Normal file
View File

@@ -0,0 +1,13 @@
from nyaapy.nyaasi.nyaa import Nyaa
from nyaapy.torrent import Torrent
cat = Nyaa()
data = cat.search(
" The 100th Time's the Charm - She Was Executed 99 Times, So How Did She Unlock 'Super Love' Mode!",
category=3,
subcategory=1,
)
for i in data:
print(i.name)

7
tests/test_aria.py Normal file
View File

@@ -0,0 +1,7 @@
import os
from src.download import Download
def test_Download():
dl = Download({})
assert dl.check_aria2() == True