481 lines
17 KiB
Python
481 lines
17 KiB
Python
from requests import get
|
|
from random import choice
|
|
import io
|
|
from math import floor
|
|
from time import time
|
|
|
|
from PIL import Image
|
|
from interactions import SlashContext, Embed, EmbedFooter, EmbedAttachment, File
|
|
|
|
from gwendolyn.exceptions import CannotConnectToService, ServiceNotProvided
|
|
|
|
class Service():
|
|
def __init__(self, ip: str, port: str, api_key: str) -> None:
|
|
self.ip = ip
|
|
self.port = port
|
|
self.header = {
|
|
"accept": "application/json",
|
|
"X-Api-Key": api_key
|
|
}
|
|
|
|
def test(self):
|
|
return get(f"http://{self.ip}:{self.port}/api", headers=self.header).ok
|
|
|
|
class Radarr(Service):
|
|
def movies(self):
|
|
response = get(f"http://{self.ip}:{self.port}/api/v3/movie", headers=self.header)
|
|
if not response.ok:
|
|
return []
|
|
|
|
return [m for m in response.json() if m["hasFile"]]
|
|
|
|
class Sonarr(Service):
|
|
def shows(self):
|
|
response = get(f"http://{self.ip}:{self.port}/api/v3/series", headers=self.header)
|
|
if not response.ok:
|
|
return []
|
|
|
|
return response.json()
|
|
|
|
class QBittorrent(Service):
|
|
def __init__(self, ip: str, port: str, username: str, password: str) -> None:
|
|
self.ip = ip
|
|
self.port = port
|
|
response = get(f"http://{ip}:{port}/api/v2/auth/login?username={username}&password={password}")
|
|
self.logged_in = response.ok
|
|
self.cookie = {"SID": response.cookies.values()[0]}
|
|
|
|
def test(self):
|
|
return self.logged_in
|
|
|
|
def get_torrents(self):
|
|
response = get(
|
|
f"http://{self.ip}:{self.port}/api/v2/torrents/info",
|
|
cookies=self.cookie
|
|
)
|
|
return response.json()
|
|
|
|
class TMDb():
|
|
def __init__(self, api_token: str) -> None:
|
|
self.header = {
|
|
"accept": "application/json",
|
|
"Authorization": "Bearer "+api_token
|
|
}
|
|
|
|
def test(self):
|
|
return get(f"https://api.themoviedb.org/3/account", headers=self.header).ok
|
|
|
|
|
|
class BetterNetflix():
|
|
def __init__(self, bot, **kwargs) -> None:
|
|
self.bot = bot
|
|
|
|
services = ["radarr","sonarr","tmdb","qbittorrent"]
|
|
|
|
for service in services:
|
|
try:
|
|
setattr(self,service,kwargs[service])
|
|
except:
|
|
raise ServiceNotProvided(service)
|
|
|
|
if getattr(self,service).test():
|
|
self.bot.log(f"Connected to {service}")
|
|
else:
|
|
raise CannotConnectToService(service)
|
|
|
|
|
|
def _poster_color(self, poster_url: str):
|
|
response = get(poster_url, stream=True)
|
|
img = Image.open(io.BytesIO(response.content))
|
|
img.thumbnail((150, 100))
|
|
|
|
# Reduce colors (uses k-means internally)
|
|
paletted = img.convert('P', palette=Image.ADAPTIVE, colors=6)
|
|
|
|
buf = io.BytesIO()
|
|
img2 = paletted.convert("RGB")
|
|
img2.save("gwendolyn/resources/temp.jpg")
|
|
|
|
# Find the color that occurs most often
|
|
palette = paletted.getpalette()
|
|
color_counts = sorted(paletted.getcolors(), reverse=True)
|
|
palette_index = color_counts[0][1]
|
|
dominant_color = palette[palette_index*3:palette_index*3+3]
|
|
|
|
return dominant_color
|
|
|
|
async def movie(self, ctx: SlashContext):
|
|
msg = await ctx.send("Finding a random movie...")
|
|
movies = self.radarr.movies()
|
|
if len(movies) == 0:
|
|
await msg.edit(content="Unable to find any movies")
|
|
return
|
|
|
|
picked_movie = choice(movies)
|
|
description = ""
|
|
|
|
if "imdb" in picked_movie['ratings']:
|
|
description += f"<:imdb:1301506320603676782> {picked_movie['ratings']['imdb']['value']}"
|
|
description += "\u1CBC\u1CBC"
|
|
|
|
if "rottenTomatoes" in picked_movie['ratings']:
|
|
rt_value = picked_movie['ratings']['rottenTomatoes']['value']
|
|
rt_icon = "<:fresh:1301509701338660894>" if rt_value >= 60 else "<:rotten:1301509685907820607>"
|
|
description += f"{rt_icon} {rt_value}%"
|
|
|
|
if description != "":
|
|
description += "\n\n"
|
|
|
|
year = EmbedFooter(str(picked_movie["year"])) if "year" in picked_movie else None
|
|
has_poster = len(picked_movie["images"]) > 0 and "remoteUrl" in picked_movie["images"][0]
|
|
poster = EmbedAttachment(picked_movie["images"][0]["remoteUrl"]) if has_poster else None
|
|
|
|
color = self._poster_color(picked_movie["images"][0]["remoteUrl"]) if has_poster else "#ff0000"
|
|
|
|
description += picked_movie["overview"]
|
|
embed = Embed(
|
|
picked_movie["title"],
|
|
description,
|
|
color,
|
|
thumbnail=poster,
|
|
footer=year
|
|
)
|
|
await msg.edit(content="",embed=embed)
|
|
|
|
async def show(self, ctx: SlashContext):
|
|
msg = await ctx.send("Finding a random show...")
|
|
shows = self.sonarr.shows()
|
|
if len(shows) == 0:
|
|
await msg.edit(content="Unable to find any shows")
|
|
return
|
|
|
|
picked_show = choice(shows)
|
|
description = ""
|
|
|
|
if "imdb" in picked_show['ratings']:
|
|
description += f"<:imdb:1301506320603676782> {picked_show['ratings']['imdb']['value']}"
|
|
description += "\u1CBC\u1CBC"
|
|
|
|
if "rottenTomatoes" in picked_show['ratings']:
|
|
rt_value = picked_show['ratings']['rottenTomatoes']['value']
|
|
rt_icon = "<:fresh:1301509701338660894>" if rt_value >= 60 else "<:rotten:1301509685907820607>"
|
|
description += f"{rt_icon} {rt_value}%"
|
|
|
|
if description != "":
|
|
description += "\n\n"
|
|
|
|
year = EmbedFooter(str(picked_show["year"])) if "year" in picked_show else None
|
|
images = {i["coverType"]:i for i in picked_show["images"]}
|
|
has_poster = "poster" in images and "remoteUrl" in images["poster"]
|
|
poster = EmbedAttachment(images["poster"]["remoteUrl"]) if has_poster else None
|
|
|
|
color = self._poster_color(images["poster"]["remoteUrl"]) if has_poster else "#ff0000"
|
|
|
|
description += picked_show["overview"]
|
|
embed = Embed(
|
|
picked_show["title"],
|
|
description,
|
|
color,
|
|
thumbnail=poster,
|
|
footer=year
|
|
)
|
|
await msg.edit(content="",embed=embed)
|
|
|
|
def _draw_torrent_list(self, title_width: int):
|
|
def ratio_to_bar(ratio):
|
|
progress_bar = "|"+("█"*floor(download_ratio*20))
|
|
while len(progress_bar) < 21:
|
|
progress_bar += " "
|
|
|
|
progress_bar += "| "+str(floor(download_ratio*100))+"%"
|
|
|
|
while len(progress_bar) < 27:
|
|
progress_bar += " "
|
|
|
|
return progress_bar
|
|
|
|
message = [""]
|
|
all_downloaded = True
|
|
|
|
dm_section_title = "*Torrent Downloads*"
|
|
dm_section_title_line = "-"*((title_width-len(dm_section_title))//2)
|
|
message.append(
|
|
dm_section_title_line+dm_section_title+dm_section_title_line
|
|
)
|
|
|
|
torrent_list = self.qbittorrent.get_torrents()
|
|
|
|
if len(torrent_list) == 0:
|
|
message.append("No torrents currently downloading")
|
|
return message, all_downloaded
|
|
|
|
for torrent in torrent_list:
|
|
if torrent['category'] not in ["radarr", "tv-sonarr"]:
|
|
break
|
|
|
|
torrent_name = torrent["name"]
|
|
if len(torrent_name) > 30:
|
|
if torrent_name[26] == " ":
|
|
torrent_name = torrent_name[:26]+"...."
|
|
else:
|
|
torrent_name = torrent_name[:27]+"..."
|
|
while len(torrent_name) < 30:
|
|
torrent_name += " "
|
|
|
|
if torrent["size"] == 0:
|
|
download_ratio = 0
|
|
elif torrent["amount_left"] == 0:
|
|
download_ratio = 1
|
|
else:
|
|
download_ratio = min(
|
|
torrent["downloaded"]/torrent["size"],
|
|
1
|
|
)
|
|
|
|
progress_bar = ratio_to_bar(download_ratio)
|
|
|
|
eta_in_seconds = torrent["eta"]
|
|
|
|
if eta_in_seconds >= 8640000:
|
|
eta = "∞"
|
|
else:
|
|
eta = ""
|
|
if eta_in_seconds >= 86400:
|
|
eta += str(floor(eta_in_seconds/86400))+"d "
|
|
if eta_in_seconds >= 3600:
|
|
eta += str(floor((eta_in_seconds%86400)/3600))+"h "
|
|
if eta_in_seconds >= 60:
|
|
eta += str(floor((eta_in_seconds%3600)/60))+"m "
|
|
|
|
eta += str(eta_in_seconds%60)+"s"
|
|
|
|
torrent_info = f"{torrent_name} {progress_bar} "
|
|
torrent_info += f"(Eta: {eta})"
|
|
|
|
if torrent["state"] == "stalledDL":
|
|
torrent_info += " (Stalled)"
|
|
|
|
if not (download_ratio == 1 and
|
|
torrent["last_activity"] < time()-7200):
|
|
message.append(torrent_info)
|
|
|
|
if download_ratio < 1 and torrent["state"] != "stalledDL":
|
|
all_downloaded = False
|
|
|
|
return message, all_downloaded
|
|
|
|
async def _generate_download_list(self, show_dm, show_movies, show_shows,
|
|
episodes):
|
|
"""Generate a list of all torrents.
|
|
|
|
*Returns*
|
|
message_text: str
|
|
A formatted list of all torrents
|
|
|
|
all_downloaded: bool
|
|
Whether all torrents are downloaded
|
|
"""
|
|
self.bot.log("Generating torrent list")
|
|
title_width = 100
|
|
message = []
|
|
|
|
if show_dm:
|
|
m, all_downloaded = self._draw_torrent_list(title_width)
|
|
message += m
|
|
|
|
# if show_movies:
|
|
# message.append("")
|
|
# movies_section_title = "*Missing movies not downloading*"
|
|
# movies_section_line = (
|
|
# "-"*((title_width-len(movies_section_title))//2)
|
|
# )
|
|
# message.append(
|
|
# movies_section_line+movies_section_title+movies_section_line
|
|
# )
|
|
# movie_list = requests.get(
|
|
# self.radarr_url+"movie?apiKey="+self.credentials["radarr_key"]
|
|
# ).json()
|
|
# print(
|
|
# self.radarr_url+"movie?apiKey="+self.credentials["radarr_key"]
|
|
# )
|
|
# movie_queue = requests.get(
|
|
# self.radarr_url+"queue?apiKey="+self.credentials["radarr_key"]
|
|
# ).json()
|
|
# movie_queue_ids = []
|
|
|
|
# for queue_item in movie_queue["records"]:
|
|
# movie_queue_ids.append(queue_item["movieId"])
|
|
|
|
# for movie in movie_list:
|
|
# if (not movie["hasFile"] and
|
|
# movie["id"] not in movie_queue_ids):
|
|
# movie_name = movie["title"]
|
|
# if len(movie_name) > 40:
|
|
# if movie_name[36] == " ":
|
|
# movie_name = movie_name[:36]+"...."
|
|
# else:
|
|
# movie_name = movie_name[:37]+"..."
|
|
|
|
# while len(movie_name) < 41:
|
|
# movie_name += " "
|
|
|
|
# if movie["monitored"]:
|
|
# movie_info = movie_name+"Could not find a torrent"
|
|
# else:
|
|
# movie_info = self.long_strings["No torrent"].format(
|
|
# movie_name
|
|
# )
|
|
|
|
# message.append(movie_info)
|
|
|
|
# if show_shows:
|
|
# message.append("")
|
|
# show_section_title = "*Missing shows not downloading*"
|
|
# show_section_line = "-"*((title_width-len(show_section_title))//2)
|
|
# message.append(
|
|
# show_section_line+show_section_title+show_section_line
|
|
# )
|
|
|
|
# show_list = requests.get(
|
|
# self.sonarr_url+"series?apiKey="+self.credentials["sonarr_key"]
|
|
# ).json()
|
|
|
|
# for show in show_list:
|
|
# if show["seasons"][0]["seasonNumber"] == 0:
|
|
# seasons = show["seasons"][1:]
|
|
# else:
|
|
# seasons = show["seasons"]
|
|
# if any(
|
|
# (
|
|
# i["statistics"]["episodeCount"] !=
|
|
# i["statistics"]["totalEpisodeCount"]
|
|
# ) for i in seasons):
|
|
# if all(
|
|
# i["statistics"]["episodeCount"] == 0 for i in seasons
|
|
# ):
|
|
# message.append(show["title"] + " (all episodes)")
|
|
# else:
|
|
# if episodes:
|
|
# missing_episodes = sum(
|
|
# (i["statistics"]["totalEpisodeCount"] -
|
|
# i["statistics"]["episodeCount"])
|
|
# for i in seasons)
|
|
# message.append(
|
|
# f"{show['title']} ({missing_episodes} episodes)"
|
|
# )
|
|
|
|
message.append("-"*title_width)
|
|
|
|
message_text = "```"+"\n".join(message[1:])+"```"
|
|
if message_text == "``````":
|
|
message_text = self.long_strings["No torrents downloading"]
|
|
|
|
return message_text, all_downloaded
|
|
|
|
# async def downloading(self, ctx, content):
|
|
# """Send message with list of all downloading torrents."""
|
|
# async def send_long_message(ctx,message_text):
|
|
# if len(message_text) <= 1994:
|
|
# await ctx.send("```"+message_text+"```")
|
|
# else:
|
|
# cut_off_index = message_text[:1994].rfind("\n")
|
|
# await ctx.send("```"+message_text[:cut_off_index]+"```")
|
|
# await send_long_message(ctx,message_text[cut_off_index+1:])
|
|
|
|
# await self.bot.defer(ctx)
|
|
|
|
# # showDM, showMovies, showShows, episodes
|
|
# parameters = [False, False, False, False]
|
|
# show_dm_args = ["d", "dm", "downloading", "downloadmanager"]
|
|
# show_movies_args = ["m", "movies"]
|
|
# show_shows_args = ["s", "shows", "series"]
|
|
# show_episode_args = ["e", "episodes"]
|
|
# arg_list = [
|
|
# show_dm_args, show_movies_args, show_shows_args, show_episode_args
|
|
# ]
|
|
# input_args = []
|
|
# valid_arguments = True
|
|
|
|
# while content != "" and valid_arguments:
|
|
# if content[0] == " ":
|
|
# content = content[1:]
|
|
# elif content[0] == "-":
|
|
# if content[1] == "-":
|
|
# arg_start = 2
|
|
# if " " in content:
|
|
# arg_stop = content.find(" ")
|
|
# else:
|
|
# arg_stop = None
|
|
# else:
|
|
# arg_start = 1
|
|
# arg_stop = 2
|
|
|
|
# input_args.append(content[arg_start:arg_stop])
|
|
# if arg_stop is None:
|
|
# content = ""
|
|
# else:
|
|
# content = content[arg_stop:]
|
|
# else:
|
|
# valid_arguments = False
|
|
|
|
# if valid_arguments:
|
|
# for arg_index, arg_aliases in enumerate(arg_list):
|
|
# arg_in_input = [i in input_args for i in arg_aliases]
|
|
# if any(arg_in_input):
|
|
# input_args.remove(arg_aliases[arg_in_input.index(True)])
|
|
# parameters[arg_index] = True
|
|
|
|
# if len(input_args) != 0 or (not parameters[2] and parameters[3]):
|
|
# valid_arguments = False
|
|
|
|
# show_anything = any(i for i in parameters)
|
|
# if not (valid_arguments and show_anything):
|
|
# await ctx.send(self.long_strings["Invalid parameters"])
|
|
# else:
|
|
# message_text, all_downloaded = await self.__generate_download_list(
|
|
# *parameters
|
|
# )
|
|
# if not message_text.startswith("```"):
|
|
# await ctx.send(message_text)
|
|
|
|
# elif len(message_text) > 2000:
|
|
# message_text = message_text[3:-3]
|
|
# await send_long_message(ctx,message_text)
|
|
|
|
# elif all_downloaded:
|
|
# await ctx.send(message_text)
|
|
|
|
# else:
|
|
# updates_left = 60
|
|
# message_text = self.long_strings["Update"].format(
|
|
# message_text[:-3], ceil(updates_left/6)
|
|
# )
|
|
# old_message = await ctx.send(message_text)
|
|
|
|
# while ((not all_downloaded) and updates_left > 0):
|
|
# await asyncio.sleep(10)
|
|
# updates_left -= 1
|
|
# message_text, all_downloaded = await (
|
|
# self.__generate_download_list(*parameters)
|
|
# )
|
|
# message_text = self.long_strings["Update"].format(
|
|
# message_text[:-3],
|
|
# ceil(updates_left/6)
|
|
# )
|
|
# await old_message.edit(content = message_text)
|
|
|
|
# message_text, all_downloaded = await (
|
|
# self.__generate_download_list(*parameters)
|
|
# )
|
|
|
|
# if message_text.startswith("```"):
|
|
# if all_downloaded:
|
|
# self.bot.log("All torrents are downloaded")
|
|
# else:
|
|
# message_text = self.long_strings["No updates"].format(
|
|
# message_text[:-3]
|
|
# )
|
|
# self.bot.log("The message updated 20 times")
|
|
|
|
# await old_message.edit(content = message_text)
|