feat(nyaasi): add static classes instead of object-based ones
This commit is contained in:
@@ -1,27 +1,26 @@
|
|||||||
import requests
|
import requests
|
||||||
from nyaapy import utils
|
|
||||||
from nyaapy import torrent
|
from nyaapy import torrent
|
||||||
|
from nyaapy.parser import parse_nyaa, parse_single, parse_nyaa_rss
|
||||||
|
|
||||||
|
class AnimeTorrentSite:
|
||||||
|
SITE = torrent.TorrentSite.NYAASI
|
||||||
|
URL = "https://nyaa.si"
|
||||||
|
|
||||||
class Nyaa:
|
@classmethod
|
||||||
|
def last_uploads(self, number_of_results: int):
|
||||||
def __init__(self):
|
|
||||||
self.SITE = utils.TorrentSite.NYAASI
|
|
||||||
self.URL = "https://nyaa.si"
|
|
||||||
|
|
||||||
def last_uploads(self, number_of_results):
|
|
||||||
r = requests.get(self.URL)
|
r = requests.get(self.URL)
|
||||||
|
|
||||||
# If anything up with nyaa servers let the user know.
|
# If anything up with nyaa servers let the user know.
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
|
|
||||||
json_data = utils.parse_nyaa(
|
json_data = parse_nyaa(
|
||||||
request_text=r.text, limit=number_of_results + 1, site=self.SITE
|
request_text=r.text, limit=number_of_results, site=self.SITE
|
||||||
)
|
)
|
||||||
|
|
||||||
return torrent.json_to_class(json_data)
|
return torrent.json_to_class(json_data)
|
||||||
|
|
||||||
def search(self, keyword, **kwargs):
|
@classmethod
|
||||||
|
def search(self, keyword: str, **kwargs):
|
||||||
base_url = self.URL
|
base_url = self.URL
|
||||||
|
|
||||||
user = kwargs.get("user", None)
|
user = kwargs.get("user", None)
|
||||||
@@ -67,28 +66,30 @@ class Nyaa:
|
|||||||
http_response.raise_for_status()
|
http_response.raise_for_status()
|
||||||
|
|
||||||
if user:
|
if user:
|
||||||
json_data = utils.parse_nyaa(
|
json_data = parse_nyaa(
|
||||||
request_text=http_response.text, limit=None, site=self.SITE
|
request_text=http_response.content, limit=None, site=self.SITE
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
json_data = utils.parse_nyaa_rss(
|
json_data = parse_nyaa_rss(
|
||||||
request_text=http_response.text, limit=None, site=self.SITE
|
request_text=http_response.content, limit=None, site=self.SITE
|
||||||
)
|
)
|
||||||
|
|
||||||
# Convert JSON data to a class object
|
# Convert JSON data to a class object
|
||||||
return torrent.json_to_class(json_data)
|
return torrent.json_to_class(json_data)
|
||||||
|
|
||||||
def get(self, view_id):
|
@classmethod
|
||||||
|
def get(self, view_id: int):
|
||||||
r = requests.get(f"{self.URL}/view/{view_id}")
|
r = requests.get(f"{self.URL}/view/{view_id}")
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
|
|
||||||
json_data = utils.parse_single(request_text=r.text, site=self.SITE)
|
json_data = parse_single(request_text=r.content, site=self.SITE)
|
||||||
|
|
||||||
return torrent.json_to_class(json_data)
|
return torrent.json_to_class(json_data)
|
||||||
|
|
||||||
def get_user(self, username):
|
@classmethod
|
||||||
|
def get_from_user(self, username):
|
||||||
r = requests.get(f"{self.URL}/user/{username}")
|
r = requests.get(f"{self.URL}/user/{username}")
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
|
|
||||||
json_data = utils.parse_nyaa(request_text=r.text, limit=None, site=self.SITE)
|
json_data = parse_nyaa(request_text=r.content, limit=None, site=self.SITE)
|
||||||
return torrent.json_to_class(json_data)
|
return torrent.json_to_class(json_data)
|
||||||
22
nyaapy/magnet.py
Normal file
22
nyaapy/magnet.py
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
import urllib
|
||||||
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
|
def magnet_builder(info_hash, title):
|
||||||
|
"""
|
||||||
|
Generates a magnet link using the info_hash and title of a given file.
|
||||||
|
"""
|
||||||
|
known_trackers = [
|
||||||
|
"http://nyaa.tracker.wf:7777/announce",
|
||||||
|
"udp://open.stealth.si:80/announce",
|
||||||
|
"udp://tracker.opentrackr.org:1337/announce",
|
||||||
|
"udp://exodus.desync.com:6969/announce",
|
||||||
|
"udp://tracker.torrent.eu.org:451/announce",
|
||||||
|
]
|
||||||
|
|
||||||
|
magnet_link = f"magnet:?xt=urn:btih:{info_hash}&" + urlencode(
|
||||||
|
{"dn": title}, quote_via=urllib.parse.quote
|
||||||
|
)
|
||||||
|
for tracker in known_trackers:
|
||||||
|
magnet_link += f"&{urlencode({'tr': tracker})}"
|
||||||
|
|
||||||
|
return magnet_link
|
||||||
0
nyaapy/nyaasi/__init__.py
Normal file
0
nyaapy/nyaasi/__init__.py
Normal file
7
nyaapy/nyaasi/nyaa.py
Normal file
7
nyaapy/nyaasi/nyaa.py
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
from nyaapy.anime_site import AnimeTorrentSite
|
||||||
|
from nyaapy.torrent import TorrentSite
|
||||||
|
|
||||||
|
|
||||||
|
class Nyaa(AnimeTorrentSite):
|
||||||
|
SITE = TorrentSite.NYAASI
|
||||||
|
URL = "https://nyaa.si"
|
||||||
6
nyaapy/nyaasi/sukebei.py
Normal file
6
nyaapy/nyaasi/sukebei.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
from nyaapy.anime_site import AnimeTorrentSite
|
||||||
|
from nyaapy.torrent import TorrentSite
|
||||||
|
|
||||||
|
class SukebeiNyaa(AnimeTorrentSite):
|
||||||
|
SITE = TorrentSite.SUKEBEINYAASI
|
||||||
|
URL = "https://sukebei.nyaa.si"
|
||||||
@@ -1,323 +1,247 @@
|
|||||||
import urllib
|
from lxml import etree
|
||||||
from enum import Enum
|
from nyaapy.magnet import magnet_builder
|
||||||
from urllib.parse import urlencode
|
from nyaapy.torrent import TorrentSite
|
||||||
|
|
||||||
from lxml import etree
|
def nyaa_categories(b):
|
||||||
|
c = b.replace("?c=", "")
|
||||||
|
cats = c.split("_")
|
||||||
class TorrentSite(Enum):
|
|
||||||
"""
|
cat = cats[0]
|
||||||
Contains torrent sites
|
sub_cat = cats[1]
|
||||||
"""
|
|
||||||
|
categories = {
|
||||||
NYAASI = "https://nyaa.si"
|
"1": {
|
||||||
SUKEBEINYAASI = "https://sukebei.nyaa.si"
|
"name": "Anime",
|
||||||
|
"sub_cats": {
|
||||||
# * nyaa.pantsu.cat redirects to nyaa.net
|
"1": "Anime Music Video",
|
||||||
NYAANET = "https://nyaa.net"
|
"2": "English-translated",
|
||||||
SUKEBEINYAANET = "https://sukebei.nyaa.net"
|
"3": "Non-English-translated",
|
||||||
|
"4": "Raw",
|
||||||
|
},
|
||||||
def nyaa_categories(b):
|
},
|
||||||
c = b.replace("?c=", "")
|
"2": {"name": "Audio", "sub_cats": {"1": "Lossless", "2": "Lossy"}},
|
||||||
cats = c.split("_")
|
"3": {
|
||||||
|
"name": "Literature",
|
||||||
cat = cats[0]
|
"sub_cats": {
|
||||||
sub_cat = cats[1]
|
"1": "English-translated",
|
||||||
|
"2": "Non-English-translated",
|
||||||
categories = {
|
"3": "Raw",
|
||||||
"1": {
|
},
|
||||||
"name": "Anime",
|
},
|
||||||
"sub_cats": {
|
"4": {
|
||||||
"1": "Anime Music Video",
|
"name": "Live Action",
|
||||||
"2": "English-translated",
|
"sub_cats": {
|
||||||
"3": "Non-English-translated",
|
"1": "English-translated",
|
||||||
"4": "Raw",
|
"2": "Idol/Promotional Video",
|
||||||
},
|
"3": "Non-English-translated",
|
||||||
},
|
"4": "Raw",
|
||||||
"2": {"name": "Audio", "sub_cats": {"1": "Lossless", "2": "Lossy"}},
|
},
|
||||||
"3": {
|
},
|
||||||
"name": "Literature",
|
"5": {"name": "Pictures", "sub_cats": {"1": "Graphics", "2": "Photos"}},
|
||||||
"sub_cats": {
|
"6": {"name": "Software", "sub_cats": {"1": "Applications", "2": "Games"}},
|
||||||
"1": "English-translated",
|
}
|
||||||
"2": "Non-English-translated",
|
|
||||||
"3": "Raw",
|
try:
|
||||||
},
|
category_name = (
|
||||||
},
|
f"{categories[cat]['name']} - {categories[cat]['sub_cats'][sub_cat]}"
|
||||||
"4": {
|
)
|
||||||
"name": "Live Action",
|
except KeyError:
|
||||||
"sub_cats": {
|
print("Unable to get Nyaa category name")
|
||||||
"1": "English-translated",
|
return
|
||||||
"2": "Idol/Promotional Video",
|
|
||||||
"3": "Non-English-translated",
|
return category_name
|
||||||
"4": "Raw",
|
|
||||||
},
|
|
||||||
},
|
def parse_nyaa_rss(request_text, limit, site):
|
||||||
"5": {"name": "Pictures", "sub_cats": {"1": "Graphics", "2": "Photos"}},
|
"""
|
||||||
"6": {"name": "Software", "sub_cats": {"1": "Applications", "2": "Games"}},
|
Extracts torrent information from a given rss response.
|
||||||
}
|
"""
|
||||||
|
root = etree.fromstring(request_text)
|
||||||
try:
|
torrents = []
|
||||||
category_name = (
|
|
||||||
f"{categories[cat]['name']} - {categories[cat]['sub_cats'][sub_cat]}"
|
for item in root.xpath("channel/item")[:limit]:
|
||||||
)
|
# Decide category.
|
||||||
except KeyError:
|
if site in [TorrentSite.NYAASI, TorrentSite.NYAALAND]:
|
||||||
print("Unable to get Nyaa category name")
|
category = item.findtext("nyaa:categoryId", namespaces=item.nsmap)
|
||||||
return
|
elif site in [TorrentSite.SUKEBEINYAASI, TorrentSite.SUKEBEINYAALAND]:
|
||||||
|
category = item.findtext("nyaa:categoryId", namespaces=item.nsmap)
|
||||||
return category_name
|
else:
|
||||||
|
raise ValueError("Unknown TorrentSite received!")
|
||||||
|
|
||||||
def parse_nyaa_rss(request_text, limit, site):
|
try:
|
||||||
"""
|
is_remake = item.findtext("nyaa:remake", namespaces=item.nsmap) == "Yes"
|
||||||
Extracts torrent information from a given rss response.
|
is_trusted = item.findtext("nyaa:trusted", namespaces=item.nsmap) == "Yes"
|
||||||
"""
|
item_type = (
|
||||||
root = etree.fromstring(request_text)
|
"remake" if is_remake else "trusted" if is_trusted else "default"
|
||||||
torrents = []
|
)
|
||||||
|
|
||||||
for item in root.xpath("channel/item")[:limit]:
|
torrent = {
|
||||||
# Decide category.
|
"id": item.findtext("guid").split("/")[-1],
|
||||||
if site in [TorrentSite.NYAASI, TorrentSite.NYAANET]:
|
"category": category,
|
||||||
category = item.findtext("nyaa:categoryId", namespaces=item.nsmap)
|
"url": item.findtext("guid"),
|
||||||
elif site in [TorrentSite.SUKEBEINYAASI, TorrentSite.SUKEBEINYAANET]:
|
"name": item.findtext("title"),
|
||||||
category = item.findtext("nyaa:categoryId", namespaces=item.nsmap)
|
"download_url": item.findtext("link"),
|
||||||
else:
|
"magnet": magnet_builder(
|
||||||
raise ValueError("Unknown TorrentSite received!")
|
item.findtext("nyaa:infoHash", namespaces=item.nsmap),
|
||||||
|
item.findtext("title"),
|
||||||
try:
|
),
|
||||||
is_remake = item.findtext("nyaa:remake", namespaces=item.nsmap) == "Yes"
|
"size": item.findtext("nyaa:size", namespaces=item.nsmap),
|
||||||
is_trusted = item.findtext("nyaa:trusted", namespaces=item.nsmap) == "Yes"
|
"date": item.findtext("pubDate"),
|
||||||
item_type = (
|
"seeders": item.findtext("nyaa:seeders", namespaces=item.nsmap),
|
||||||
"remake" if is_remake else "trusted" if is_trusted else "default"
|
"leechers": item.findtext("nyaa:leechers", namespaces=item.nsmap),
|
||||||
)
|
"completed_downloads": None,
|
||||||
|
"type": item_type,
|
||||||
torrent = {
|
}
|
||||||
"id": item.findtext("guid").split("/")[-1],
|
torrents.append(torrent)
|
||||||
"category": category,
|
except IndexError:
|
||||||
"url": item.findtext("guid"),
|
pass
|
||||||
"name": item.findtext("title"),
|
|
||||||
"download_url": item.findtext("link"),
|
return torrents
|
||||||
"magnet": magnet_builder(
|
|
||||||
item.findtext("nyaa:infoHash", namespaces=item.nsmap),
|
|
||||||
item.findtext("title"),
|
def parse_nyaa(request_text, limit, site):
|
||||||
),
|
parser = etree.HTMLParser()
|
||||||
"size": item.findtext("nyaa:size", namespaces=item.nsmap),
|
tree = etree.fromstring(request_text, parser)
|
||||||
"date": item.findtext("pubDate"),
|
|
||||||
"seeders": item.findtext("nyaa:seeders", namespaces=item.nsmap),
|
# Put proper domain here.
|
||||||
"leechers": item.findtext("nyaa:leechers", namespaces=item.nsmap),
|
uri = site.value
|
||||||
"completed_downloads": None,
|
|
||||||
"type": item_type,
|
torrents = []
|
||||||
}
|
|
||||||
torrents.append(torrent)
|
# Going through table rows
|
||||||
except IndexError:
|
for tr in tree.xpath("//tbody//tr")[:limit]:
|
||||||
pass
|
block = []
|
||||||
|
|
||||||
return torrents
|
for td in tr.xpath("./td"):
|
||||||
|
for link in td.xpath("./a"):
|
||||||
|
|
||||||
def parse_nyaa(request_text, limit, site):
|
href = link.attrib.get("href").split("/")[-1]
|
||||||
parser = etree.HTMLParser()
|
|
||||||
tree = etree.fromstring(request_text, parser)
|
# Only caring about non-comment pages.
|
||||||
|
if href[-9:] != "#comments":
|
||||||
# Put proper domain here.
|
block.append(href)
|
||||||
uri = site.value
|
|
||||||
|
if link.text and link.text.strip():
|
||||||
torrents = []
|
block.append(link.text.strip())
|
||||||
|
|
||||||
# Going through table rows
|
if td.text is not None and td.text.strip():
|
||||||
for tr in tree.xpath("//tbody//tr")[:limit]:
|
block.append(td.text.strip())
|
||||||
block = []
|
|
||||||
|
# Add type of torrent based on tr class.
|
||||||
for td in tr.xpath("./td"):
|
if tr.attrib.get("class") is not None:
|
||||||
for link in td.xpath("./a"):
|
if "danger" in tr.attrib.get("class"):
|
||||||
|
block.append("remake")
|
||||||
href = link.attrib.get("href").split("/")[-1]
|
elif "success" in tr.attrib.get("class"):
|
||||||
|
block.append("trusted")
|
||||||
# Only caring about non-comment pages.
|
else:
|
||||||
if href[-9:] != "#comments":
|
block.append("default")
|
||||||
block.append(href)
|
else:
|
||||||
|
block.append("default")
|
||||||
if link.text and link.text.strip():
|
|
||||||
block.append(link.text.strip())
|
# Decide category.
|
||||||
|
if site in [TorrentSite.NYAASI, TorrentSite.NYAALAND]:
|
||||||
if td.text is not None and td.text.strip():
|
category = nyaa_categories(block[0])
|
||||||
block.append(td.text.strip())
|
elif site is TorrentSite.SUKEBEINYAASI:
|
||||||
|
category = sukebei_categories(block[0])
|
||||||
# Add type of torrent based on tr class.
|
else:
|
||||||
if tr.attrib.get("class") is not None:
|
raise ValueError("Unknown TorrentSite received!")
|
||||||
if "danger" in tr.attrib.get("class"):
|
|
||||||
block.append("remake")
|
# Create torrent object
|
||||||
elif "success" in tr.attrib.get("class"):
|
try:
|
||||||
block.append("trusted")
|
torrent = {
|
||||||
else:
|
"id": block[1],
|
||||||
block.append("default")
|
"category": category,
|
||||||
else:
|
"url": "{}/view/{}".format(uri, block[1]),
|
||||||
block.append("default")
|
"name": block[2],
|
||||||
|
"download_url": "{}/download/{}".format(uri, block[3]),
|
||||||
# Decide category.
|
"magnet": block[4],
|
||||||
if site in [TorrentSite.NYAASI, TorrentSite.NYAANET]:
|
"size": block[5],
|
||||||
category = nyaa_categories(block[0])
|
"date": block[6],
|
||||||
elif site in [TorrentSite.SUKEBEINYAASI, TorrentSite.SUKEBEINYAANET]:
|
"seeders": block[7],
|
||||||
category = sukebei_categories(block[0])
|
"leechers": block[8],
|
||||||
else:
|
"completed_downloads": block[9],
|
||||||
raise ValueError("Unknown TorrentSite received!")
|
"type": block[10],
|
||||||
|
}
|
||||||
# Create torrent object
|
torrents.append(torrent)
|
||||||
try:
|
except IndexError:
|
||||||
torrent = {
|
pass
|
||||||
"id": block[1],
|
return torrents
|
||||||
"category": category,
|
|
||||||
"url": "{}/view/{}".format(uri, block[1]),
|
|
||||||
"name": block[2],
|
def parse_single(request_text, site):
|
||||||
"download_url": "{}/download/{}".format(uri, block[3]),
|
parser = etree.HTMLParser()
|
||||||
"magnet": block[4],
|
tree = etree.fromstring(request_text, parser)
|
||||||
"size": block[5],
|
|
||||||
"date": block[6],
|
# Put proper domain here.
|
||||||
"seeders": block[7],
|
uri = site.value
|
||||||
"leechers": block[8],
|
|
||||||
"completed_downloads": block[9],
|
torrent = {}
|
||||||
"type": block[10],
|
data = []
|
||||||
}
|
torrent_files = []
|
||||||
torrents.append(torrent)
|
|
||||||
except IndexError:
|
# Find basic uploader info & torrent stats
|
||||||
pass
|
for row in tree.xpath("//div[@class='row']"):
|
||||||
return torrents
|
for div_text in row.xpath("./div[@class='col-md-5']//text()"):
|
||||||
|
d = div_text.strip()
|
||||||
|
if d:
|
||||||
def parse_single(request_text, site):
|
data.append(d)
|
||||||
parser = etree.HTMLParser()
|
|
||||||
tree = etree.fromstring(request_text, parser)
|
# Find files, we need only text of the li element(s).
|
||||||
|
# Sorry about Pycodestyle aka PEP8 (E501) error
|
||||||
# Put proper domain here.
|
for el in tree.xpath("//div[contains(@class, 'torrent-file-list')]//li/text()"):
|
||||||
uri = site.value
|
if el.rstrip():
|
||||||
|
torrent_files.append(el)
|
||||||
torrent = {}
|
|
||||||
data = []
|
torrent["title"] = tree.xpath("//h3[@class='panel-title']/text()")[0].strip()
|
||||||
torrent_files = []
|
torrent["category"] = data[0]
|
||||||
|
torrent["uploader"] = data[4]
|
||||||
# Find basic uploader info & torrent stats
|
torrent["uploader_profile"] = "{}/user/{}".format(uri, data[4])
|
||||||
for row in tree.xpath("//div[@class='row']"):
|
torrent["website"] = data[6]
|
||||||
for div_text in row.xpath("./div[@class='col-md-5']//text()"):
|
torrent["size"] = data[8]
|
||||||
d = div_text.strip()
|
torrent["date"] = data[3]
|
||||||
if d:
|
torrent["seeders"] = data[5]
|
||||||
data.append(d)
|
torrent["leechers"] = data[7]
|
||||||
|
torrent["completed"] = data[9]
|
||||||
# Find files, we need only text of the li element(s).
|
torrent["hash"] = data[10]
|
||||||
# Sorry about Pycodestyle aka PEP8 (E501) error
|
torrent["files"] = torrent_files
|
||||||
for el in tree.xpath("//div[contains(@class, 'torrent-file-list')]//li/text()"):
|
|
||||||
if el.rstrip():
|
torrent["description"] = ""
|
||||||
torrent_files.append(el)
|
for s in tree.xpath("//div[@id='torrent-description']"):
|
||||||
|
torrent["description"] += s.text
|
||||||
torrent["title"] = tree.xpath("//h3[@class='panel-title']/text()")[0].strip()
|
|
||||||
torrent["category"] = data[0]
|
return torrent
|
||||||
torrent["uploader"] = data[4]
|
|
||||||
torrent["uploader_profile"] = "{}/user/{}".format(uri, data[4])
|
|
||||||
torrent["website"] = data[6]
|
def sukebei_categories(b):
|
||||||
torrent["size"] = data[8]
|
c = b.replace("?c=", "")
|
||||||
torrent["date"] = data[3]
|
cats = c.split("_")
|
||||||
torrent["seeders"] = data[5]
|
|
||||||
torrent["leechers"] = data[7]
|
cat = cats[0]
|
||||||
torrent["completed"] = data[9]
|
subcat = cats[1]
|
||||||
torrent["hash"] = data[10]
|
|
||||||
torrent["files"] = torrent_files
|
categories = {
|
||||||
|
"1": {
|
||||||
torrent["description"] = ""
|
"name": "Art",
|
||||||
for s in tree.xpath("//div[@id='torrent-description']"):
|
"subcats": {
|
||||||
torrent["description"] += s.text
|
"1": "Anime",
|
||||||
|
"2": "Doujinshi",
|
||||||
return torrent
|
"3": "Games",
|
||||||
|
"4": "Manga",
|
||||||
|
"5": "Pictures",
|
||||||
def sukebei_categories(b):
|
},
|
||||||
c = b.replace("?c=", "")
|
},
|
||||||
cats = c.split("_")
|
"2": {
|
||||||
|
"name": "Real Life",
|
||||||
cat = cats[0]
|
"subcats": {"1": "Photobooks & Pictures", "2": "Videos"},
|
||||||
subcat = cats[1]
|
},
|
||||||
|
}
|
||||||
categories = {
|
|
||||||
"1": {
|
try:
|
||||||
"name": "Art",
|
category_name = (
|
||||||
"subcats": {
|
f"{categories[cat]['name']} - {categories[cat]['subcats'][subcat]}"
|
||||||
"1": "Anime",
|
)
|
||||||
"2": "Doujinshi",
|
except KeyError:
|
||||||
"3": "Games",
|
print("Unable to get Sukebei category name")
|
||||||
"4": "Manga",
|
return
|
||||||
"5": "Pictures",
|
|
||||||
},
|
return category_name
|
||||||
},
|
|
||||||
"2": {
|
|
||||||
"name": "Real Life",
|
|
||||||
"subcats": {"1": "Photobooks & Pictures", "2": "Videos"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
category_name = (
|
|
||||||
f"{categories[cat]['name']} - {categories[cat]['subcats'][subcat]}"
|
|
||||||
)
|
|
||||||
except KeyError:
|
|
||||||
print("Unable to get Sukebei category name")
|
|
||||||
return
|
|
||||||
|
|
||||||
return category_name
|
|
||||||
|
|
||||||
|
|
||||||
def magnet_builder(info_hash, title):
|
|
||||||
"""
|
|
||||||
Generates a magnet link using the info_hash and title of a given file.
|
|
||||||
"""
|
|
||||||
known_trackers = [
|
|
||||||
"http://nyaa.tracker.wf:7777/announce",
|
|
||||||
"udp://open.stealth.si:80/announce",
|
|
||||||
"udp://tracker.opentrackr.org:1337/announce",
|
|
||||||
"udp://exodus.desync.com:6969/announce",
|
|
||||||
"udp://tracker.torrent.eu.org:451/announce",
|
|
||||||
]
|
|
||||||
|
|
||||||
magnet_link = f"magnet:?xt=urn:btih:{info_hash}&" + urlencode(
|
|
||||||
{"dn": title}, quote_via=urllib.parse.quote
|
|
||||||
)
|
|
||||||
for tracker in known_trackers:
|
|
||||||
magnet_link += f"&{urlencode({'tr': tracker})}"
|
|
||||||
|
|
||||||
return magnet_link
|
|
||||||
|
|
||||||
|
|
||||||
# Pantsu Utils
|
|
||||||
def query_builder(q, params):
|
|
||||||
available_params = [
|
|
||||||
"category",
|
|
||||||
"page",
|
|
||||||
"limit",
|
|
||||||
"userID",
|
|
||||||
"fromID",
|
|
||||||
"status",
|
|
||||||
"maxage",
|
|
||||||
"toDate",
|
|
||||||
"fromDate",
|
|
||||||
"dateType",
|
|
||||||
"minSize",
|
|
||||||
"maxSize",
|
|
||||||
"sizeType",
|
|
||||||
"sort",
|
|
||||||
"order",
|
|
||||||
"lang",
|
|
||||||
]
|
|
||||||
query = "?q={}".format(q.replace(" ", "+"))
|
|
||||||
|
|
||||||
for param, value in params.items():
|
|
||||||
if param in available_params:
|
|
||||||
if param != "category" and param != "status" and param != "lang":
|
|
||||||
query += "&{}={}".format(param, value)
|
|
||||||
elif param == "category":
|
|
||||||
query += "&c={}_{}".format(value[0], value[1])
|
|
||||||
|
|
||||||
elif param == "status":
|
|
||||||
query += "&s={}".format(value)
|
|
||||||
|
|
||||||
elif param == "lang":
|
|
||||||
for lang in value:
|
|
||||||
query += "&lang={}".format(lang)
|
|
||||||
|
|
||||||
return query
|
|
||||||
@@ -1,49 +0,0 @@
|
|||||||
import requests
|
|
||||||
from nyaapy import utils
|
|
||||||
|
|
||||||
|
|
||||||
class SukebeiNyaa:
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.SITE = utils.TorrentSite.SUKEBEINYAASI
|
|
||||||
|
|
||||||
def search(self, keyword, **kwargs):
|
|
||||||
uri = self.SITE.value
|
|
||||||
category = kwargs.get("category", 0)
|
|
||||||
subcategory = kwargs.get("subcategory", 0)
|
|
||||||
filters = kwargs.get("filters", 0)
|
|
||||||
page = kwargs.get("page", 0)
|
|
||||||
|
|
||||||
if page > 0:
|
|
||||||
r = requests.get(
|
|
||||||
"{}/?f={}&c={}_{}&q={}&p={}".format(
|
|
||||||
uri, filters, category, subcategory, keyword, page
|
|
||||||
)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
r = requests.get(
|
|
||||||
"{}/?f={}&c={}_{}&q={}".format(
|
|
||||||
uri, filters, category, subcategory, keyword
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
r.raise_for_status()
|
|
||||||
return utils.parse_nyaa(r.text, limit=None, site=self.SITE)
|
|
||||||
|
|
||||||
def get(self, id):
|
|
||||||
r = requests.get("{}/view/{}".format(self.SITE.value, id))
|
|
||||||
r.raise_for_status()
|
|
||||||
|
|
||||||
return utils.parse_single(r.text, self.SITE)
|
|
||||||
|
|
||||||
def get_user(self, username):
|
|
||||||
r = requests.get("{}/user/{}".format(self.SITE.value, username))
|
|
||||||
r.raise_for_status()
|
|
||||||
|
|
||||||
return utils.parse_nyaa(r.text, limit=None, site=self.SITE)
|
|
||||||
|
|
||||||
def last_uploads(self, number_of_results):
|
|
||||||
r = requests.get(self.SITE.value)
|
|
||||||
r.raise_for_status()
|
|
||||||
|
|
||||||
return utils.parse_nyaa(r.text, limit=number_of_results + 1, site=self.SITE)
|
|
||||||
@@ -1,3 +1,6 @@
|
|||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
|
||||||
def json_to_class(data):
|
def json_to_class(data):
|
||||||
# We check if the data passed is a list or not
|
# We check if the data passed is a list or not
|
||||||
if isinstance(data, list):
|
if isinstance(data, list):
|
||||||
@@ -9,9 +12,18 @@ def json_to_class(data):
|
|||||||
else:
|
else:
|
||||||
return Torrent(data)
|
return Torrent(data)
|
||||||
|
|
||||||
|
|
||||||
# This deals with converting the dict to an object
|
# This deals with converting the dict to an object
|
||||||
class Torrent(object):
|
class Torrent(object):
|
||||||
def __init__(self, my_dict):
|
def __init__(self, my_dict):
|
||||||
for key in my_dict:
|
for key in my_dict:
|
||||||
setattr(self, key, my_dict[key])
|
setattr(self, key, my_dict[key])
|
||||||
|
|
||||||
|
class TorrentSite(Enum):
|
||||||
|
"""
|
||||||
|
Contains torrent sites
|
||||||
|
"""
|
||||||
|
|
||||||
|
NYAASI = "https://nyaa.si"
|
||||||
|
SUKEBEINYAASI = "https://sukebei.nyaa.si"
|
||||||
|
|
||||||
|
NYAALAND = "https://nyaa.land"
|
||||||
2
poetry.lock
generated
2
poetry.lock
generated
@@ -599,4 +599,4 @@ zstd = ["zstandard (>=0.18.0)"]
|
|||||||
[metadata]
|
[metadata]
|
||||||
lock-version = "2.0"
|
lock-version = "2.0"
|
||||||
python-versions = "^3.10"
|
python-versions = "^3.10"
|
||||||
content-hash = "bd09d4b9f6f3ae48c750ce8dbd1bacc8b75e265b4363f77bf095962f9d1ebeac"
|
content-hash = "cb48b1a114a5cd1aa44d635f91c49ce640137746939a4feadb5615f8cfc7cf8b"
|
||||||
|
|||||||
30
tests/integration/test_nyaasi.py
Normal file
30
tests/integration/test_nyaasi.py
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
from nyaapy.nyaasi.nyaa import Nyaa
|
||||||
|
from nyaapy.torrent import Torrent
|
||||||
|
|
||||||
|
def test_nyaa_last_uploads():
|
||||||
|
request = Nyaa.last_uploads(number_of_results=10)
|
||||||
|
torrent = request[0]
|
||||||
|
|
||||||
|
assert isinstance(torrent, Torrent) == True
|
||||||
|
assert len(request) == 10
|
||||||
|
|
||||||
|
|
||||||
|
def test_nyaa_search():
|
||||||
|
request = Nyaa.search(keyword="koe no katachi")
|
||||||
|
torrent = request[0]
|
||||||
|
|
||||||
|
assert isinstance(torrent, Torrent) == True
|
||||||
|
|
||||||
|
|
||||||
|
def test_nyaa_get_single():
|
||||||
|
request = Nyaa.get(view_id='1847113')
|
||||||
|
|
||||||
|
assert isinstance(request, Torrent) == True
|
||||||
|
|
||||||
|
|
||||||
|
def test_nyaa_get_from_user():
|
||||||
|
request = Nyaa.get_from_user(username="Erai-raws")
|
||||||
|
torrent = request[0]
|
||||||
|
|
||||||
|
assert isinstance(torrent, Torrent) == True
|
||||||
|
assert len(request) <= 75
|
||||||
@@ -1,74 +0,0 @@
|
|||||||
from nyaapy.nyaa import Nyaa
|
|
||||||
from pprint import pprint
|
|
||||||
from datetime import datetime
|
|
||||||
import json
|
|
||||||
import sys
|
|
||||||
import os
|
|
||||||
|
|
||||||
# Creating a folder for test_files
|
|
||||||
# ! not included in github project.
|
|
||||||
if not os.path.isdir("test_files"):
|
|
||||||
os.makedirs("test_files")
|
|
||||||
|
|
||||||
nyaa = Nyaa()
|
|
||||||
|
|
||||||
# Get fresh torrents
|
|
||||||
dt_latest_torrents_begin = datetime.now()
|
|
||||||
latest_torrents = nyaa.last_uploads(100)
|
|
||||||
dt_latest_torrents_end = datetime.now()
|
|
||||||
with open("test_files/nyaa_latest_torrent_test.json", "w") as f:
|
|
||||||
for torrent in latest_torrents:
|
|
||||||
try:
|
|
||||||
# This prints it as byte like objects since unicode is fun
|
|
||||||
f.write(str(torrent.name.encode("utf-8")) + "\n")
|
|
||||||
except AttributeError:
|
|
||||||
f.write("No name found for this torrent")
|
|
||||||
|
|
||||||
# Search some nasty stuff
|
|
||||||
dt_search_begin = datetime.now()
|
|
||||||
test_search = nyaa.search("kimi no na wa")
|
|
||||||
dt_search_end = datetime.now()
|
|
||||||
with open("test_files/nyaa_search_test.json", "w") as f:
|
|
||||||
for torrent in test_search:
|
|
||||||
try:
|
|
||||||
# This prints it as byte like objects since unicode is fun
|
|
||||||
f.write(str(torrent.name.encode("utf-8")) + "\n")
|
|
||||||
except AttributeError:
|
|
||||||
f.write("No name found for this torrent")
|
|
||||||
|
|
||||||
# Get first torrent from found torrents
|
|
||||||
dt_single_torrent_begin = datetime.now()
|
|
||||||
single_torrent = test_search[0]
|
|
||||||
dt_single_torrent_end = datetime.now()
|
|
||||||
with open("test_files/nyaa_single_torrent_test.json", "w") as f:
|
|
||||||
try:
|
|
||||||
# This prints it as byte like objects since unicode is fun
|
|
||||||
f.write(str(torrent.name.encode("utf-8")) + "\n")
|
|
||||||
except AttributeError:
|
|
||||||
f.write("No name found for this torrent")
|
|
||||||
|
|
||||||
dt_user_begin = datetime.now()
|
|
||||||
user_torrents = nyaa.get_user("HorribleSubs")
|
|
||||||
dt_user_end = datetime.now()
|
|
||||||
with open("test_files/nyaa_single_user_test.json", "w") as f:
|
|
||||||
for torrent in user_torrents:
|
|
||||||
try:
|
|
||||||
# This prints it as byte like objects since unicode is fun
|
|
||||||
f.write(str(torrent.name.encode("utf-8")) + "\n")
|
|
||||||
except AttributeError:
|
|
||||||
f.write("No name found for this torrent")
|
|
||||||
|
|
||||||
print(
|
|
||||||
"Latest torrents time:",
|
|
||||||
(dt_latest_torrents_end - dt_latest_torrents_begin).microseconds / 1000,
|
|
||||||
"msec",
|
|
||||||
)
|
|
||||||
print(
|
|
||||||
"Test search time:", (dt_search_end - dt_search_begin).microseconds / 1000, "msec"
|
|
||||||
)
|
|
||||||
print(
|
|
||||||
"Single torrent time:",
|
|
||||||
(dt_single_torrent_end - dt_single_torrent_begin).microseconds / 1000,
|
|
||||||
"msec",
|
|
||||||
)
|
|
||||||
print("Single user time:", (dt_user_end - dt_user_begin).microseconds / 1000, "msec")
|
|
||||||
@@ -1,7 +0,0 @@
|
|||||||
"""
|
|
||||||
* Pantsu need some serious work
|
|
||||||
Regular data single_torrent parser not working from other Nyaa alternatives
|
|
||||||
Needs some work
|
|
||||||
"""
|
|
||||||
|
|
||||||
print("TODO")
|
|
||||||
@@ -1,53 +0,0 @@
|
|||||||
from nyaapy.sukebei import SukebeiNyaa
|
|
||||||
from datetime import datetime
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
|
|
||||||
# Creating a folder for test_files
|
|
||||||
# ! not included in github project.
|
|
||||||
if not os.path.isdir("test_files"):
|
|
||||||
os.makedirs("test_files")
|
|
||||||
|
|
||||||
nyaa = SukebeiNyaa()
|
|
||||||
|
|
||||||
# Get fresh torrents
|
|
||||||
dt_latest_torrents_begin = datetime.now()
|
|
||||||
latest_torrents = nyaa.last_uploads(100)
|
|
||||||
dt_latest_torrents_end = datetime.now()
|
|
||||||
with open("test_files/sukebei_latest_torrent_test.json", "w") as f:
|
|
||||||
json.dump(latest_torrents, f)
|
|
||||||
|
|
||||||
# Search some nasty stuff
|
|
||||||
dt_search_begin = datetime.now()
|
|
||||||
test_search = nyaa.search("G Senjou no maou")
|
|
||||||
dt_search_end = datetime.now()
|
|
||||||
with open("test_files/sukebei_search_test.json", "w") as f:
|
|
||||||
json.dump(test_search, f)
|
|
||||||
|
|
||||||
# Get first torrent from found torrents
|
|
||||||
dt_single_torrent_begin = datetime.now()
|
|
||||||
single_torrent = nyaa.get(test_search[0]["id"])
|
|
||||||
dt_single_torrent_end = datetime.now()
|
|
||||||
with open("test_files/sukebei_single_torrent_test.json", "w") as f:
|
|
||||||
json.dump(single_torrent, f)
|
|
||||||
|
|
||||||
dt_user_begin = datetime.now()
|
|
||||||
user_torrents = nyaa.get_user("RUNBKK")
|
|
||||||
dt_user_end = datetime.now()
|
|
||||||
with open("test_files/sukebei_single_user_test.json", "w") as f:
|
|
||||||
json.dump(user_torrents, f)
|
|
||||||
|
|
||||||
print(
|
|
||||||
"Latest torrents time:",
|
|
||||||
(dt_latest_torrents_end - dt_latest_torrents_begin).microseconds / 1000,
|
|
||||||
"msec",
|
|
||||||
)
|
|
||||||
print(
|
|
||||||
"Test search time:", (dt_search_end - dt_search_begin).microseconds / 1000, "msec"
|
|
||||||
)
|
|
||||||
print(
|
|
||||||
"Single torrent time:",
|
|
||||||
(dt_single_torrent_end - dt_single_torrent_begin).microseconds / 1000,
|
|
||||||
"msec",
|
|
||||||
)
|
|
||||||
print("Single user time:", (dt_user_end - dt_user_begin).microseconds / 1000, "msec")
|
|
||||||
Reference in New Issue
Block a user