"""Service clients (Radarr, Sonarr, qBittorrent, TMDb) and the BetterNetflix commands."""
import io
from math import floor
from random import choice
from time import time

from interactions import SlashContext, Embed, EmbedFooter, EmbedAttachment, File
from PIL import Image
from requests import get, post

from gwendolyn.exceptions import CannotConnectToService, ServiceNotProvided

# Longest code-block payload sent in one message: the length check in
# ``downloading`` allows 2000 characters including the six fence backticks.
_CHUNK_LIMIT = 1994

# ETA values at or above this many seconds are rendered as "∞".
_ETA_INFINITE = 8640000

# Only torrents in these qBittorrent categories are listed.
_TRACKED_CATEGORIES = ("radarr", "tv-sonarr")


def _fit_name(name: str) -> str:
    """Return *name* truncated with an ellipsis or space-padded to 30 chars."""
    if len(name) > 30:
        # When the cut would land on a space, replace that space with a
        # fourth dot so the result is still exactly 30 characters wide.
        if name[26] == " ":
            return name[:26] + "...."
        return name[:27] + "..."
    return name.ljust(30)


def _progress_bar(ratio: float) -> str:
    """Render *ratio* (0..1) as a 20-cell bar plus percentage, 27 chars wide."""
    bar = ("|" + "█" * floor(ratio * 20)).ljust(21)
    return (bar + "| " + str(floor(ratio * 100)) + "%").ljust(27)


def _format_eta(eta_in_seconds) -> str:
    """Format a number of seconds as e.g. ``1d 2h 3m 4s`` (or ``∞``)."""
    if eta_in_seconds >= _ETA_INFINITE:
        return "∞"

    eta = ""
    if eta_in_seconds >= 86400:
        eta += str(floor(eta_in_seconds / 86400)) + "d "
    if eta_in_seconds >= 3600:
        eta += str(floor((eta_in_seconds % 86400) / 3600)) + "h "
    if eta_in_seconds >= 60:
        eta += str(floor((eta_in_seconds % 3600) / 60)) + "m "
    return eta + str(eta_in_seconds % 60) + "s"


def _split_message(text: str, limit: int = _CHUNK_LIMIT) -> list:
    """Split *text* into chunks of at most *limit* chars, preferring newlines.

    Each chunk is cut at the last newline inside the limit (the newline itself
    is dropped). If there is no usable newline the chunk is cut hard at
    *limit*, which guarantees progress on very long single lines.
    """
    chunks = []
    while len(text) > limit:
        cut = text.rfind("\n", 0, limit)
        if cut <= 0:
            chunks.append(text[:limit])
            text = text[limit:]
        else:
            chunks.append(text[:cut])
            text = text[cut + 1:]
    if text or not chunks:
        chunks.append(text)
    return chunks


class Service():
    """Base client for an HTTP service authenticated with an ``X-Api-Key`` header."""

    def __init__(self, ip: str, port: str, api_key: str) -> None:
        self.ip = ip
        self.port = port
        self.header = {
            "accept": "application/json",
            "X-Api-Key": api_key
        }

    def test(self):
        """Return whether a GET on the service's ``/api`` endpoint succeeds."""
        return get(f"http://{self.ip}:{self.port}/api", headers=self.header).ok


class Radarr(Service):
    """Client for the Radarr movie API."""

    def movies(self):
        """Return all movies that have a file, or ``[]`` if the request fails."""
        response = get(f"http://{self.ip}:{self.port}/api/v3/movie", headers=self.header)
        if not response.ok:
            return []

        return [m for m in response.json() if m["hasFile"]]


class Sonarr(Service):
    """Client for the Sonarr series API."""

    def shows(self):
        """Return all series, or ``[]`` if the request fails."""
        response = get(f"http://{self.ip}:{self.port}/api/v3/series", headers=self.header)
        if not response.ok:
            return []

        return response.json()


class QBittorrent(Service):
    """Client for the qBittorrent Web API using cookie-based login."""

    def __init__(self, ip: str, port: str, username: str, password: str) -> None:
        self.ip = ip
        self.port = port
        data = {"username": username, "password": password}
        response = post(f"http://{ip}:{port}/api/v2/auth/login", data=data)
        # A rejected login may set no cookie at all; treat that as "not logged
        # in" instead of crashing on an empty cookie list.
        cookie_values = response.cookies.values()
        self.logged_in = response.ok and bool(cookie_values)
        self.cookie = {"SID": cookie_values[0]} if cookie_values else {}

    def test(self):
        """Return whether the login performed in ``__init__`` succeeded."""
        return self.logged_in

    def get_torrents(self):
        """Return the torrent info list, or ``[]`` if the request fails."""
        response = get(
            f"http://{self.ip}:{self.port}/api/v2/torrents/info",
            cookies=self.cookie
        )
        if not response.ok:
            return []

        return response.json()


class TMDb():
    """Client for The Movie Database API using a bearer token."""

    def __init__(self, api_token: str) -> None:
        self.header = {
            "accept": "application/json",
            "Authorization": "Bearer "+api_token
        }

    def test(self):
        """Return whether the account endpoint accepts the token."""
        return get("https://api.themoviedb.org/3/account", headers=self.header).ok


class BetterNetflix():
    """Bot commands for picking random media and listing torrent downloads."""

    def __init__(self, bot, **kwargs) -> None:
        """Store the bot and the four required service clients.

        *Raises*
            ServiceNotProvided: a required service is missing from ``kwargs``.
            CannotConnectToService: a service's ``test()`` returned a falsy value.
        """
        self.bot = bot

        for service in ("radarr", "sonarr", "tmdb", "qbittorrent"):
            try:
                setattr(self, service, kwargs[service])
            except KeyError as err:
                raise ServiceNotProvided(service) from err

            if getattr(self, service).test():
                self.bot.log(f"Connected to {service}")
            else:
                raise CannotConnectToService(service)

    def _poster_color(self, poster_url: str):
        """Download a poster and return its most frequent palette color.

        *Returns*
            dominant_color: list
                The ``[r, g, b]`` values of the dominant color.
        """
        response = get(poster_url, stream=True)
        img = Image.open(io.BytesIO(response.content))
        img.thumbnail((150, 100))

        # Reduce to a six-color adaptive palette so similar shades are grouped
        paletted = img.convert('P', palette=Image.ADAPTIVE, colors=6)

        # NOTE(review): this write looks like a debugging leftover; nothing in
        # this module reads the file back. Kept to preserve behaviour - confirm
        # and remove.
        paletted.convert("RGB").save("gwendolyn/resources/temp.jpg")

        # getcolors() yields (count, palette_index); the max is the most common
        palette = paletted.getpalette()
        _, palette_index = max(paletted.getcolors())
        return palette[palette_index*3:palette_index*3+3]

    def _build_embed(self, item: dict, imdb_suffix: str, poster_url):
        """Build the embed shared by the ``movie`` and ``show`` commands.

        *Parameters*
            item: dict
                The picked movie/series record.
            imdb_suffix: str
                Text appended right after the IMDb rating value.
            poster_url: str | None
                Poster URL used for the thumbnail and embed color, if any.
        """
        ratings = item.get("ratings", {})
        description = ""
        if "imdb" in ratings:
            description += f"<:imdb:1301506320603676782> {ratings['imdb']['value']}"
            description += imdb_suffix

        if "rottenTomatoes" in ratings:
            rt_value = ratings['rottenTomatoes']['value']
            rt_icon = "<:fresh:1301509701338660894>" if rt_value >= 60 else "<:rotten:1301509685907820607>"
            description += f"{rt_icon} {rt_value}%"

        if description != "":
            description += "\n\n"

        # Not every record carries an overview; fall back to an empty text.
        description += item.get("overview", "")

        year = EmbedFooter(str(item["year"])) if "year" in item else None
        if poster_url is not None:
            poster = EmbedAttachment(poster_url)
            color = self._poster_color(poster_url)
        else:
            poster = None
            color = "#ff0000"

        return Embed(
            item["title"],
            description,
            color,
            thumbnail=poster,
            footer=year
        )

    async def movie(self, ctx: SlashContext):
        """Send an embed describing a randomly picked Radarr movie."""
        msg = await ctx.send("Finding a random movie...")
        movies = self.radarr.movies()
        if len(movies) == 0:
            await msg.edit(content="Unable to find any movies")
            return

        picked_movie = choice(movies)

        # Movies use the first listed image as the poster.
        images = picked_movie["images"]
        has_poster = len(images) > 0 and "remoteUrl" in images[0]
        poster_url = images[0]["remoteUrl"] if has_poster else None

        embed = self._build_embed(picked_movie, "/10 ", poster_url)
        await msg.edit(content="", embed=embed)

    async def show(self, ctx: SlashContext):
        """Send an embed describing a randomly picked Sonarr show."""
        msg = await ctx.send("Finding a random show...")
        shows = self.sonarr.shows()
        if len(shows) == 0:
            await msg.edit(content="Unable to find any shows")
            return

        picked_show = choice(shows)

        # Shows pick the image whose coverType is "poster".
        images = {i["coverType"]: i for i in picked_show["images"]}
        has_poster = "poster" in images and "remoteUrl" in images["poster"]
        poster_url = images["poster"]["remoteUrl"] if has_poster else None

        embed = self._build_embed(picked_show, "\u1CBC\u1CBC", poster_url)
        await msg.edit(content="", embed=embed)

    def _draw_torrent_list(self, title_width: int):
        """Build the lines of the torrent overview.

        *Parameters*
            title_width: int
                Target width used to centre the section title.

        *Returns*
            message: list
                The title line followed by one line per listed torrent. Always
                a list of strings, including when there are no torrents.
        """
        dm_section_title = "*Torrent Downloads*"
        dm_section_title_line = "-"*((title_width-len(dm_section_title))//2)
        message = [dm_section_title_line+dm_section_title+dm_section_title_line]

        torrent_list = self.qbittorrent.get_torrents()
        if len(torrent_list) == 0:
            message.append("No torrents currently downloading")
            return message

        for torrent in torrent_list:
            # Skip (rather than stop at) torrents from other categories so
            # that later Radarr/Sonarr torrents are still listed.
            if torrent['category'] not in _TRACKED_CATEGORIES:
                continue

            if torrent["size"] == 0:
                download_ratio = 0
            elif torrent["amount_left"] == 0:
                download_ratio = 1
            else:
                download_ratio = min(
                    torrent["downloaded"]/torrent["size"],
                    1
                )

            torrent_info = f"{_fit_name(torrent['name'])} {_progress_bar(download_ratio)} "
            torrent_info += f"(Eta: {_format_eta(torrent['eta'])})"

            if torrent["state"] == "stalledDL":
                torrent_info += " (Stalled)"

            # Hide torrents that finished and have been idle for over 2 hours
            if not (download_ratio == 1 and
                    torrent["last_activity"] < time()-7200):
                message.append(torrent_info)

        return message

    async def _generate_download_list(self):
        """Generate a list of all torrents.

        *Returns*
            message_text: str
                A formatted list of all torrents, wrapped in a code fence
        """
        self.bot.log("Generating torrent list")
        title_width = 90
        message = self._draw_torrent_list(title_width)

        return "```"+"\n".join(message)+"```"

    async def downloading(self, ctx: SlashContext):
        """Send message with list of all downloading torrents."""
        await ctx.defer()

        message_text = await self._generate_download_list()
        if len(message_text) <= 2000:
            await ctx.send(message_text)
            return

        # Too long for one message: strip the fence, split, re-fence each part
        for chunk in _split_message(message_text[3:-3]):
            await ctx.send("```"+chunk+"```")