Files
Gwendolyn/gwendolyn/funcs/better_netflix/better_netflix.py
2025-10-28 15:42:26 +01:00

305 lines
10 KiB
Python

from requests import get, post
from random import choice
import io
from math import floor
from time import time
from PIL import Image
from interactions import SlashContext, Embed, EmbedFooter, EmbedAttachment, File
from gwendolyn.exceptions import CannotConnectToService, ServiceNotProvided
class Service():
    """
    Base client for an HTTP service reached by address, port and API key.

    *Attributes*
    ------------
        ip: str
            Address of the service.
        port: str
            Port of the service.
        header: dict
            Request headers carrying the API key.
    """

    # Seconds to wait for an answer before giving up on a connection test.
    REQUEST_TIMEOUT = 10

    def __init__(self, ip: str, port: str, api_key: str) -> None:
        """
        Store the connection details and build the request headers.

        *Parameters*
        ------------
            ip: str
                Address of the service.
            port: str
                Port of the service.
            api_key: str
                Key sent in the "X-Api-Key" header of every request.
        """
        self.ip = ip
        self.port = port
        self.header = {
            "accept": "application/json",
            "X-Api-Key": api_key
        }

    def test(self) -> bool:
        """
        Check whether the service answers on its `/api` endpoint.

        *Returns*
        ---------
            connected: bool
                True if the response is OK. False if the response is not
                OK or the request itself fails (refused connection,
                timeout, ...), so callers get a plain yes/no answer.
        """
        # Imported here so the block does not depend on changes to the
        # file-level import list.
        from requests.exceptions import RequestException

        url = f"http://{self.ip}:{self.port}/api"
        try:
            response = get(
                url,
                headers=self.header,
                timeout=self.REQUEST_TIMEOUT
            )
        except RequestException:
            return False

        return response.ok
class Radarr(Service):
    """Client for the movie endpoint of a Radarr instance."""

    def movies(self):
        """
        Fetch the movies whose "hasFile" entry is truthy.

        *Returns*
        ---------
            movies: list
                The matching entries of the JSON response, or an empty
                list when the response is not OK.
        """
        endpoint = f"http://{self.ip}:{self.port}/api/v3/movie"
        reply = get(endpoint, headers=self.header)
        if reply.ok:
            return list(filter(lambda entry: entry["hasFile"], reply.json()))

        return []
class Sonarr(Service):
    """Client for the series endpoint of a Sonarr instance."""

    def shows(self):
        """
        Fetch every series the Sonarr instance reports.

        *Returns*
        ---------
            shows: list
                The decoded JSON response, or an empty list when the
                response is not OK.
        """
        endpoint = f"http://{self.ip}:{self.port}/api/v3/series"
        reply = get(endpoint, headers=self.header)
        return reply.json() if reply.ok else []
class QBittorrent(Service):
    """
    Client for the qBittorrent Web API, authenticated with a session cookie.

    *Attributes*
    ------------
        ip: str
            Address of the qBittorrent instance.
        port: str
            Port of the qBittorrent instance.
        logged_in: bool
            Whether the login request produced a usable session.
        cookie: dict
            Cookies sent with later requests; empty if login failed.
    """

    def __init__(self, ip: str, port: str, username: str, password: str) -> None:
        """
        Log in and remember the session cookie.

        Errors raised by the login request itself (e.g. a refused
        connection) are not caught here.

        *Parameters*
        ------------
            ip: str
                Address of the qBittorrent instance.
            port: str
                Port of the qBittorrent instance.
            username: str
                Username for the login request.
            password: str
                Password for the login request.
        """
        self.ip = ip
        self.port = port
        credentials = {"username": username, "password": password}
        response = post(
            f"http://{ip}:{port}/api/v2/auth/login",
            data=credentials
        )

        # A rejected login may still answer with an OK status but without
        # setting any cookie, so the status alone does not prove a session
        # exists and indexing the cookie values blindly could raise.
        cookie_values = response.cookies.values()
        session_id = cookie_values[0] if cookie_values else None

        self.logged_in = response.ok and session_id is not None
        self.cookie = {"SID": session_id} if session_id is not None else {}

    def test(self):
        """
        Report whether the login made in `__init__` succeeded.

        *Returns*
        ---------
            logged_in: bool
        """
        return self.logged_in

    def get_torrents(self):
        """
        Fetch information about the torrents known to qBittorrent.

        *Returns*
        ---------
            torrents: list
                The decoded JSON response, or an empty list when the
                response is not OK (for instance when not logged in).
        """
        response = get(
            f"http://{self.ip}:{self.port}/api/v2/torrents/info",
            cookies=self.cookie
        )
        if not response.ok:
            return []

        return response.json()
class TMDb():
    """
    Client for The Movie Database API, authenticated with a bearer token.

    *Attributes*
    ------------
        header: dict
            Request headers carrying the bearer token.
    """

    # Seconds to wait for an answer before giving up on a connection test.
    REQUEST_TIMEOUT = 10

    def __init__(self, api_token: str) -> None:
        """
        Build the request headers.

        *Parameters*
        ------------
            api_token: str
                Token sent as "Bearer <token>" in the Authorization header.
        """
        self.header = {
            "accept": "application/json",
            "Authorization": "Bearer "+api_token
        }

    def test(self) -> bool:
        """
        Check whether the API accepts the token on the account endpoint.

        *Returns*
        ---------
            connected: bool
                True if the response is OK. False if the response is not
                OK or the request itself fails (refused connection,
                timeout, ...).
        """
        # Imported here so the block does not depend on changes to the
        # file-level import list.
        from requests.exceptions import RequestException

        try:
            response = get(
                "https://api.themoviedb.org/3/account",
                headers=self.header,
                timeout=self.REQUEST_TIMEOUT
            )
        except RequestException:
            return False

        return response.ok
class BetterNetflix():
    """
    Commands for picking random media and listing torrent downloads.

    *Attributes*
    ------------
        bot
            The object passed to `__init__`; its `log` method is used
            for logging.
        radarr, sonarr, tmdb, qbittorrent
            Service clients supplied as keyword arguments.
    """

    # Embed color used when no poster color can be determined.
    _DEFAULT_EMBED_COLOR = "#ff0000"
    # Longest message `downloading` sends in a single piece.
    _MESSAGE_LIMIT = 2000
    # Room left for text once a chunk is wrapped in "```" on both sides.
    _CHUNK_LIMIT = 1994

    def __init__(self, bot, **kwargs) -> None:
        """
        Store the service clients and verify that each one is reachable.

        *Parameters*
        ------------
            bot
                Object providing a `log` method.
            **kwargs
                Must contain "radarr", "sonarr", "tmdb" and
                "qbittorrent", each an object with a `test` method.

        *Raises*
        --------
            ServiceNotProvided
                If one of the required services is missing from kwargs.
            CannotConnectToService
                If a service's `test` method returns a falsy value.
        """
        self.bot = bot

        for service in ("radarr", "sonarr", "tmdb", "qbittorrent"):
            try:
                client = kwargs[service]
            except KeyError as error:
                raise ServiceNotProvided(service) from error

            setattr(self, service, client)

            if not client.test():
                raise CannotConnectToService(service)

            self.bot.log(f"Connected to {service}")

    def _poster_color(self, poster_url: str):
        """
        Find the most common color of a poster image.

        *Parameters*
        ------------
            poster_url: str
                Address the poster image is downloaded from.

        *Returns*
        ---------
            dominant_color: list
                The red, green and blue palette values of the color that
                covers the most pixels after reducing the image to six
                colors.
        """
        response = get(poster_url, stream=True)
        img = Image.open(io.BytesIO(response.content))
        img.thumbnail((150, 100))

        # Reduce the image to a six-color adaptive palette
        paletted = img.convert('P', palette=Image.ADAPTIVE, colors=6)

        # NOTE(review): the reduced image is written to disk on every call
        # and nothing in this class reads it back. Kept to preserve the
        # existing side effect; confirm whether anything depends on it.
        paletted.convert("RGB").save("gwendolyn/resources/temp.jpg")

        # getcolors() yields (count, palette index) pairs, so the largest
        # pair belongs to the color that occurs most often
        palette = paletted.getpalette()
        _, palette_index = max(paletted.getcolors())

        return palette[palette_index*3:palette_index*3+3]

    @staticmethod
    def _ratings_line(ratings: dict, imdb_suffix: str) -> str:
        """
        Format the IMDb and Rotten Tomatoes ratings as one line of text.

        *Parameters*
        ------------
            ratings: dict
                May contain "imdb" and "rottenTomatoes" entries, each a
                dict with a "value" key.
            imdb_suffix: str
                Text appended directly after the IMDb value.

        *Returns*
        ---------
            line: str
                The formatted ratings; empty if neither rating exists.
        """
        line = ""

        if "imdb" in ratings:
            line += f"<:imdb:1301506320603676782> {ratings['imdb']['value']}"
            line += imdb_suffix

        if "rottenTomatoes" in ratings:
            rt_value = ratings['rottenTomatoes']['value']
            rt_icon = "<:fresh:1301509701338660894>" if rt_value >= 60 else "<:rotten:1301509685907820607>"
            line += f"{rt_icon} {rt_value}%"

        return line

    @staticmethod
    def _poster_url(images: list, allow_any: bool = False):
        """
        Pick the remote address of the poster among a list of images.

        *Parameters*
        ------------
            images: list
                Image dicts, each optionally holding "coverType" and
                "remoteUrl".
            allow_any: bool
                When no image of cover type "poster" has a remote
                address, fall back to the first image if it has one.

        *Returns*
        ---------
            url: str | None
                The address, or None if no usable image exists.
        """
        by_type = {image.get("coverType"): image for image in images}
        poster = by_type.get("poster")
        if poster is not None and "remoteUrl" in poster:
            return poster["remoteUrl"]

        if allow_any and images and "remoteUrl" in images[0]:
            return images[0]["remoteUrl"]

        return None

    def _build_embed(self, item: dict, imdb_suffix: str, poster_url):
        """
        Build the embed presenting a picked movie or show.

        *Parameters*
        ------------
            item: dict
                The picked entry. "title" is required; "ratings",
                "overview" and "year" are used when present.
            imdb_suffix: str
                Text appended directly after the IMDb value.
            poster_url: str | None
                Address of the poster, or None if there is none.

        *Returns*
        ---------
            embed: Embed
        """
        description = self._ratings_line(item.get("ratings") or {}, imdb_suffix)
        if description != "":
            description += "\n\n"
        description += item.get("overview") or ""

        year = EmbedFooter(str(item["year"])) if "year" in item else None

        poster = None
        color = self._DEFAULT_EMBED_COLOR
        if poster_url is not None:
            poster = EmbedAttachment(poster_url)
            try:
                color = self._poster_color(poster_url)
            except OSError as error:
                # Failed downloads and unreadable images are OSError
                # subclasses; a missing color should not stop the command
                self.bot.log(f"Could not determine poster color: {error}")

        return Embed(
            item["title"],
            description,
            color,
            thumbnail=poster,
            footer=year
        )

    async def movie(self, ctx: SlashContext):
        """
        Send an embed describing a randomly picked movie.

        *Parameters*
        ------------
            ctx: SlashContext
                The context of the command.
        """
        msg = await ctx.send("Finding a random movie...")
        movies = self.radarr.movies()
        if not movies:
            await msg.edit(content="Unable to find any movies")
            return

        picked_movie = choice(movies)
        poster_url = self._poster_url(
            picked_movie.get("images") or [],
            allow_any=True
        )
        embed = self._build_embed(picked_movie, "/10 ", poster_url)

        await msg.edit(content="",embed=embed)

    async def show(self, ctx: SlashContext):
        """
        Send an embed describing a randomly picked show.

        *Parameters*
        ------------
            ctx: SlashContext
                The context of the command.
        """
        msg = await ctx.send("Finding a random show...")
        shows = self.sonarr.shows()
        if not shows:
            await msg.edit(content="Unable to find any shows")
            return

        picked_show = choice(shows)
        poster_url = self._poster_url(picked_show.get("images") or [])
        embed = self._build_embed(picked_show, "\u1CBC\u1CBC", poster_url)

        await msg.edit(content="",embed=embed)

    @staticmethod
    def _progress_bar(ratio) -> str:
        """
        Draw a 27 character wide progress bar with a percentage.

        *Parameters*
        ------------
            ratio
                Downloaded fraction, between 0 and 1.

        *Returns*
        ---------
            progress_bar: str
        """
        # "\u2588" is FULL BLOCK, the glyph filling the bar. Written as an
        # escape so the character cannot get lost in transit.
        progress_bar = ("|"+("\u2588"*floor(ratio*20))).ljust(21)
        progress_bar += "| "+str(floor(ratio*100))+"%"
        return progress_bar.ljust(27)

    @staticmethod
    def _format_eta(eta_in_seconds) -> str:
        """
        Format a remaining time as days, hours, minutes and seconds.

        *Parameters*
        ------------
            eta_in_seconds
                Remaining time in seconds.

        *Returns*
        ---------
            eta: str
                E.g. "1h 2m 5s". Values of 8640000 seconds and above
                are shown as an infinity sign.
        """
        if eta_in_seconds >= 8640000:
            # "\u221e" is INFINITY
            return "\u221e"

        parts = []
        if eta_in_seconds >= 86400:
            parts.append(str(floor(eta_in_seconds/86400))+"d")
        if eta_in_seconds >= 3600:
            parts.append(str(floor((eta_in_seconds%86400)/3600))+"h")
        if eta_in_seconds >= 60:
            parts.append(str(floor((eta_in_seconds%3600)/60))+"m")
        parts.append(str(eta_in_seconds%60)+"s")

        return " ".join(parts)

    @staticmethod
    def _split_message(message_text: str, limit: int) -> list:
        """
        Split text into chunks of at most `limit` characters.

        Chunks end at a line break where possible; text without a line
        break inside the limit is cut at the limit instead.

        *Parameters*
        ------------
            message_text: str
                The text to split.
            limit: int
                Maximum length of each chunk.

        *Returns*
        ---------
            chunks: list
        """
        chunks = []
        remaining = message_text

        while len(remaining) > limit:
            cut_off_index = remaining[:limit].rfind("\n")
            if cut_off_index <= 0:
                # No usable line break: cut hard so the loop always
                # makes progress
                chunks.append(remaining[:limit])
                remaining = remaining[limit:]
            else:
                chunks.append(remaining[:cut_off_index])
                remaining = remaining[cut_off_index+1:]

        if remaining or not chunks:
            chunks.append(remaining)

        return chunks

    def _draw_torrent_list(self, title_width: int):
        """
        Build the lines of the torrent download overview.

        *Parameters*
        ------------
            title_width: int
                Width the section title line is padded towards.

        *Returns*
        ---------
            message: list
                The title line followed by one line per listed torrent,
                or by a notice when qBittorrent reports no torrents.
        """
        dm_section_title = "*Torrent Downloads*"
        dm_section_title_line = "-"*((title_width-len(dm_section_title))//2)
        message = [
            dm_section_title_line+dm_section_title+dm_section_title_line
        ]

        torrent_list = self.qbittorrent.get_torrents()
        if not torrent_list:
            message.append("No torrents currently downloading")
            return message

        for torrent in torrent_list:
            # Skip torrents of other categories, but keep looking at the
            # rest of the list
            if torrent['category'] not in ("radarr", "tv-sonarr"):
                continue

            # Names are cut or padded to exactly 30 characters
            torrent_name = torrent["name"]
            if len(torrent_name) > 30:
                if torrent_name[26] == " ":
                    torrent_name = torrent_name[:26]+"...."
                else:
                    torrent_name = torrent_name[:27]+"..."
            torrent_name = torrent_name.ljust(30)

            if torrent["size"] == 0:
                download_ratio = 0
            elif torrent["amount_left"] == 0:
                download_ratio = 1
            else:
                download_ratio = min(
                    torrent["downloaded"]/torrent["size"],
                    1
                )

            progress_bar = self._progress_bar(download_ratio)
            eta = self._format_eta(torrent["eta"])

            torrent_info = f"{torrent_name} {progress_bar} "
            torrent_info += f"(Eta: {eta})"

            if torrent["state"] == "stalledDL":
                torrent_info += " (Stalled)"

            # Finished torrents without activity in the last two hours
            # are left out
            finished_long_ago = (
                download_ratio == 1 and
                torrent["last_activity"] < time()-7200
            )
            if not finished_long_ago:
                message.append(torrent_info)

        return message

    async def _generate_download_list(self):
        """
        Generate a list of all torrents.

        *Returns*
        ---------
            message_text: str
                A formatted list of all torrents, wrapped in "```".
        """
        self.bot.log("Generating torrent list")

        title_width = 90
        message = self._draw_torrent_list(title_width)

        # The list always holds at least the title line, so the text is
        # never an empty code block
        return "```"+"\n".join(message)+"```"

    async def downloading(self, ctx: SlashContext):
        """
        Send message with list of all downloading torrents.

        *Parameters*
        ------------
            ctx: SlashContext
                The context of the command.
        """
        await ctx.defer()

        message_text = await self._generate_download_list()

        fits_in_one_message = len(message_text) <= self._MESSAGE_LIMIT
        if fits_in_one_message or not message_text.startswith("```"):
            await ctx.send(message_text)
            return

        # Too long for one message: strip the code fence, split the text
        # and fence every chunk separately
        for chunk in self._split_message(message_text[3:-3], self._CHUNK_LIMIT):
            await ctx.send("```"+chunk+"```")