"""Clients for Radarr, Sonarr, qBittorrent and TMDb plus the BetterNetflix commands.

``BetterNetflix`` picks random movies/shows from the media servers and renders
a text overview of the torrents qBittorrent is currently handling.
"""
import io
from math import floor
from random import choice
from time import time

from PIL import Image
from interactions import SlashContext, Embed, EmbedFooter, EmbedAttachment, File
from requests import RequestException, get

from gwendolyn.exceptions import CannotConnectToService, ServiceNotProvided

# Seconds to wait on any HTTP call. ``requests`` has no default timeout, so
# without this an unreachable service would block the caller indefinitely.
_REQUEST_TIMEOUT = 10


class Service():
    """Base HTTP client for a service authenticated with an ``X-Api-Key`` header.

    *Parameters*
        ip: str
            Host name or address of the service.
        port: str
            Port the service listens on.
        api_key: str
            Key sent in the ``X-Api-Key`` header of every request.
    """

    def __init__(self, ip: str, port: str, api_key: str) -> None:
        self.ip = ip
        self.port = port
        self.header = {
            "accept": "application/json",
            "X-Api-Key": api_key,
        }

    def test(self) -> bool:
        """Return True if ``/api`` answers with a successful status code.

        Connection problems are reported as False rather than raised, so the
        caller can treat "unreachable" and "rejected" the same way.
        """
        try:
            response = get(
                f"http://{self.ip}:{self.port}/api",
                headers=self.header,
                timeout=_REQUEST_TIMEOUT,
            )
        except RequestException:
            return False
        return response.ok


class Radarr(Service):
    """Client for the Radarr v3 API."""

    def movies(self) -> list:
        """Return every movie that has a file, or [] if the request fails."""
        response = get(
            f"http://{self.ip}:{self.port}/api/v3/movie",
            headers=self.header,
            timeout=_REQUEST_TIMEOUT,
        )
        if not response.ok:
            return []

        return [m for m in response.json() if m.get("hasFile")]


class Sonarr(Service):
    """Client for the Sonarr v3 API."""

    def shows(self) -> list:
        """Return every series known to Sonarr, or [] if the request fails."""
        response = get(
            f"http://{self.ip}:{self.port}/api/v3/series",
            headers=self.header,
            timeout=_REQUEST_TIMEOUT,
        )
        if not response.ok:
            return []

        return response.json()


class QBittorrent(Service):
    """Client for the qBittorrent WebUI API (v2), authenticated by session cookie.

    The base ``__init__`` is deliberately not called: this service logs in
    with a username/password and keeps the resulting cookie instead of
    sending an API-key header.

    *Parameters*
        ip: str
            Host name or address of the WebUI.
        port: str
            Port the WebUI listens on.
        username: str
            WebUI user name.
        password: str
            WebUI password.
    """

    def __init__(self, ip: str, port: str, username: str, password: str) -> None:
        self.ip = ip
        self.port = port
        self.logged_in = False
        self.cookie = {}

        try:
            # ``params`` URL-encodes the credentials; interpolating them into
            # the URL breaks on characters such as "&", "#" or "+".
            response = get(
                f"http://{ip}:{port}/api/v2/auth/login",
                params={"username": username, "password": password},
                timeout=_REQUEST_TIMEOUT,
            )
        except RequestException:
            return

        session_ids = list(response.cookies.values())
        # Without a session cookie no later request can be authenticated, so
        # a successful status code alone does not count as logged in.
        if response.ok and session_ids:
            self.logged_in = True
            self.cookie = {"SID": session_ids[0]}

    def test(self) -> bool:
        """Return True if the login performed in ``__init__`` succeeded."""
        return self.logged_in

    def get_torrents(self) -> list:
        """Return the torrent info list, or [] if the request fails."""
        response = get(
            f"http://{self.ip}:{self.port}/api/v2/torrents/info",
            cookies=self.cookie,
            timeout=_REQUEST_TIMEOUT,
        )
        if not response.ok:
            return []

        return response.json()


class TMDb():
    """Client for The Movie Database API, authenticated with a bearer token."""

    def __init__(self, api_token: str) -> None:
        self.header = {
            "accept": "application/json",
            "Authorization": "Bearer "+api_token,
        }

    def test(self) -> bool:
        """Return True if the account endpoint accepts the token."""
        try:
            response = get(
                "https://api.themoviedb.org/3/account",
                headers=self.header,
                timeout=_REQUEST_TIMEOUT,
            )
        except RequestException:
            return False
        return response.ok


class BetterNetflix():
    """Commands for picking random media and listing torrent downloads.

    *Parameters*
        bot
            The bot instance; only its ``log`` method is used here.
        **kwargs
            Must contain the keys ``radarr``, ``sonarr``, ``tmdb`` and
            ``qbittorrent``, each an object with a ``test()`` method.

    *Raises*
        ServiceNotProvided
            If one of the required services is missing from ``kwargs``.
        CannotConnectToService
            If a service's ``test()`` returns a falsy value.
    """

    # Embed color used when there is no poster to sample a color from.
    _DEFAULT_COLOR = "#ff0000"

    def __init__(self, bot, **kwargs) -> None:
        self.bot = bot

        for service in ("radarr", "sonarr", "tmdb", "qbittorrent"):
            try:
                client = kwargs[service]
            except KeyError:
                raise ServiceNotProvided(service) from None

            setattr(self, service, client)

            if not client.test():
                raise CannotConnectToService(service)
            self.bot.log(f"Connected to {service}")

    def _poster_color(self, poster_url: str):
        """Return the most frequent color of the poster as ``[r, g, b]``.

        The image is downloaded, shrunk and reduced to a 6-color adaptive
        palette; the palette entry covering the most pixels is returned.
        """
        response = get(poster_url, timeout=_REQUEST_TIMEOUT)
        img = Image.open(io.BytesIO(response.content))
        # Normalise the mode first: adaptive palette conversion is not
        # available for every source mode (e.g. CMYK or RGBA images).
        img = img.convert("RGB")
        img.thumbnail((150, 100))

        # Reduce to a small adaptive palette
        paletted = img.convert('P', palette=Image.ADAPTIVE, colors=6)

        # NOTE(review): looks like leftover debugging output - confirm that
        # nothing reads this file before removing it.
        paletted.convert("RGB").save("gwendolyn/resources/temp.jpg")

        # Find the color that occurs most often
        palette = paletted.getpalette()
        color_counts = sorted(paletted.getcolors(), reverse=True)
        palette_index = color_counts[0][1]
        return palette[palette_index*3:palette_index*3+3]

    def _poster_color_or_default(self, poster_url: str):
        """Return the poster's dominant color, or the default if that fails.

        The color is purely cosmetic, so a failed download or an unreadable
        image is logged and must not abort the whole command.
        """
        try:
            return self._poster_color(poster_url)
        except (RequestException, OSError, ValueError) as error:
            self.bot.log(f"Could not determine poster color: {error}")
            return self._DEFAULT_COLOR

    @staticmethod
    def _ratings_line(ratings: dict) -> str:
        """Format the IMDb / Rotten Tomatoes ratings as a description prefix.

        Returns an empty string when neither rating is present; otherwise the
        text ends with a blank line so the overview can follow directly.
        """
        line = ""
        if "imdb" in ratings:
            line += f"<:imdb:1301506320603676782> {ratings['imdb']['value']}"
            line += "\u1CBC\u1CBC"

        if "rottenTomatoes" in ratings:
            rt_value = ratings['rottenTomatoes']['value']
            rt_icon = "<:fresh:1301509701338660894>" if rt_value >= 60 else "<:rotten:1301509685907820607>"
            line += f"{rt_icon} {rt_value}%"

        if line != "":
            line += "\n\n"

        return line

    def _build_embed(self, item: dict, poster_url) -> Embed:
        """Build the embed shared by the movie and show commands.

        *Parameters*
            item: dict
                A movie or series record; ``title`` is required, ``ratings``,
                ``overview`` and ``year`` are used when present.
            poster_url
                URL of the poster image, or None if there is none.
        """
        description = self._ratings_line(item.get("ratings", {}))
        description += item.get("overview", "")

        year = EmbedFooter(str(item["year"])) if "year" in item else None

        if poster_url:
            poster = EmbedAttachment(poster_url)
            color = self._poster_color_or_default(poster_url)
        else:
            poster = None
            color = self._DEFAULT_COLOR

        return Embed(
            item["title"],
            description,
            color,
            thumbnail=poster,
            footer=year
        )

    async def movie(self, ctx: SlashContext):
        """Reply with an embed describing a randomly picked Radarr movie."""
        msg = await ctx.send("Finding a random movie...")
        movies = self.radarr.movies()
        if not movies:
            await msg.edit(content="Unable to find any movies")
            return

        picked_movie = choice(movies)

        # The movie poster is taken from the first image entry.
        images = picked_movie.get("images", [])
        poster_url = images[0].get("remoteUrl") if images else None

        embed = self._build_embed(picked_movie, poster_url)
        await msg.edit(content="", embed=embed)

    async def show(self, ctx: SlashContext):
        """Reply with an embed describing a randomly picked Sonarr show."""
        msg = await ctx.send("Finding a random show...")
        shows = self.sonarr.shows()
        if not shows:
            await msg.edit(content="Unable to find any shows")
            return

        picked_show = choice(shows)

        # Shows pick the image whose cover type is "poster".
        images = {i.get("coverType"): i for i in picked_show.get("images", [])}
        poster_url = images.get("poster", {}).get("remoteUrl")

        embed = self._build_embed(picked_show, poster_url)
        await msg.edit(content="", embed=embed)

    @staticmethod
    def _progress_bar(ratio) -> str:
        """Render ``ratio`` (0..1) as a 27-character bar with a percentage."""
        progress_bar = ("|"+("█"*floor(ratio*20))).ljust(21)
        progress_bar += "| "+str(floor(ratio*100))+"%"
        return progress_bar.ljust(27)

    @staticmethod
    def _fit_name(torrent_name: str) -> str:
        """Truncate or pad ``torrent_name`` to exactly 30 characters."""
        if len(torrent_name) > 30:
            # Avoid ending on " ..." by swallowing the space into the dots.
            if torrent_name[26] == " ":
                torrent_name = torrent_name[:26]+"...."
            else:
                torrent_name = torrent_name[:27]+"..."

        return torrent_name.ljust(30)

    @staticmethod
    def _format_eta(eta_in_seconds) -> str:
        """Format an ETA in seconds as e.g. ``1d 2h 3m 4s``.

        Values of 8640000 seconds (100 days) or more are shown as "∞".
        """
        if eta_in_seconds >= 8640000:
            return "∞"

        eta = ""
        if eta_in_seconds >= 86400:
            eta += str(floor(eta_in_seconds/86400))+"d "
        if eta_in_seconds >= 3600:
            eta += str(floor((eta_in_seconds%86400)/3600))+"h "
        if eta_in_seconds >= 60:
            eta += str(floor((eta_in_seconds%3600)/60))+"m "

        return eta+str(eta_in_seconds%60)+"s"

    def _draw_torrent_list(self, title_width: int):
        """Build the "Torrent Downloads" section of the download list.

        *Parameters*
            title_width: int
                Total width the section title line is centred within.

        *Returns*
            message: list[str]
                The section's lines; the first entry is always an empty
                string.
            all_downloaded: bool
                False if any listed torrent is unfinished and not stalled.
        """
        message = [""]
        all_downloaded = True

        dm_section_title = "*Torrent Downloads*"
        dm_section_title_line = "-"*((title_width-len(dm_section_title))//2)
        message.append(
            dm_section_title_line+dm_section_title+dm_section_title_line
        )

        torrent_list = self.qbittorrent.get_torrents()

        if not torrent_list:
            message.append("No torrents currently downloading")
            return message, all_downloaded

        for torrent in torrent_list:
            # Skip (not stop at) torrents that were not added by Radarr or
            # Sonarr; later entries in the list may still be relevant.
            if torrent['category'] not in ["radarr", "tv-sonarr"]:
                continue

            torrent_name = self._fit_name(torrent["name"])

            if torrent["size"] == 0:
                download_ratio = 0
            elif torrent["amount_left"] == 0:
                download_ratio = 1
            else:
                download_ratio = min(
                    torrent["downloaded"]/torrent["size"],
                    1
                )

            progress_bar = self._progress_bar(download_ratio)
            eta = self._format_eta(torrent["eta"])

            torrent_info = f"{torrent_name} {progress_bar} "
            torrent_info += f"(Eta: {eta})"

            if torrent["state"] == "stalledDL":
                torrent_info += " (Stalled)"

            # Hide torrents that finished and have been idle for two hours.
            if not (download_ratio == 1 and
                    torrent["last_activity"] < time()-7200):
                message.append(torrent_info)

            if download_ratio < 1 and torrent["state"] != "stalledDL":
                all_downloaded = False

        return message, all_downloaded

    async def _generate_download_list(self, show_dm, show_movies, show_shows,
                                      episodes):
        """Generate a list of all torrents.

        *Parameters*
            show_dm: bool
                Whether to include the torrent download section.
            show_movies, show_shows, episodes
                Currently unused: the sections they controlled are
                commented out below.

        *Returns*
            message_text: str
                A formatted list of all torrents
            all_downloaded: bool
                Whether all torrents are downloaded
        """
        self.bot.log("Generating torrent list")
        title_width = 100
        message = []
        # Must be bound even when the torrent section is not requested.
        all_downloaded = True

        if show_dm:
            m, all_downloaded = self._draw_torrent_list(title_width)
            message += m

        # Legacy sections kept for reference; they still use the old
        # ``radarr_url`` / ``credentials`` attributes, which this class does
        # not define.
        # if show_movies:
        #     message.append("")
        #     movies_section_title = "*Missing movies not downloading*"
        #     movies_section_line = (
        #         "-"*((title_width-len(movies_section_title))//2)
        #     )
        #     message.append(
        #         movies_section_line+movies_section_title+movies_section_line
        #     )
        #     movie_list = requests.get(
        #         self.radarr_url+"movie?apiKey="+self.credentials["radarr_key"]
        #     ).json()
        #     print(
        #         self.radarr_url+"movie?apiKey="+self.credentials["radarr_key"]
        #     )
        #     movie_queue = requests.get(
        #         self.radarr_url+"queue?apiKey="+self.credentials["radarr_key"]
        #     ).json()
        #     movie_queue_ids = []

        #     for queue_item in movie_queue["records"]:
        #         movie_queue_ids.append(queue_item["movieId"])

        #     for movie in movie_list:
        #         if (not movie["hasFile"] and
        #                 movie["id"] not in movie_queue_ids):
        #             movie_name = movie["title"]
        #             if len(movie_name) > 40:
        #                 if movie_name[36] == " ":
        #                     movie_name = movie_name[:36]+"...."
        #                 else:
        #                     movie_name = movie_name[:37]+"..."

        #             while len(movie_name) < 41:
        #                 movie_name += " "

        #             if movie["monitored"]:
        #                 movie_info = movie_name+"Could not find a torrent"
        #             else:
        #                 movie_info = self.long_strings["No torrent"].format(
        #                     movie_name
        #                 )

        #             message.append(movie_info)

        # if show_shows:
        #     message.append("")
        #     show_section_title = "*Missing shows not downloading*"
        #     show_section_line = "-"*((title_width-len(show_section_title))//2)
        #     message.append(
        #         show_section_line+show_section_title+show_section_line
        #     )

        #     show_list = requests.get(
        #         self.sonarr_url+"series?apiKey="+self.credentials["sonarr_key"]
        #     ).json()

        #     for show in show_list:
        #         if show["seasons"][0]["seasonNumber"] == 0:
        #             seasons = show["seasons"][1:]
        #         else:
        #             seasons = show["seasons"]
        #         if any(
        #                 (
        #                     i["statistics"]["episodeCount"] !=
        #                     i["statistics"]["totalEpisodeCount"]
        #                 ) for i in seasons):
        #             if all(
        #                     i["statistics"]["episodeCount"] == 0 for i in seasons
        #             ):
        #                 message.append(show["title"] + " (all episodes)")
        #             else:
        #                 if episodes:
        #                     missing_episodes = sum(
        #                         (i["statistics"]["totalEpisodeCount"] -
        #                          i["statistics"]["episodeCount"])
        #                         for i in seasons)
        #                     message.append(
        #                         f"{show['title']} ({missing_episodes} episodes)"
        #                     )

        message.append("-"*title_width)

        # The first entry is dropped: it is the empty line that starts the
        # torrent section (or the lone separator when nothing was requested).
        message_text = "```"+"\n".join(message[1:])+"```"
        if message_text == "``````":
            # ``long_strings`` is not assigned anywhere in this class, so
            # fall back to a plain message when it is unavailable.
            long_strings = getattr(self, "long_strings", None) or {}
            message_text = long_strings.get(
                "No torrents downloading", "No torrents downloading"
            )

        return message_text, all_downloaded

    # Legacy command kept for reference; not yet ported.
    # async def downloading(self, ctx, content):
    #     """Send message with list of all downloading torrents."""
    #     async def send_long_message(ctx,message_text):
    #         if len(message_text) <= 1994:
    #             await ctx.send("```"+message_text+"```")
    #         else:
    #             cut_off_index = message_text[:1994].rfind("\n")
    #             await ctx.send("```"+message_text[:cut_off_index]+"```")
    #             await send_long_message(ctx,message_text[cut_off_index+1:])

    #     await self.bot.defer(ctx)

    #     # showDM, showMovies, showShows, episodes
    #     parameters = [False, False, False, False]
    #     show_dm_args = ["d", "dm", "downloading", "downloadmanager"]
    #     show_movies_args = ["m", "movies"]
    #     show_shows_args = ["s", "shows", "series"]
    #     show_episode_args = ["e", "episodes"]
    #     arg_list = [
    #         show_dm_args, show_movies_args, show_shows_args, show_episode_args
    #     ]
    #     input_args = []
    #     valid_arguments = True

    #     while content != "" and valid_arguments:
    #         if content[0] == " ":
    #             content = content[1:]
    #         elif content[0] == "-":
    #             if content[1] == "-":
    #                 arg_start = 2
    #                 if " " in content:
    #                     arg_stop = content.find(" ")
    #                 else:
    #                     arg_stop = None
    #             else:
    #                 arg_start = 1
    #                 arg_stop = 2

    #             input_args.append(content[arg_start:arg_stop])
    #             if arg_stop is None:
    #                 content = ""
    #             else:
    #                 content = content[arg_stop:]
    #         else:
    #             valid_arguments = False

    #     if valid_arguments:
    #         for arg_index, arg_aliases in enumerate(arg_list):
    #             arg_in_input = [i in input_args for i in arg_aliases]
    #             if any(arg_in_input):
    #                 input_args.remove(arg_aliases[arg_in_input.index(True)])
    #                 parameters[arg_index] = True

    #         if len(input_args) != 0 or (not parameters[2] and parameters[3]):
    #             valid_arguments = False

    #     show_anything = any(i for i in parameters)
    #     if not (valid_arguments and show_anything):
    #         await ctx.send(self.long_strings["Invalid parameters"])
    #     else:
    #         message_text, all_downloaded = await self.__generate_download_list(
    #             *parameters
    #         )
    #         if not message_text.startswith("```"):
    #             await ctx.send(message_text)

    #         elif len(message_text) > 2000:
    #             message_text = message_text[3:-3]
    #             await send_long_message(ctx,message_text)

    #         elif all_downloaded:
    #             await ctx.send(message_text)

    #         else:
    #             updates_left = 60
    #             message_text = self.long_strings["Update"].format(
    #                 message_text[:-3], ceil(updates_left/6)
    #             )
    #             old_message = await ctx.send(message_text)

    #             while ((not all_downloaded) and updates_left > 0):
    #                 await asyncio.sleep(10)
    #                 updates_left -= 1
    #                 message_text, all_downloaded = await (
    #                     self.__generate_download_list(*parameters)
    #                 )
    #                 message_text = self.long_strings["Update"].format(
    #                     message_text[:-3],
    #                     ceil(updates_left/6)
    #                 )
    #                 await old_message.edit(content = message_text)

    #             message_text, all_downloaded = await (
    #                 self.__generate_download_list(*parameters)
    #             )

    #             if message_text.startswith("```"):
    #                 if all_downloaded:
    #                     self.bot.log("All torrents are downloaded")
    #                 else:
    #                     message_text = self.long_strings["No updates"].format(
    #                         message_text[:-3]
    #                     )
    #                     self.bot.log("The message updated 20 times")

    #                 await old_message.edit(content = message_text)