Add issue templates and CI workflows; remove obsolete files
- Created bug report and feature request templates for better issue tracking.
- Added a build workflow for automated package management and release.
- Removed outdated CodeQL analysis and Python publish workflows.
- Updated project metadata in pyproject.toml and README.md.
- Refactored torrent handling and site interaction modules.
src/komsuite_nyaapy/modules/anime_site.py (new file, +97 lines)
@@ -0,0 +1,97 @@
import requests

from komsuite_nyaapy.modules import torrent
from komsuite_nyaapy.modules.parser import parse_nyaa, parse_single, parse_nyaa_rss


class AnimeTorrentSite:
    SITE = torrent.TorrentSite.NYAASI
    # Use the enum's value so requests receives a plain URL string.
    URL = SITE.value

    @classmethod
    def last_uploads(self, number_of_results: int):
        r = requests.get(self.URL)

        # If anything is up with the nyaa servers, let the user know.
        r.raise_for_status()

        json_data = parse_nyaa(
            request_text=r.text, limit=number_of_results, site=self.SITE
        )

        return torrent.json_to_class(json_data)

    @classmethod
    def search(
        self,
        keyword: str,
        category: int = 0,
        subcategory: int = 0,
        filters: int = 0,
        page: int = 0,
        sorting: str = "id",
        order: str = "desc",
        **kwargs,
    ):
        base_url = self.URL

        user = kwargs.get("user", None)

        user_uri = f"user/{user}" if user else ""

        if page > 0:
            search_uri = "{}/{}?f={}&c={}_{}&q={}&p={}&s={}&o={}".format(
                base_url,
                user_uri,
                filters,
                category,
                subcategory,
                keyword,
                page,
                sorting,
                order,
            )
        else:
            search_uri = "{}/{}?f={}&c={}_{}&q={}&s={}&o={}".format(
                base_url,
                user_uri,
                filters,
                category,
                subcategory,
                keyword,
                sorting,
                order,
            )

        if not user:
            search_uri += "&page=rss"
        http_response = requests.get(search_uri)
        http_response.raise_for_status()

        if user:
            json_data = parse_nyaa(
                request_text=http_response.content, limit=None, site=self.SITE
            )
        else:
            json_data = parse_nyaa_rss(
                request_text=http_response.content, limit=None, site=self.SITE
            )

        # Convert JSON data to a class object
        return torrent.json_to_class(json_data)

    @classmethod
    def get(self, view_id: int):
        r = requests.get(f"{self.URL}/view/{view_id}")
        r.raise_for_status()

        json_data = parse_single(request_text=r.content, site=self.SITE)

        return torrent.json_to_class(json_data)

    @classmethod
    def get_from_user(self, username):
        r = requests.get(f"{self.URL}/user/{username}")
        r.raise_for_status()

        json_data = parse_nyaa(request_text=r.content, limit=None, site=self.SITE)
        return torrent.json_to_class(json_data)
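For orientation, a minimal usage sketch of the new class, using only the methods added in this file. The keyword and category values are illustrative (1_2 maps to "Anime - English-translated" in parser.py); this assumes the package is importable under the komsuite_nyaapy name used in the imports above.

# Usage sketch for AnimeTorrentSite; keyword and categories are illustrative.
from komsuite_nyaapy.modules.anime_site import AnimeTorrentSite

# Three most recent uploads from nyaa.si.
latest = AnimeTorrentSite.last_uploads(number_of_results=3)

# Search English-translated anime (category 1, subcategory 2).
results = AnimeTorrentSite.search("one piece", category=1, subcategory=2)

for t in results:
    print(t.name, t.size, t.magnet)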
src/komsuite_nyaapy/modules/magnet.py (new file, +23 lines)
@@ -0,0 +1,23 @@
import urllib
from urllib.parse import urlencode


def magnet_builder(info_hash, title):
    """
    Generates a magnet link using the info_hash and title of a given file.
    """
    known_trackers = [
        "http://nyaa.tracker.wf:7777/announce",
        "udp://open.stealth.si:80/announce",
        "udp://tracker.opentrackr.org:1337/announce",
        "udp://exodus.desync.com:6969/announce",
        "udp://tracker.torrent.eu.org:451/announce",
    ]

    magnet_link = f"magnet:?xt=urn:btih:{info_hash}&" + urlencode(
        {"dn": title}, quote_via=urllib.parse.quote
    )
    for tracker in known_trackers:
        magnet_link += f"&{urlencode({'tr': tracker})}"

    return magnet_link
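A quick illustration of the link magnet_builder produces; the info hash and title below are placeholders, not real data.

from komsuite_nyaapy.modules.magnet import magnet_builder

# Placeholder hash and title, purely illustrative.
link = magnet_builder("0123456789abcdef0123456789abcdef01234567", "[Group] Some Show - 01")
# magnet:?xt=urn:btih:0123...&dn=%5BGroup%5D%20Some%20Show%20-%2001&tr=http%3A%2F%2Fnyaa.tracker.wf%3A7777%2Fannounce&tr=...
print(link)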
src/komsuite_nyaapy/modules/parser.py (new file, +247 lines)
@@ -0,0 +1,247 @@
from lxml import etree

from .magnet import magnet_builder
from .torrent import TorrentSite


def nyaa_categories(b):
    c = b.replace("?c=", "")
    cats = c.split("_")

    cat = cats[0]
    sub_cat = cats[1]

    categories = {
        "1": {
            "name": "Anime",
            "sub_cats": {
                "1": "Anime Music Video",
                "2": "English-translated",
                "3": "Non-English-translated",
                "4": "Raw",
            },
        },
        "2": {"name": "Audio", "sub_cats": {"1": "Lossless", "2": "Lossy"}},
        "3": {
            "name": "Literature",
            "sub_cats": {
                "1": "English-translated",
                "2": "Non-English-translated",
                "3": "Raw",
            },
        },
        "4": {
            "name": "Live Action",
            "sub_cats": {
                "1": "English-translated",
                "2": "Idol/Promotional Video",
                "3": "Non-English-translated",
                "4": "Raw",
            },
        },
        "5": {"name": "Pictures", "sub_cats": {"1": "Graphics", "2": "Photos"}},
        "6": {"name": "Software", "sub_cats": {"1": "Applications", "2": "Games"}},
    }

    try:
        category_name = (
            f"{categories[cat]['name']} - {categories[cat]['sub_cats'][sub_cat]}"
        )
    except KeyError:
        print("Unable to get Nyaa category name")
        return

    return category_name


def parse_nyaa_rss(request_text, limit, site):
    """
    Extracts torrent information from a given RSS response.
    """
    root = etree.fromstring(request_text)
    torrents = []

    for item in root.xpath("channel/item")[:limit]:
        # Decide category.
        if site in [TorrentSite.NYAASI, TorrentSite.NYAALAND]:
            category = item.findtext("nyaa:categoryId", namespaces=item.nsmap)
        elif site is TorrentSite.SUKEBEINYAASI:
            category = item.findtext("nyaa:categoryId", namespaces=item.nsmap)
        else:
            raise ValueError("Unknown TorrentSite received!")

        try:
            is_remake = item.findtext("nyaa:remake", namespaces=item.nsmap) == "Yes"
            is_trusted = item.findtext("nyaa:trusted", namespaces=item.nsmap) == "Yes"
            item_type = (
                "remake" if is_remake else "trusted" if is_trusted else "default"
            )

            torrent = {
                "id": item.findtext("guid").split("/")[-1],
                "category": category,
                "url": item.findtext("guid"),
                "name": item.findtext("title"),
                "download_url": item.findtext("link"),
                "magnet": magnet_builder(
                    item.findtext("nyaa:infoHash", namespaces=item.nsmap),
                    item.findtext("title"),
                ),
                "size": item.findtext("nyaa:size", namespaces=item.nsmap),
                "date": item.findtext("pubDate"),
                "seeders": item.findtext("nyaa:seeders", namespaces=item.nsmap),
                "leechers": item.findtext("nyaa:leechers", namespaces=item.nsmap),
                "completed_downloads": None,
                "type": item_type,
            }
            torrents.append(torrent)
        except IndexError:
            pass

    return torrents


def parse_nyaa(request_text, limit, site):
    parser = etree.HTMLParser()
    tree = etree.fromstring(request_text, parser)

    # Base domain used to build absolute URLs.
    uri = site.value

    torrents = []

    # Go through the table rows.
    for tr in tree.xpath("//tbody//tr")[:limit]:
        block = []

        for td in tr.xpath("./td"):
            for link in td.xpath("./a"):
                href = link.attrib.get("href").split("/")[-1]

                # Only keep links that are not comment anchors.
                if href[-9:] != "#comments":
                    block.append(href)

                if link.text and link.text.strip():
                    block.append(link.text.strip())

            if td.text is not None and td.text.strip():
                block.append(td.text.strip())

        # Add the type of torrent based on the tr class.
        if tr.attrib.get("class") is not None:
            if "danger" in tr.attrib.get("class"):
                block.append("remake")
            elif "success" in tr.attrib.get("class"):
                block.append("trusted")
            else:
                block.append("default")
        else:
            block.append("default")

        # Decide category.
        if site in [TorrentSite.NYAASI, TorrentSite.NYAALAND]:
            category = nyaa_categories(block[0])
        elif site is TorrentSite.SUKEBEINYAASI:
            category = sukebei_categories(block[0])
        else:
            raise ValueError("Unknown TorrentSite received!")

        # Create the torrent object.
        try:
            torrent = {
                "id": block[1],
                "category": category,
                "url": "{}/view/{}".format(uri, block[1]),
                "name": block[2],
                "download_url": "{}/download/{}".format(uri, block[3]),
                "magnet": block[4],
                "size": block[5],
                "date": block[6],
                "seeders": block[7],
                "leechers": block[8],
                "completed_downloads": block[9],
                "type": block[10],
            }
            torrents.append(torrent)
        except IndexError:
            pass
    return torrents


def parse_single(request_text, site):
    parser = etree.HTMLParser()
    tree = etree.fromstring(request_text, parser)

    # Base domain used to build absolute URLs.
    uri = site.value

    torrent = {}
    data = []
    torrent_files = []

    # Find basic uploader info and torrent stats.
    for row in tree.xpath("//div[@class='row']"):
        for div_text in row.xpath("./div[@class='col-md-5']//text()"):
            d = div_text.strip()
            if d:
                data.append(d)

    # Find the files; only the text of the li element(s) is needed.
    for el in tree.xpath("//div[contains(@class, 'torrent-file-list')]//li/text()"):
        if el.rstrip():
            torrent_files.append(el)

    torrent["title"] = tree.xpath("//h3[@class='panel-title']/text()")[0].strip()
    torrent["category"] = data[0]
    torrent["uploader"] = data[4]
    torrent["uploader_profile"] = "{}/user/{}".format(uri, data[4])
    torrent["website"] = data[6]
    torrent["size"] = data[8]
    torrent["date"] = data[3]
    torrent["seeders"] = data[5]
    torrent["leechers"] = data[7]
    torrent["completed"] = data[9]
    torrent["hash"] = data[10]
    torrent["files"] = torrent_files

    torrent["description"] = ""
    for s in tree.xpath("//div[@id='torrent-description']"):
        torrent["description"] += s.text

    return torrent


def sukebei_categories(b):
    c = b.replace("?c=", "")
    cats = c.split("_")

    cat = cats[0]
    subcat = cats[1]

    categories = {
        "1": {
            "name": "Art",
            "subcats": {
                "1": "Anime",
                "2": "Doujinshi",
                "3": "Games",
                "4": "Manga",
                "5": "Pictures",
            },
        },
        "2": {
            "name": "Real Life",
            "subcats": {"1": "Photobooks & Pictures", "2": "Videos"},
        },
    }

    try:
        category_name = (
            f"{categories[cat]['name']} - {categories[cat]['subcats'][subcat]}"
        )
    except KeyError:
        print("Unable to get Sukebei category name")
        return

    return category_name
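To illustrate the two category helpers, the "?c=X_Y" fragment scraped from a listing row maps to a human-readable label as follows (values taken from the tables in this file):

from komsuite_nyaapy.modules.parser import nyaa_categories, sukebei_categories

print(nyaa_categories("?c=1_2"))     # Anime - English-translated
print(nyaa_categories("?c=6_2"))     # Software - Games
print(sukebei_categories("?c=1_4"))  # Art - Manga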
src/komsuite_nyaapy/modules/torrent.py (new file, +155 lines)
@@ -0,0 +1,155 @@
from enum import Enum
from dataclasses import dataclass
from dataclasses import field
import os
import bencodepy
import regex
from typing import Optional
import loguru
import sys

log = loguru.logger
log.remove()
log.add("cli.log", rotation="1 week", retention="1 month")
log.add(sys.stdout, level="INFO")


def json_to_class(data):
    # We check if the data passed is a list or not
    if isinstance(data, list):
        object_list = []
        for item in data:
            object_list.append(Torrent(**item))
        # Return a list of Torrent objects
        return object_list
    else:
        return [Torrent(**data)]


# This deals with converting the dict to an object
@dataclass
class Torrent:
    id: int = 0
    category: Optional[str] = None
    url: Optional[str] = None
    name: Optional[str] = None
    download_url: Optional[str] = None
    magnet: Optional[str] = None
    size: Optional[str] = None
    seeders: int = 0
    leechers: int = 0
    completed_downloads: int = 0
    date: Optional[str] = None
    type: Optional[str] = None
    volumes: list[int] = field(default_factory=list)
    contents: list[str] = field(default_factory=list)
    filetypes: list[str] = field(default_factory=list)

    def __post_init__(self):
        # Accessing the property downloads the .torrent file and fills contents, filetypes and volumes.
        self.get_contents
        self.seeders = int(self.seeders)
        self.leechers = int(self.leechers) if self.leechers is not None else 0

    @property
    def filesizes(self):
        data_count = self.size.split(" ")[0]
        data_value = self.size.split(" ")[1]
        # Binary exponents: 1024 ** 1 for KiB, ** 2 for MiB, ** 3 for GiB.
        kib = 1
        mib = 2
        gib = 3

        return float(data_count) * (
            1024
            ** (mib if data_value == "MiB" else kib if data_value == "KiB" else gib)
        )

    @property
    def get_contents(self):
        os.system(f"wget {self.download_url} > /dev/null 2>&1")
        with open(f"{self.download_url.split('/')[-1]}", "rb") as f:
            data = bencodepy.decode(f.read())

        info = data[b"info"]
        filetypes: list[str] = []

        # For multi-file torrents
        if b"files" in info:
            files: list[str] = []
            for file in info[b"files"]:  # type: ignore
                path_parts = [part.decode("utf-8") for part in file[b"path"]]  # type: ignore
                files.append("/".join(path_parts))  # type: ignore
                filetypes.append(path_parts[-1].split(".")[-1])  # type: ignore
            self.filetypes = list(set(filetypes))
            self.contents = files
            # Check each file name for a volume marker such as "vol. 03", "v03" or "v01-03".
            volumes: list[int] = []
            for file in files:
                match = regex.findall(
                    r"(vol\.?\s*\d{1,3}|v\d{1,3}(?:[-v]\d{1,3})?)",
                    file,
                    regex.IGNORECASE,
                )
                if match:
                    match = (
                        "".join(match[0])
                        .strip()
                        .replace("vol", "")
                        .replace("v", "")
                        .replace(".", "")
                    )
                    if "-" in match:
                        match = match.split("-")
                        # If the difference between the numbers is greater than 1, add the missing numbers to volumes.
                        if int(match[1]) - int(match[0]) > 1:
                            for i in range(int(match[0]) - 1, int(match[1]) + 1):
                                volumes.append(i)
                        else:
                            volumes.append(int(match[0]))
                            volumes.append(int(match[1]))
                    else:
                        if match.isdigit():
                            volumes.append(int(match))

            volumes = sorted(volumes)
            self.volumes = volumes

        # For single-file torrents
        # elif b"name" in info:
        #     return [info[b"name"].decode("utf-8")]
        else:
            self.volumes = [0]
        # log.debug("Filetypes: {}, Volumes: {}".format(self.filetypes, self.volumes))  #! enable for debug
        os.remove(f"{self.download_url.split('/')[-1]}")


class TorrentSite(Enum):
    """
    Contains torrent sites
    """

    NYAASI = "https://nyaa.si"
    SUKEBEINYAASI = "https://sukebei.nyaa.si"

    NYAALAND = "https://nyaa.land"

    @classmethod
    def list_sites(cls):
        return [site.value for site in cls]

    @classmethod
    def get_site(cls, site):
        if isinstance(site, TorrentSite):
            return site.value
        for s in cls:
            if s.value == site:
                return s
        return None

    @classmethod
    def get_site_url(cls, site):
        # Accept either a TorrentSite member or a URL string and return the URL string.
        resolved = cls.get_site(site)
        return resolved.value if isinstance(resolved, TorrentSite) else resolved

    @classmethod
    def nyaa(cls):
        return (cls.NYAASI, cls.NYAASI.value)
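For reference, a small sketch of the TorrentSite helpers added above. Constructing Torrent objects directly is not shown, since __post_init__ downloads the .torrent file via wget.

from komsuite_nyaapy.modules.torrent import TorrentSite

print(TorrentSite.list_sites())
# ['https://nyaa.si', 'https://sukebei.nyaa.si', 'https://nyaa.land']

print(TorrentSite.get_site("https://nyaa.si"))      # TorrentSite.NYAASI
print(TorrentSite.get_site(TorrentSite.NYAALAND))   # 'https://nyaa.land'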