305 lines
10 KiB
Python
305 lines
10 KiB
Python
from requests import get, post
|
|
from random import choice
|
|
import io
|
|
from math import floor
|
|
from time import time
|
|
|
|
from PIL import Image
|
|
from interactions import SlashContext, Embed, EmbedFooter, EmbedAttachment, File
|
|
|
|
from gwendolyn.exceptions import CannotConnectToService, ServiceNotProvided
|
|
|
|
class Service():
|
|
def __init__(self, ip: str, port: str, api_key: str) -> None:
|
|
self.ip = ip
|
|
self.port = port
|
|
self.header = {
|
|
"accept": "application/json",
|
|
"X-Api-Key": api_key
|
|
}
|
|
|
|
def test(self):
|
|
return get(f"http://{self.ip}:{self.port}/api", headers=self.header).ok
|
|
|
|
class Radarr(Service):
|
|
def movies(self):
|
|
response = get(f"http://{self.ip}:{self.port}/api/v3/movie", headers=self.header)
|
|
if not response.ok:
|
|
return []
|
|
|
|
return [m for m in response.json() if m["hasFile"]]
|
|
|
|
class Sonarr(Service):
|
|
def shows(self):
|
|
response = get(f"http://{self.ip}:{self.port}/api/v3/series", headers=self.header)
|
|
if not response.ok:
|
|
return []
|
|
|
|
return response.json()
|
|
|
|
class QBittorrent(Service):
|
|
def __init__(self, ip: str, port: str, username: str, password: str) -> None:
|
|
self.ip = ip
|
|
self.port = port
|
|
data = {"username": username,"password": password}
|
|
response = post(f"http://{ip}:{port}/api/v2/auth/login", data=data)
|
|
self.logged_in = response.ok
|
|
self.cookie = {"SID": response.cookies.values()[0]}
|
|
|
|
def test(self):
|
|
return self.logged_in
|
|
|
|
def get_torrents(self):
|
|
response = get(
|
|
f"http://{self.ip}:{self.port}/api/v2/torrents/info",
|
|
cookies=self.cookie
|
|
)
|
|
return response.json()
|
|
|
|
class TMDb():
|
|
def __init__(self, api_token: str) -> None:
|
|
self.header = {
|
|
"accept": "application/json",
|
|
"Authorization": "Bearer "+api_token
|
|
}
|
|
|
|
def test(self):
|
|
return get(f"https://api.themoviedb.org/3/account", headers=self.header).ok
|
|
|
|
|
|
class BetterNetflix():
|
|
def __init__(self, bot, **kwargs) -> None:
|
|
self.bot = bot
|
|
|
|
services = ["radarr","sonarr","tmdb","qbittorrent"]
|
|
|
|
for service in services:
|
|
try:
|
|
setattr(self,service,kwargs[service])
|
|
except:
|
|
raise ServiceNotProvided(service)
|
|
|
|
if getattr(self,service).test():
|
|
self.bot.log(f"Connected to {service}")
|
|
else:
|
|
raise CannotConnectToService(service)
|
|
|
|
|
|
def _poster_color(self, poster_url: str):
|
|
response = get(poster_url, stream=True)
|
|
img = Image.open(io.BytesIO(response.content))
|
|
img.thumbnail((150, 100))
|
|
|
|
# Reduce colors (uses k-means internally)
|
|
paletted = img.convert('P', palette=Image.ADAPTIVE, colors=6)
|
|
|
|
buf = io.BytesIO()
|
|
img2 = paletted.convert("RGB")
|
|
img2.save("gwendolyn/resources/temp.jpg")
|
|
|
|
# Find the color that occurs most often
|
|
palette = paletted.getpalette()
|
|
color_counts = sorted(paletted.getcolors(), reverse=True)
|
|
palette_index = color_counts[0][1]
|
|
dominant_color = palette[palette_index*3:palette_index*3+3]
|
|
|
|
return dominant_color
|
|
|
|
async def movie(self, ctx: SlashContext):
|
|
msg = await ctx.send("Finding a random movie...")
|
|
movies = self.radarr.movies()
|
|
if len(movies) == 0:
|
|
await msg.edit(content="Unable to find any movies")
|
|
return
|
|
|
|
picked_movie = choice(movies)
|
|
description = ""
|
|
|
|
if "imdb" in picked_movie['ratings']:
|
|
description += f"<:imdb:1301506320603676782> {picked_movie['ratings']['imdb']['value']}"
|
|
description += "/10 "
|
|
|
|
if "rottenTomatoes" in picked_movie['ratings']:
|
|
rt_value = picked_movie['ratings']['rottenTomatoes']['value']
|
|
rt_icon = "<:fresh:1301509701338660894>" if rt_value >= 60 else "<:rotten:1301509685907820607>"
|
|
description += f"{rt_icon} {rt_value}%"
|
|
|
|
if description != "":
|
|
description += "\n\n"
|
|
|
|
year = EmbedFooter(str(picked_movie["year"])) if "year" in picked_movie else None
|
|
has_poster = len(picked_movie["images"]) > 0 and "remoteUrl" in picked_movie["images"][0]
|
|
poster = EmbedAttachment(picked_movie["images"][0]["remoteUrl"]) if has_poster else None
|
|
|
|
color = self._poster_color(picked_movie["images"][0]["remoteUrl"]) if has_poster else "#ff0000"
|
|
|
|
description += picked_movie["overview"]
|
|
embed = Embed(
|
|
picked_movie["title"],
|
|
description,
|
|
color,
|
|
thumbnail=poster,
|
|
footer=year
|
|
)
|
|
await msg.edit(content="",embed=embed)
|
|
|
|
async def show(self, ctx: SlashContext):
|
|
msg = await ctx.send("Finding a random show...")
|
|
shows = self.sonarr.shows()
|
|
if len(shows) == 0:
|
|
await msg.edit(content="Unable to find any shows")
|
|
return
|
|
|
|
picked_show = choice(shows)
|
|
description = ""
|
|
|
|
if "imdb" in picked_show['ratings']:
|
|
description += f"<:imdb:1301506320603676782> {picked_show['ratings']['imdb']['value']}"
|
|
description += "\u1CBC\u1CBC"
|
|
|
|
if "rottenTomatoes" in picked_show['ratings']:
|
|
rt_value = picked_show['ratings']['rottenTomatoes']['value']
|
|
rt_icon = "<:fresh:1301509701338660894>" if rt_value >= 60 else "<:rotten:1301509685907820607>"
|
|
description += f"{rt_icon} {rt_value}%"
|
|
|
|
if description != "":
|
|
description += "\n\n"
|
|
|
|
year = EmbedFooter(str(picked_show["year"])) if "year" in picked_show else None
|
|
images = {i["coverType"]:i for i in picked_show["images"]}
|
|
has_poster = "poster" in images and "remoteUrl" in images["poster"]
|
|
poster = EmbedAttachment(images["poster"]["remoteUrl"]) if has_poster else None
|
|
|
|
color = self._poster_color(images["poster"]["remoteUrl"]) if has_poster else "#ff0000"
|
|
|
|
description += picked_show["overview"]
|
|
embed = Embed(
|
|
picked_show["title"],
|
|
description,
|
|
color,
|
|
thumbnail=poster,
|
|
footer=year
|
|
)
|
|
await msg.edit(content="",embed=embed)
|
|
|
|
def _draw_torrent_list(self, title_width: int):
|
|
def ratio_to_bar(ratio):
|
|
progress_bar = "|"+("█"*floor(download_ratio*20))
|
|
while len(progress_bar) < 21:
|
|
progress_bar += " "
|
|
|
|
progress_bar += "| "+str(floor(download_ratio*100))+"%"
|
|
|
|
while len(progress_bar) < 27:
|
|
progress_bar += " "
|
|
|
|
return progress_bar
|
|
|
|
message = []
|
|
all_downloaded = True
|
|
|
|
dm_section_title = "*Torrent Downloads*"
|
|
dm_section_title_line = "-"*((title_width-len(dm_section_title))//2)
|
|
message.append(
|
|
dm_section_title_line+dm_section_title+dm_section_title_line
|
|
)
|
|
|
|
torrent_list = self.qbittorrent.get_torrents()
|
|
|
|
if len(torrent_list) == 0:
|
|
message.append("No torrents currently downloading")
|
|
return message, all_downloaded
|
|
|
|
for torrent in torrent_list:
|
|
if torrent['category'] not in ["radarr", "tv-sonarr"]:
|
|
break
|
|
|
|
torrent_name = torrent["name"]
|
|
if len(torrent_name) > 30:
|
|
if torrent_name[26] == " ":
|
|
torrent_name = torrent_name[:26]+"...."
|
|
else:
|
|
torrent_name = torrent_name[:27]+"..."
|
|
while len(torrent_name) < 30:
|
|
torrent_name += " "
|
|
|
|
if torrent["size"] == 0:
|
|
download_ratio = 0
|
|
elif torrent["amount_left"] == 0:
|
|
download_ratio = 1
|
|
else:
|
|
download_ratio = min(
|
|
torrent["downloaded"]/torrent["size"],
|
|
1
|
|
)
|
|
|
|
progress_bar = ratio_to_bar(download_ratio)
|
|
|
|
eta_in_seconds = torrent["eta"]
|
|
|
|
if eta_in_seconds >= 8640000:
|
|
eta = "∞"
|
|
else:
|
|
eta = ""
|
|
if eta_in_seconds >= 86400:
|
|
eta += str(floor(eta_in_seconds/86400))+"d "
|
|
if eta_in_seconds >= 3600:
|
|
eta += str(floor((eta_in_seconds%86400)/3600))+"h "
|
|
if eta_in_seconds >= 60:
|
|
eta += str(floor((eta_in_seconds%3600)/60))+"m "
|
|
|
|
eta += str(eta_in_seconds%60)+"s"
|
|
|
|
torrent_info = f"{torrent_name} {progress_bar} "
|
|
torrent_info += f"(Eta: {eta})"
|
|
|
|
if torrent["state"] == "stalledDL":
|
|
torrent_info += " (Stalled)"
|
|
|
|
if not (download_ratio == 1 and
|
|
torrent["last_activity"] < time()-7200):
|
|
message.append(torrent_info)
|
|
|
|
return message
|
|
|
|
async def _generate_download_list(self):
|
|
"""Generate a list of all torrents.
|
|
|
|
*Returns*
|
|
message_text: str
|
|
A formatted list of all torrents
|
|
"""
|
|
self.bot.log("Generating torrent list")
|
|
title_width = 90
|
|
message = self._draw_torrent_list(title_width)
|
|
|
|
message_text = "```"+"\n".join(message)+"```"
|
|
if message_text == "``````":
|
|
message_text = self.long_strings["No torrents downloading"]
|
|
|
|
return message_text
|
|
|
|
async def downloading(self, ctx: SlashContext):
|
|
"""Send message with list of all downloading torrents."""
|
|
async def send_long_message(ctx,message_text):
|
|
if len(message_text) <= 1994:
|
|
await ctx.send("```"+message_text+"```")
|
|
else:
|
|
cut_off_index = message_text[:1994].rfind("\n")
|
|
await ctx.send("```"+message_text[:cut_off_index]+"```")
|
|
await send_long_message(ctx,message_text[cut_off_index+1:])
|
|
|
|
await ctx.defer()
|
|
|
|
message_text = await self._generate_download_list()
|
|
if not message_text.startswith("```"):
|
|
await ctx.send(message_text)
|
|
|
|
elif len(message_text) > 2000:
|
|
message_text = message_text[3:-3]
|
|
await send_long_message(ctx,message_text)
|
|
|
|
else:
|
|
await ctx.send(message_text)
|
|
|