PEP in utils

This commit is contained in:
Nikolaj
2021-06-16 14:12:21 +02:00
parent 8c253aca3d
commit b68d62cf6c
137 changed files with 1159 additions and 1251 deletions

View File

@ -0,0 +1,11 @@
"""A collections of utilities used by Gwendolyn and her functions."""
__all__ = ["get_options", "get_credentials", "DatabaseFuncs", "EventHandler",
"ErrorHandler", "get_params", "log_this", "cap", "make_files",
"replace_multiple", "emoji_to_command"]
from .helper_classes import DatabaseFuncs
from .event_handlers import EventHandler, ErrorHandler
from .util_functions import (get_params, log_this, cap, make_files,
replace_multiple, emoji_to_command, long_strings,
sanitize, get_options, get_credentials)

View File

@ -0,0 +1,177 @@
"""
Classes used to handle bot events and errors.
*Classes*
---------
EventHandler
ErrorHandler
"""
import traceback # Used to get the traceback of errors
import sys # Used to get traceback when the specific error is not
# available
import discord # Used to init discord.Game and discord.Status, as well
# as compare errors to discord errors and as typehints
from discord.ext import commands # Used to compare errors with command
# errors
from discord_slash.context import SlashContext
from gwendolyn.utils.util_functions import emoji_to_command
class EventHandler():
"""
Handles bot events.
*Methods*
---------
on_ready()
on_slash_command(ctx: discord_slash.context.SlashContext)
on_reaction_add(ctx: discord_slash.context.SlashContext)
"""
def __init__(self, bot):
"""Initialize the handler."""
self.bot = bot
async def on_ready(self):
"""Log and sets status when it logs in."""
await self.bot.database_funcs.imdb_commands()
name = self.bot.user.name
userid = str(self.bot.user.id)
logged_in_message = f"Logged in as {name}, {userid}"
self.bot.log(logged_in_message, level=25)
game = discord.Game("Use /help for commands")
online_status = discord.Status.online
await self.bot.change_presence(activity=game, status=online_status)
async def on_slash_command(self, ctx: SlashContext):
"""Log when a slash command is given."""
if ctx.subcommand_name is not None:
subcommand = f" {ctx.subcommand_name} "
else:
subcommand = " "
if ctx.subcommand_group is not None:
sub_command_group = f"{ctx.subcommand_group} "
else:
sub_command_group = ""
args = " ".join([str(i) for i in ctx.args])
full_command = f"/{ctx.command}{subcommand}{sub_command_group}{args}"
log_message = f"{ctx.author.display_name} ran {full_command}"
self.bot.log(log_message, str(ctx.channel_id), level=25)
async def on_reaction_add(self, reaction: discord.Reaction,
user: discord.User):
"""Take action if the reaction is on a command message."""
if not user.bot:
tests = self.bot.database_funcs
message = reaction.message
channel = message.channel
reacted_message = f"{user.display_name} reacted to a message"
self.bot.log(reacted_message, str(channel.id))
plex_data = tests.bedre_netflix_reaction_test(message)
# plex_data is a list containing 3 elements: whether it was
# the add_show/add_movie command message the reaction was to
# (bool), whether it's a movie (bool) (if false, it's a
# show), and the imdb ids/names for the for the movies or
# shows listed in the message (list).
reaction_test_parameters = [message, f"#{str(user.id)}"]
if tests.connect_four_reaction_test(*reaction_test_parameters):
column = emoji_to_command(reaction.emoji)
params = [message, f"#{user.id}", column-1]
await self.bot.games.connect_four.placePiece(*params)
if plex_data[0]:
plex_functions = self.bot.other.bedre_netflix
if plex_data[1]:
movie_pick = emoji_to_command(reaction.emoji)
if movie_pick == "none":
imdb_id = None
else:
imdb_id = plex_data[2][movie_pick-1]
if isinstance(channel, discord.DMChannel):
await message.delete()
await plex_functions.add_movie(message, imdb_id, False)
else:
await message.clear_reactions()
await plex_functions.add_movie(message, imdb_id)
else:
show_pick = emoji_to_command(reaction.emoji)
if show_pick == "none":
imdb_name = None
else:
imdb_name = plex_data[2][show_pick-1]
if isinstance(channel, discord.DMChannel):
await message.delete()
await plex_functions.add_show(message, imdb_name, False)
else:
await message.clear_reactions()
await plex_functions.add_show(message, imdb_name)
elif tests.hangman_reaction_test(*reaction_test_parameters):
self.bot.log("They reacted to the hangman message")
if ord(reaction.emoji) in range(127462, 127488):
# The range is letter-emojis
guess = chr(ord(reaction.emoji)-127397)
# Converts emoji to letter
params = [message, f"#{user.id}", guess]
await self.bot.games.hangman.guess(*params)
else:
self.bot.log("Bot they didn't react with a valid guess")
class ErrorHandler():
"""
Handles errors.
*Methods*
---------
on_slash_command_error(ctx: discord_slash.context.SlashContext,
error: Exception)
on_error(method: str)
"""
def __init__(self, bot):
"""Initialize the handler."""
self.bot = bot
async def on_slash_command_error(self, ctx: SlashContext,
error: Exception):
"""Log when there's a slash command."""
if isinstance(error, commands.CommandNotFound):
await ctx.send("That's not a command")
elif isinstance(error, discord.errors.NotFound):
self.bot.log("Deleted message before I could add all reactions")
elif isinstance(error, commands.errors.MissingRequiredArgument):
self.bot.log(f"{error}", str(ctx.channel_id))
await ctx.send(self.bot.long_strings["missing parameters"])
else:
params = [type(error), error, error.__traceback__]
exception = traceback.format_exception(*params)
exception_string = "".join(exception)
log_messages = [f"exception in /{ctx.name}", f"{exception_string}"]
self.bot.log(log_messages, str(ctx.channel_id), 40)
if isinstance(error, discord.errors.NotFound):
self.bot.log("Context is non-existant", level=40)
else:
await ctx.send("Something went wrong (error code 000)")
async def on_error(self, method: str):
"""Log when there's an error."""
error_type = sys.exc_info()[0]
if error_type == discord.errors.NotFound:
self.bot.log("Deleted message before I could add all reactions")
else:
exception = traceback.format_exc()
exception_string = "".join(exception)
log_messages = [f"exception in {method}", f"{exception_string}"]
self.bot.log(log_messages, level=40)

View File

@ -0,0 +1,260 @@
"""
Contains classes used for utilities.
*Classes*
---------
DatabaseFuncs()
"""
import os # Used to test if files exist
import json # Used to read the data about add_movie/add_show
import time # Used to test how long it's been since commands were synced
import re # Used in get_id
import discord # Used for type hints
class DatabaseFuncs():
"""
Manages database functions.
*Methods*
---------
get_name(user_id: str) -> str
get_id(user_name: str) -> str
delete_game(game_type: str, channel: str)
wipe_games()
connect_four_reaction_test(message: discord.Message,
user: discord.User) -> bool
hangman_reaction_test(message: discord.Message,
user: discord.User) -> bool
bedre_netflix_reaction_test(message: discord.Message,
user: discord.User) -> bool, bool,
list
imdb_commands()
"""
def __init__(self, bot):
"""Initialize the class."""
self.bot = bot
def get_name(self, user_id: str):
"""
Get the name of a user you have the # id of.
*Parameters:
------------
user_id: str
The id of the user you want the name of. The format is
"#" + str(discord.User.id)
*Returns*
---------
user_name: str
The name of the user. If the user couldn't be found,
returns the user_id.
"""
user = self.bot.database["users"].find_one({"_id": user_id})
if user_id == f"#{self.bot.user.id}":
return_name = "Gwendolyn"
elif user is not None:
return_name = user["user name"]
else:
self.bot.log(f"Couldn't find user {user_id}")
return_name = user_id
return return_name
def get_id(self, user_name: str):
"""
Get the id of a user you have the username of.
*Parameters:
------------
user_name: str
The name of the user you want the id of.
*Returns*
---------
user_id: str
The id of the user in the format "#" +
str(discord.User.id). If the user couldn't be found,
returns the user_name.
"""
user_search = {"user name": re.compile(user_name, re.IGNORECASE)}
user = self.bot.database["users"].find_one(user_search)
if user is not None:
return_id = user["_id"]
else:
self.bot.log("Couldn't find user "+user_name)
return_id = None
return return_id
def delete_game(self, game_type: str, channel: str):
"""
Remove a game from the database.
*Parameters*
------------
game_type: str
The name of the collection the game is in, like
"hangman games", "blackjack games" etc.
channel: str
The channel id of the channel the game is on as a
string.
"""
self.bot.database[game_type].delete_one({"_id": channel})
def wipe_games(self):
"""Delete all running games and pull from git."""
game_types = [
"trivia questions",
"blackjack games",
"connect 4 games",
"hangman games",
"hex games"
]
for game_type in game_types:
self.bot.database[game_type].delete_many({})
def connect_four_reaction_test(self, message: discord.Message,
user: discord.User):
"""
Test if the given message is the current connect four game.
Also tests if the given user is the one who's turn it is.
*Parameters*
------------
message: discord.Message
The message to test.
user: discord.User
The user to test.
*Returns*
---------
: bool
Whether the given message is the current connect four
game and if the user who reacted is the user who's turn
it is.
"""
channel = message.channel
channel_search = {"_id": str(channel.id)}
game = self.bot.database["connect 4 games"].find_one(channel_search)
old_images_path = "gwendolyn/resources/games/old_images/"
file_path = old_images_path + f"connect_four{channel.id}"
if os.path.isfile(file_path):
with open(file_path, "r") as file_pointer:
old_image = int(file_pointer.read())
else:
old_image = 0
if message.id == old_image:
self.bot.log("They reacted to the connect_four game")
turn = game["turn"]
if user == game["players"][turn]:
valid_reaction = True
else:
self.bot.log("It wasn't their turn")
valid_reaction = False
else:
valid_reaction = False
return valid_reaction
def hangman_reaction_test(self, message: discord.Message,
user: discord.User):
"""
Test if the given message is the current hangman game.
Also tests if the given user is the one who's playing hangman.
*Parameters*
------------
message: discord.Message
The message to test.
user: discord.User
The user to test.
*Returns*
---------
: bool
Whether the given message is the current hangman game
and if the user who reacted is the user who's playing
hangman.
"""
channel = message.channel
file_path = f"gwendolyn/resources/games/old_images/hangman{channel.id}"
if os.path.isfile(file_path):
with open(file_path, "r") as file_pointer:
old_messages = file_pointer.read().splitlines()
else:
return False
game_message = False
for old_message in old_messages:
old_message_id = int(old_message)
if message.id == old_message_id:
database = self.bot.database["hangman games"]
channel_search = {"_id": str(channel.id)}
game = database.find_one(channel_search)
if user == game["player"]:
game_message = True
break
return game_message
def bedre_netflix_reaction_test(self, message: discord.Message):
"""
Test if the given message is the response to a plex request.
*Parameters*
------------
message: discord.Message
The message to test.
*Returns*
---------
: bool
Whether the message is the response to a plex request.
: bool
Whether it was a movie request (false for a show
request)
: list
A list of ids or names of the shows or movies that
Gwendolyn presented after the request.
"""
channel = message.channel
old_messages_path = "gwendolyn/resources/bedre_netflix/"
file_path = old_messages_path + f"old_message{str(channel.id)}"
if os.path.isfile(file_path):
with open(file_path, "r") as file_pointer:
data = json.load(file_pointer)
else:
return (False, None, None)
if data["message_id"] != message.id:
return (False, None, None)
if "imdb_ids" in data:
return_data = (True, True, data["imdb_ids"])
else:
return_data = (True, False, data["imdb_names"])
return return_data
async def imdb_commands(self):
"""Sync the slash commands with the discord API."""
collection = self.bot.database["last synced"]
last_synced = collection.find_one()
now = time.time()
if last_synced["last synced"] < now - 86400:
slash_command_list = await self.bot.slash.to_dict()
self.bot.log(f"Updating commands: {slash_command_list}")
await self.bot.slash.sync_all_commands()
id_number = last_synced["_id"]
query_filter = {"_id": id_number}
update = {"$set": {"last synced": now}}
collection.update_one(query_filter, update)

View File

@ -0,0 +1,340 @@
"""
Contains utility functions used by parts of the bot.
*Functions*
-----------
sanitize(data: str, lower_case_value: bool = false) -> dict
get_options() -> dict
get_credentials() -> dict
long_strings() -> dict
get_params() -> dict
log_this(messages: Union[str, list], channel: str = "",
level: int = 20)
cap(s: str) -> str
make_files()
replace_multiple(main_string: str, to_be_replaced: list,
new_string: str) -> str
emoji_to_command(emoji: str) -> str
"""
import json # Used by longString(), get_params() and make_files()
import logging # Used for logging
import os # Used by make_files() to check if files exist
import sys # Used to specify printing for logging
import imdb # Used to disable logging for the module
# All of this is logging configuration
FORMAT = " %(asctime)s | %(name)-16s | %(levelname)-8s | %(message)s"
PRINTFORMAT = "%(asctime)s - %(message)s"
DATEFORMAT = "%Y-%m-%d %H:%M:%S"
logging.addLevelName(25, "PRINT")
loggingConfigParams = {
"format": FORMAT,
"datefmt": DATEFORMAT,
"level": logging.INFO,
"filename": "gwendolyn.log"
}
logging.basicConfig(**loggingConfigParams)
logger = logging.getLogger("Gwendolyn")
printer = logging.getLogger("printer")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(fmt=PRINTFORMAT, datefmt=DATEFORMAT))
printer.addHandler(handler)
printer.propagate = False
imdb._logging.setLevel("CRITICAL") # pylint: disable=protected-access
# Basically disables imdbpy logging, since it's being printed to the
# terminal.
def sanitize(data: str, lower_case_value: bool = False):
"""
Sanitize and create a dictionary from a string.
Each element is created from a line with a : in it. The key is left
of the :, the value is right of it.
*Parameters*
------------
data: str
The string to create a dict from.
lower_case_value: bool = False
Whether the value of each element should be lowercase.
*Returns*
---------
dct: dict
The sanitized dictionary of elements.
"""
data = data.splitlines()
dct = {}
for line in data:
if line[0] != "#" and ":" in line:
line_values = line.split(":")
line_values[0] = line_values[0].lower()
line_values[1] = line_values[1].replace(" ", "")
if lower_case_value:
line_values[1] = line_values[1].lower()
if line_values[0] in ["testing guild ids", "admins"]:
line_values[1] = line_values[1].split(",")
if all(i.isnumeric() for i in line_values[1]):
line_values[1] = [int(i) for i in line_values[1]]
if any(i == line_values[1] for i in ["true", "false"]):
line_values[1] = (line_values[1] == "true")
dct[line_values[0]] = line_values[1]
return dct
def get_options():
"""
Get the bot options as dict.
*Returns*
---------
options: dict
The options of the bot.
"""
with open("options.txt", "r") as file_pointer:
data = sanitize(file_pointer.read(), True)
options = {}
options["testing"] = data["testing"]
options["guild_ids"] = data["testing guild ids"]
options["admins"] = data["admins"]
return options
def get_credentials():
"""
Returns the credentials used by the bot as a dict.
*Returns*
---------
credentials: dict
The credentials used by the bot.
"""
with open("credentials.txt", "r") as file_pointer:
data = sanitize(file_pointer.read())
credentials = {}
credentials["token"] = data["bot token"]
credentials["finnhub_key"] = data["finnhub api key"]
credentials["wordnik_key"] = data["wordnik api key"]
credentials["mongo_db_user"] = data["mongodb user"]
credentials["mongo_db_password"] = data["mongodb password"]
credentials["wolfram_alpha_key"] = data["wolframalpha appid"]
credentials["radarr_key"] = data["radarr api key"]
credentials["sonarr_key"] = data["sonarr api key"]
return credentials
def long_strings():
"""
Get the data from gwendolyn/resources/long_strings.json.
*Returns*
---------
data: dict
The long strings and their keys.
"""
with open("gwendolyn/resources/long_strings.json", "r") as file_pointer:
data = json.load(file_pointer)
return data
def get_params():
"""
Get the slash command parameters.
*Returns*
---------
params: dict
The parameters for every slash command.
"""
with open("gwendolyn/resources/slash_parameters.json", "r") as file_pointer:
slash_parameters = json.load(file_pointer)
options = get_options()
if options["testing"]:
for parameter in slash_parameters:
slash_parameters[parameter]["guild_ids"] = options["guild_ids"]
return slash_parameters
def log_this(messages, channel: str = "", level: int = 20):
"""
Log something in Gwendolyn's logs.
*Parameters*
------------
messages: Union[str, list]
A string or list of strings to be logged. If there are
multiple strings and the level is PRINT (25) or higher,
only the first string will be printed.
channel: str = ""
The channel the event to be logged occurred in. Will be
logged along with the message(s).
level: int = 20
The level to log the message(s) at. If PRINT (25) or
higher, the first message will be printed to the console.
"""
channel = channel.replace("Direct Message with ", "")
if isinstance(messages, str):
messages = [messages]
print_message = messages[0]
for i, message in enumerate(messages):
if channel != "":
messages[i] = f"{message} - ({channel})" # Adds channel ID
# to log messages
if len(messages) > 1: # Tells user to check the log if there are
# more messages there
print_message += " (details in log)"
if level >= 25:
printer.log(level, print_message)
for log_message in messages:
logger.log(level, log_message)
def cap(input_string: str):
"""
Capitalize a string like a movie title.
That means "of" and "the" are not capitalized.
*Parameters*
------------
input_string: str
The string to capitalized.
*Returns*
---------
return_string: str
The capitalized string.
"""
no_caps_list = ["of", "the"]
word_number = 0
string_list = input_string.split()
return_string = ''
for word in string_list:
word_number += 1
if word not in no_caps_list or word_number == 1:
word = word.capitalize()
return_string += word+" "
return_string = return_string[:-1]
return return_string
def make_files():
"""Create all the files and directories needed by Gwendolyn."""
def make_json_file(path, content):
"""Create json file if it doesn't exist."""
if not os.path.isfile(path):
log_this(path.split("/")[-1]+" didn't exist. Making it now.")
with open(path, "w") as file_pointer:
json.dump(content, file_pointer, indent=4)
def make_txt_file(path, content):
"""Create txt file if it doesn't exist."""
if not os.path.isfile(path):
log_this(path.split("/")[-1]+" didn't exist. Making it now.")
with open(path, "w") as file_pointer:
file_pointer.write(content)
def directory(path):
"""Create directory if it doesn't exist."""
if not os.path.isdir(path):
os.makedirs(path)
log_this("The "+path.split("/")[-1]+" directory didn't exist")
with open("gwendolyn/resources/starting_files.json") as file_pointer:
data = json.load(file_pointer)
for path, content in data["json"].items():
make_json_file(path, content)
for path, content in data["txt"].items():
make_txt_file(path, content)
for path in data["folder"]:
directory(path)
def replace_multiple(main_string: str, to_be_replaced: list, new_string: str):
"""
Replace multiple substrings in a string with the same substring.
*Parameters*
------------
main_string: str
The string to replace substrings in.
to_be_replaced: list
The substrings to replace.
new_string: str
The string to replace the substrings with.
*Returns*
---------
main_string: str
The string with the substrings replaced.
"""
# Iterate over the strings to be replaced
for elem in to_be_replaced:
# Check if string is in the main string
if elem in main_string:
# Replace the string
main_string = main_string.replace(elem, new_string)
return main_string
def emoji_to_command(emoji: str):
"""
Convert emoji to text.
*Parameters*
------------
emoji: str
The emoji to decipher.
*Returns*
---------
: str
The deciphered string.
"""
if emoji == "1":
return_value = 1
elif emoji == "2":
return_value = 2
elif emoji == "3":
return_value = 3
elif emoji == "4":
return_value = 4
elif emoji == "5":
return_value = 5
elif emoji == "6":
return_value = 6
elif emoji == "7":
return_value = 7
elif emoji == "🎲":
return_value = "roll"
elif emoji == "":
return_value = "none"
elif emoji == "✔️":
return_value = 1
else:
return_value = ""
return return_value