261 lines
8.4 KiB
Python
261 lines
8.4 KiB
Python
"""
|
|
Contains classes used for utilities.
|
|
|
|
*Classes*
|
|
---------
|
|
DatabaseFuncs()
|
|
"""
|
|
import os # Used to test if files exist
|
|
import json # Used to read the data about add_movie/add_show
|
|
import time # Used to test how long it's been since commands were synced
|
|
|
|
import re # Used in get_id
|
|
import discord # Used for type hints
|
|
|
|
|
|
class DatabaseFuncs():
    """
    Manages database functions.

    All persistent state is reached through ``self.bot.database``, which is
    indexed by collection name and queried with MongoDB-style calls
    (``find_one``, ``delete_one``, ``delete_many``, ``update_one``).

    *Methods*
    ---------
        get_name(user_id: str) -> str
        get_id(user_name: str) -> str or None
        delete_game(game_type: str, channel: str)
        wipe_games()
        connect_four_reaction_test(message: discord.Message,
                                   user: discord.User) -> bool
        hangman_reaction_test(message: discord.Message,
                              user: discord.User) -> bool
        plex_reaction_test(message: discord.Message) -> bool, bool,
                                                        list
        imdb_commands()
    """

    # Minimum number of seconds between two slash command syncs (24 hours).
    _SYNC_INTERVAL_SECONDS = 86400

    def __init__(self, bot):
        """Initialize the class."""
        self.bot = bot

    def get_name(self, user_id: str) -> str:
        """
        Get the name of a user you have the # id of.

        *Parameters*
        ------------
            user_id: str
                The id of the user you want the name of. The format is
                "#" + str(discord.User.id)

        *Returns*
        ---------
            user_name: str
                The name of the user. "Gwendolyn" if the id is the
                bot's own id. If the user couldn't be found, returns
                the user_id.
        """
        # The bot itself needs no database lookup.
        if user_id == f"#{self.bot.user.id}":
            return "Gwendolyn"

        user = self.bot.database["users"].find_one({"_id": user_id})
        if user is not None:
            return user["user name"]

        self.bot.log(f"Couldn't find user {user_id}")
        return user_id

    def get_id(self, user_name: str):
        """
        Get the id of a user you have the username of.

        The lookup is case-insensitive and matches the given name
        literally anywhere inside the stored "user name" field.

        *Parameters*
        ------------
            user_name: str
                The name of the user you want the id of.

        *Returns*
        ---------
            user_id: str or None
                The id of the user in the format "#" +
                str(discord.User.id). If the user couldn't be found,
                returns None.
        """
        # Escape the name so regex metacharacters in it ("a.b", "c++",
        # "x(") are matched literally instead of changing the pattern or
        # raising re.error.
        name_pattern = re.compile(re.escape(user_name), re.IGNORECASE)
        user = self.bot.database["users"].find_one({"user name": name_pattern})

        if user is None:
            self.bot.log(f"Couldn't find user {user_name}")
            return None

        return user["_id"]

    def delete_game(self, game_type: str, channel: str) -> None:
        """
        Remove a game from the database.

        *Parameters*
        ------------
            game_type: str
                The name of the collection the game is in, like
                "hangman games", "blackjack games" etc.
            channel: str
                The channel id of the channel the game is on as a
                string.
        """
        self.bot.database[game_type].delete_one({"_id": channel})

    def wipe_games(self) -> None:
        """Delete all running games from every game collection."""
        game_types = (
            "trivia questions",
            "blackjack games",
            "connect 4 games",
            "hangman games",
            "hex games",
        )
        for game_type in game_types:
            # An empty filter removes every document in the collection.
            self.bot.database[game_type].delete_many({})

    def connect_four_reaction_test(self, message: "discord.Message",
                                   user: "discord.User") -> bool:
        """
        Test if the given message is the current connect four game.

        Also tests if the given user is the one who's turn it is.

        *Parameters*
        ------------
            message: discord.Message
                The message to test.
            user: discord.User
                The user to test. It is compared with ``==`` against
                the entry of the game's "players" list at index
                "turn".
                NOTE(review): ids elsewhere in this class are
                "#<id>" strings, so callers may need to pass that
                form for the comparison to succeed - confirm.

        *Returns*
        ---------
            : bool
                Whether the given message is the current connect four
                game and if the user who reacted is the user who's turn
                it is. False when no game message is tracked for the
                channel or no game exists in the database.
        """
        channel = message.channel
        old_images_path = "gwendolyn/resources/games/old_images/"
        file_path = old_images_path + f"connect_four{channel.id}"

        # The file holds the id of the message showing the current board.
        # Without it there is nothing the reaction could belong to.
        if not os.path.isfile(file_path):
            return False

        with open(file_path, "r") as file_pointer:
            old_image = int(file_pointer.read())

        if message.id != old_image:
            return False

        self.bot.log("They reacted to the connect_four game")

        # Only hit the database once we know the message is the game.
        channel_search = {"_id": str(channel.id)}
        game = self.bot.database["connect 4 games"].find_one(channel_search)
        if game is None:
            # Stale tracking file: the game has already been removed.
            return False

        if user == game["players"][game["turn"]]:
            return True

        self.bot.log("It wasn't their turn")
        return False

    def hangman_reaction_test(self, message: "discord.Message",
                              user: "discord.User") -> bool:
        """
        Test if the given message is the current hangman game.

        Also tests if the given user is the one who's playing hangman.

        *Parameters*
        ------------
            message: discord.Message
                The message to test.
            user: discord.User
                The user to test. It is compared with ``==`` against
                the game's "player" field.

        *Returns*
        ---------
            : bool
                Whether the given message is the current hangman game
                and if the user who reacted is the user who's playing
                hangman. False when no game messages are tracked for
                the channel or no game exists in the database.
        """
        channel = message.channel
        file_path = f"gwendolyn/resources/games/old_images/hangman{channel.id}"
        if not os.path.isfile(file_path):
            return False

        # The file lists one message id per line.
        with open(file_path, "r") as file_pointer:
            old_messages = file_pointer.read().splitlines()

        # Blank lines are skipped so a trailing empty line can't crash int().
        is_game_message = any(
            message.id == int(old_message)
            for old_message in old_messages
            if old_message.strip()
        )
        if not is_game_message:
            return False

        channel_search = {"_id": str(channel.id)}
        game = self.bot.database["hangman games"].find_one(channel_search)
        if game is None:
            # Stale tracking file: the game has already been removed.
            return False

        return bool(user == game["player"])

    def plex_reaction_test(self, message: "discord.Message") -> tuple:
        """
        Test if the given message is the response to a plex request.

        *Parameters*
        ------------
            message: discord.Message
                The message to test.

        *Returns*
        ---------
            : bool
                Whether the message is the response to a plex request.
            : bool
                Whether it was a movie request (false for a show
                request). None if the message is not a plex response.
            : list
                A list of ids or names of the shows or movies that
                Gwendolyn presented after the request. None if the
                message is not a plex response.
        """
        channel = message.channel
        old_messages_path = "gwendolyn/resources/plex/"
        file_path = old_messages_path + f"old_message{channel.id}"
        if not os.path.isfile(file_path):
            return (False, None, None)

        with open(file_path, "r") as file_pointer:
            data = json.load(file_pointer)

        if data["message_id"] != message.id:
            return (False, None, None)

        # The presence of "imdb_ids" marks a movie request; otherwise the
        # stored data carries "imdb_names" and is treated as a show request.
        if "imdb_ids" in data:
            return (True, True, data["imdb_ids"])

        return (True, False, data["imdb_names"])

    async def imdb_commands(self) -> None:
        """
        Sync the slash commands with the discord API.

        The sync only runs when the timestamp stored in the
        "last synced" collection is more than 24 hours old, or when
        the collection holds no timestamp yet. After syncing, the
        stored timestamp is set to the current time.
        """
        collection = self.bot.database["last synced"]
        last_synced = collection.find_one()
        now = time.time()

        if last_synced is not None:
            threshold = now - self._SYNC_INTERVAL_SECONDS
            if last_synced["last synced"] >= threshold:
                # Synced recently enough; nothing to do.
                return

        slash_command_list = await self.bot.slash.to_dict()
        self.bot.log(f"Updating commands: {slash_command_list}")
        await self.bot.slash.sync_all_commands()

        if last_synced is None:
            # First sync ever: there is no document to update yet.
            collection.insert_one({"last synced": now})
        else:
            query_filter = {"_id": last_synced["_id"]}
            update = {"$set": {"last synced": now}}
            collection.update_one(query_filter, update)