diff --git a/conductors/local_python_conductor.py b/conductors/local_python_conductor.py index c0175e6..dffecd7 100644 --- a/conductors/local_python_conductor.py +++ b/conductors/local_python_conductor.py @@ -16,9 +16,8 @@ from core.correctness.vars import JOB_TYPE_PYTHON, PYTHON_FUNC, JOB_STATUS, \ STATUS_DONE, JOB_END_TIME, STATUS_FAILED, JOB_ERROR, \ JOB_TYPE, JOB_TYPE_PAPERMILL, DEFAULT_JOB_QUEUE_DIR, DEFAULT_JOB_OUTPUT_DIR from core.correctness.validation import valid_job, valid_dir_path -from core.functionality import read_yaml, write_yaml, make_dir, write_file from core.meow import BaseConductor - +from functionality.file_io import make_dir, read_yaml, write_file, write_yaml class LocalPythonConductor(BaseConductor): def __init__(self, job_queue_dir:str=DEFAULT_JOB_QUEUE_DIR, diff --git a/core/correctness/__init__.py b/core/correctness/__init__.py index 979c633..e69de29 100644 --- a/core/correctness/__init__.py +++ b/core/correctness/__init__.py @@ -1,3 +0,0 @@ - -from core.correctness.validation import * -from core.correctness.vars import * \ No newline at end of file diff --git a/core/correctness/validation.py b/core/correctness/validation.py index c7567e1..4a41bf9 100644 --- a/core/correctness/validation.py +++ b/core/correctness/validation.py @@ -8,7 +8,7 @@ Author(s): David Marchant from datetime import datetime from inspect import signature from os.path import sep, exists, isfile, isdir, dirname -from typing import Any, _SpecialForm, Union, Tuple, Type, Dict, List, \ +from typing import Any, _SpecialForm, Union, Type, Dict, List, \ get_origin, get_args from core.correctness.vars import VALID_PATH_CHARS, get_not_imp_msg, \ @@ -248,22 +248,6 @@ def valid_non_existing_path(variable:str, allow_base:bool=False): raise ValueError( f"Route to requested path '{variable}' does not exist.") -def setup_debugging(print:Any=None, logging:int=0)->Tuple[Any,int]: - """Create a place for debug messages to be sent. Always returns a place, - along with a logging level.""" - check_type(logging, int) - if print is None: - return None, 0 - else: - if not isinstance(print, object): - raise TypeError(f"Invalid print location provided") - writeable = getattr(print, "write", None) - if not writeable or not callable(writeable): - raise TypeError(f"Print object does not implement required " - "'write' function") - - return print, logging - def valid_meow_dict(meow_dict:Dict[str,Any], msg:str, keys:Dict[str,Type])->None: """Check given dictionary expresses a meow construct. This won't do much diff --git a/core/functionality.py b/core/functionality.py deleted file mode 100644 index 79ac46e..0000000 --- a/core/functionality.py +++ /dev/null @@ -1,402 +0,0 @@ -# TODO comments -import copy -import hashlib -import json -import nbformat -import os -import yaml - -from datetime import datetime -from typing import List - -from multiprocessing.connection import Connection, wait as multi_wait -# Need to import additional Connection type for Windows machines -if os.name == 'nt': - from multiprocessing.connection import PipeConnection -from multiprocessing.queues import Queue -from papermill.translators import papermill_translators -from typing import Any, Dict -from random import SystemRandom - -from core.correctness.validation import check_type, valid_existing_file_path, \ - valid_path, check_script -from core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \ - VALID_CHANNELS, HASH_BUFFER_SIZE, SHA256, DEBUG_WARNING, DEBUG_INFO, \ - EVENT_TYPE, EVENT_PATH, JOB_EVENT, JOB_TYPE, JOB_ID, JOB_PATTERN, \ - JOB_RECIPE, JOB_RULE, EVENT_RULE, JOB_STATUS, STATUS_QUEUED, \ - JOB_CREATE_TIME, JOB_REQUIREMENTS, WATCHDOG_BASE, WATCHDOG_HASH, \ - EVENT_TYPE_WATCHDOG - -# mig trigger keyword replacements -KEYWORD_PATH = "{PATH}" -KEYWORD_REL_PATH = "{REL_PATH}" -KEYWORD_DIR = "{DIR}" -KEYWORD_REL_DIR = "{REL_DIR}" -KEYWORD_FILENAME = "{FILENAME}" -KEYWORD_PREFIX = "{PREFIX}" -KEYWORD_BASE = "{VGRID}" -KEYWORD_EXTENSION = "{EXTENSION}" -KEYWORD_JOB = "{JOB}" - - -#TODO Make this guaranteed unique -def generate_id(prefix:str="", length:int=16, existing_ids:List[str]=[], - charset:str=CHAR_UPPERCASE+CHAR_LOWERCASE, attempts:int=24): - random_length = max(length - len(prefix), 0) - for _ in range(attempts): - id = prefix + ''.join(SystemRandom().choice(charset) - for _ in range(random_length)) - if id not in existing_ids: - return id - raise ValueError(f"Could not generate ID unique from '{existing_ids}' " - f"using values '{charset}' and length of '{length}'.") - -def wait(inputs:List[VALID_CHANNELS])->List[VALID_CHANNELS]: - if os.name == 'nt': - return wait_windows(inputs) - return wait_linux(inputs) - -def wait_windows(inputs:List[VALID_CHANNELS])->List[VALID_CHANNELS]: - all_connections = [i for i in inputs if type(i) is Connection] \ - + [i for i in inputs if type(i) is PipeConnection] \ - + [i._reader for i in inputs if type(i) is Queue] - ready = multi_wait(all_connections) - ready_inputs = [i for i in inputs if \ - (type(i) is Connection and i in ready) \ - or (type(i) is PipeConnection and i in ready) \ - or (type(i) is Queue and i._reader in ready)] - return ready_inputs - -def wait_linux(inputs:List[VALID_CHANNELS])->List[VALID_CHANNELS]: - all_connections = [i for i in inputs if type(i) is Connection] \ - + [i._reader for i in inputs if type(i) is Queue] - ready = multi_wait(all_connections) - ready_inputs = [i for i in inputs if \ - (type(i) is Connection and i in ready) \ - or (type(i) is Queue and i._reader in ready)] - return ready_inputs - -def _get_file_sha256(file_path): - sha256_hash = hashlib.sha256() - - with open(file_path, 'rb') as file_to_hash: - while True: - buffer = file_to_hash.read(HASH_BUFFER_SIZE) - if not buffer: - break - sha256_hash.update(buffer) - - return sha256_hash.hexdigest() - -def get_file_hash(file_path:str, hash:str, hint:str=""): - check_type(hash, str, hint=hint) - - valid_existing_file_path(file_path) - - valid_hashes = { - SHA256: _get_file_sha256 - } - if hash not in valid_hashes: - raise KeyError(f"Cannot use hash '{hash}'. Valid are " - f"'{list(valid_hashes.keys())}") - - return valid_hashes[hash](file_path) - -def rmtree(directory:str): - """ - Remove a directory and all its contents. - Should be faster than shutil.rmtree - - :param: (str) The firectory to empty and remove - - :return: No return - """ - if not os.path.exists(directory): - return - for root, dirs, files in os.walk(directory, topdown=False): - for file in files: - os.remove(os.path.join(root, file)) - for dir in dirs: - rmtree(os.path.join(root, dir)) - os.rmdir(directory) - -def make_dir(path:str, can_exist:bool=True, ensure_clean:bool=False): - """ - Creates a new directory at the given path. - - :param path: (str) The directory path. - - :param can_exist: (boolean) [optional] A toggle for if a previously - existing directory at the path will throw an error or not. Default is - true (e.g. no error is thrown if the path already exists) - - :param ensure_clean: (boolean) [optional] A toggle for if a previously - existing directory at the path will be replaced with a new emtpy directory. - Default is False. - - :return: No return - """ - if os.path.exists(path): - if os.path.isfile(path): - raise ValueError( - f"Cannot make directory in {path} as it already exists and is " - "a file") - if ensure_clean: - rmtree(path) - - os.makedirs(path, exist_ok=can_exist) - -def read_file(filepath:str): - with open(filepath, 'r') as file: - return file.read() - -def read_file_lines(filepath:str): - with open(filepath, 'r') as file: - return file.readlines() - -def write_file(source:str, filename:str): - with open(filename, 'w') as file: - file.write(source) - -def read_yaml(filepath:str): - """ - Reads a file path as a yaml object. - - :param filepath: (str) The file to read. - - :return: (object) An object read from the file. - """ - with open(filepath, 'r') as yaml_file: - return yaml.load(yaml_file, Loader=yaml.Loader) - -def write_yaml(source:Any, filename:str): - """ - Writes a given objcet to a yaml file. - - :param source: (any) A python object to be written. - - :param filename: (str) The filename to be written to. - - :return: No return - """ - with open(filename, 'w') as param_file: - yaml.dump(source, param_file, default_flow_style=False) - -def read_notebook(filepath:str): - valid_path(filepath, extension="ipynb") - with open(filepath, 'r') as read_file: - return json.load(read_file) - -def write_notebook(source:Dict[str,Any], filename:str): - """ - Writes the given notebook source code to a given filename. - - :param source: (dict) The notebook source dictionary. - - :param filename: (str) The filename to write to. - - :return: No return - """ - with open(filename, 'w') as job_file: - json.dump(source, job_file) - -# Adapted from: https://github.com/rasmunk/notebook_parameterizer -def parameterize_jupyter_notebook(jupyter_notebook:Dict[str,Any], - parameters:Dict[str,Any], expand_env_values:bool=False)->Dict[str,Any]: - nbformat.validate(jupyter_notebook) - check_type(parameters, Dict, - hint="parameterize_jupyter_notebook.parameters") - - if jupyter_notebook["nbformat"] != 4: - raise Warning( - "Parameterization designed to work with nbformat version 4. " - f"Differing version of '{jupyter_notebook['nbformat']}' may " - "produce unexpeted results.") - - # Load input notebook - if "kernelspec" in jupyter_notebook["metadata"]: - kernel_name = jupyter_notebook["metadata"]["kernelspec"]["name"] - language = jupyter_notebook["metadata"]["kernelspec"]["language"] - if "language_info" in jupyter_notebook["metadata"]: - kernel_name = jupyter_notebook["metadata"]["language_info"]["name"] - language = jupyter_notebook["metadata"]["language_info"]["name"] - else: - raise AttributeError( - f"Notebook lacks key language and/or kernel_name attributes " - "within metadata") - - translator = papermill_translators.find_translator(kernel_name, language) - - output_notebook = copy.deepcopy(jupyter_notebook) - - # Find each - cells = output_notebook["cells"] - code_cells = [ - (idx, cell) for idx, cell in enumerate(cells) \ - if cell["cell_type"] == "code" - ] - for idx, cell in code_cells: - cell_updated = False - source = cell["source"] - # Either single string or a list of strings - if isinstance(source, str): - lines = source.split("\n") - else: - lines = source - - for idy, line in enumerate(lines): - if "=" in line: - d_line = list(map(lambda x: x.replace(" ", ""), - line.split("="))) - # Matching parameter name - if len(d_line) == 2 and d_line[0] in parameters: - value = parameters[d_line[0]] - # Whether to expand value from os env - if ( - expand_env_values - and isinstance(value, str) - and value.startswith("ENV_") - ): - env_var = value.replace("ENV_", "") - value = os.getenv( - env_var, - "MISSING ENVIRONMENT VARIABLE: {}".format(env_var) - ) - lines[idy] = translator.assign( - d_line[0], translator.translate(value) - ) - - cell_updated = True - if cell_updated: - cells[idx]["source"] = "\n".join(lines) - - # Validate that the parameterized notebook is still valid - nbformat.validate(output_notebook, version=4) - - return output_notebook - -def parameterize_python_script(script:List[str], parameters:Dict[str,Any], - expand_env_values:bool=False)->Dict[str,Any]: - check_script(script) - check_type(parameters, Dict - ,hint="parameterize_python_script.parameters") - - output_script = copy.deepcopy(script) - - for i, line in enumerate(output_script): - if "=" in line: - d_line = list(map(lambda x: x.replace(" ", ""), - line.split("="))) - # Matching parameter name - if len(d_line) == 2 and d_line[0] in parameters: - value = parameters[d_line[0]] - # Whether to expand value from os env - if ( - expand_env_values - and isinstance(value, str) - and value.startswith("ENV_") - ): - env_var = value.replace("ENV_", "") - value = os.getenv( - env_var, - "MISSING ENVIRONMENT VARIABLE: {}".format(env_var) - ) - output_script[i] = f"{d_line[0]} = {repr(value)}" - - # Validate that the parameterized notebook is still valid - check_script(output_script) - - return output_script - -def print_debug(print_target, debug_level, msg, level)->None: - """Function to print a message to the debug target, if its level exceeds - the given one.""" - if print_target is None: - return - else: - if level <= debug_level: - status = "ERROR" - if level == DEBUG_INFO: - status = "INFO" - elif level == DEBUG_WARNING: - status = "WARNING" - print(f"{status}: {msg}", file=print_target) - -def replace_keywords(old_dict:Dict[str,str], job_id:str, src_path:str, - monitor_base:str)->Dict[str,str]: - """Function to replace all MEOW magic words in a dictionary with dynamic - values.""" - new_dict = {} - - filename = os.path.basename(src_path) - dirname = os.path.dirname(src_path) - relpath = os.path.relpath(src_path, monitor_base) - reldirname = os.path.dirname(relpath) - (prefix, extension) = os.path.splitext(filename) - - for var, val in old_dict.items(): - if isinstance(val, str): - val = val.replace(KEYWORD_PATH, src_path) - val = val.replace(KEYWORD_REL_PATH, relpath) - val = val.replace(KEYWORD_DIR, dirname) - val = val.replace(KEYWORD_REL_DIR, reldirname) - val = val.replace(KEYWORD_FILENAME, filename) - val = val.replace(KEYWORD_PREFIX, prefix) - val = val.replace(KEYWORD_BASE, monitor_base) - val = val.replace(KEYWORD_EXTENSION, extension) - val = val.replace(KEYWORD_JOB, job_id) - - new_dict[var] = val - else: - new_dict[var] = val - - return new_dict - -def create_event(event_type:str, path:str, rule:Any, extras:Dict[Any,Any]={} - )->Dict[Any,Any]: - """Function to create a MEOW dictionary.""" - return { - **extras, - EVENT_PATH: path, - EVENT_TYPE: event_type, - EVENT_RULE: rule - } - -def create_watchdog_event(path:str, rule:Any, base:str, hash:str, - extras:Dict[Any,Any]={})->Dict[Any,Any]: - """Function to create a MEOW event dictionary.""" - return create_event( - EVENT_TYPE_WATCHDOG, - path, - rule, - extras={ - **extras, - **{ - WATCHDOG_HASH: hash, - WATCHDOG_BASE: base - } - } - ) - -def create_job(job_type:str, event:Dict[str,Any], extras:Dict[Any,Any]={} - )->Dict[Any,Any]: - """Function to create a MEOW job dictionary.""" - job_dict = { - #TODO compress event? - JOB_ID: generate_id(prefix="job_"), - JOB_EVENT: event, - JOB_TYPE: job_type, - JOB_PATTERN: event[EVENT_RULE].pattern.name, - JOB_RECIPE: event[EVENT_RULE].recipe.name, - JOB_RULE: event[EVENT_RULE].name, - JOB_STATUS: STATUS_QUEUED, - JOB_CREATE_TIME: datetime.now(), - JOB_REQUIREMENTS: event[EVENT_RULE].recipe.requirements - } - - return {**extras, **job_dict} - -def lines_to_string(lines:List[str])->str: - """Function to convert a list of str lines, into one continuous string - separated by newline characters""" - return "\n".join(lines) diff --git a/core/meow.py b/core/meow.py index 070a327..b311092 100644 --- a/core/meow.py +++ b/core/meow.py @@ -19,7 +19,7 @@ from core.correctness.vars import VALID_RECIPE_NAME_CHARS, \ SWEEP_JUMP, SWEEP_START, SWEEP_STOP, get_drt_imp_msg from core.correctness.validation import valid_string, check_type, \ check_implementation, valid_list, valid_dict -from core.functionality import generate_id +from functionality.naming import generate_id class BaseRecipe: diff --git a/core/runner.py b/core/runner.py index 8a71bc2..9ae48db 100644 --- a/core/runner.py +++ b/core/runner.py @@ -15,12 +15,12 @@ from random import randrange from typing import Any, Union, Dict, List from core.correctness.vars import DEBUG_WARNING, DEBUG_INFO, EVENT_TYPE, \ - VALID_CHANNELS, JOB_ID, META_FILE, DEFAULT_JOB_OUTPUT_DIR, \ - DEFAULT_JOB_QUEUE_DIR -from core.correctness.validation import setup_debugging, check_type, \ - valid_list, valid_dir_path -from core.functionality import print_debug, wait, read_yaml, make_dir + VALID_CHANNELS, META_FILE, DEFAULT_JOB_OUTPUT_DIR, DEFAULT_JOB_QUEUE_DIR +from core.correctness.validation import check_type, valid_list, valid_dir_path from core.meow import BaseHandler, BaseMonitor, BaseConductor +from functionality.debug import setup_debugging, print_debug +from functionality.file_io import make_dir, read_yaml +from functionality.process_io import wait class MeowRunner: diff --git a/functionality/debug.py b/functionality/debug.py new file mode 100644 index 0000000..0069c4d --- /dev/null +++ b/functionality/debug.py @@ -0,0 +1,42 @@ +""" +This file contains functions for debugging and logging. + +Author(s): David Marchant +""" + +from typing import Any, Tuple + +from core.correctness.validation import check_type +from core.correctness.vars import DEBUG_INFO, DEBUG_WARNING + + +def setup_debugging(print:Any=None, logging:int=0)->Tuple[Any,int]: + """Create a place for debug messages to be sent. Always returns a place, + along with a logging level.""" + check_type(logging, int) + if print is None: + return None, 0 + else: + if not isinstance(print, object): + raise TypeError(f"Invalid print location provided") + writeable = getattr(print, "write", None) + if not writeable or not callable(writeable): + raise TypeError(f"Print object does not implement required " + "'write' function") + + return print, logging + + +def print_debug(print_target, debug_level, msg, level)->None: + """Function to print a message to the debug target, if its level exceeds + the given one.""" + if print_target is None: + return + else: + if level <= debug_level: + status = "ERROR" + if level == DEBUG_INFO: + status = "INFO" + elif level == DEBUG_WARNING: + status = "WARNING" + print(f"{status}: {msg}", file=print_target) diff --git a/functionality/file_io.py b/functionality/file_io.py new file mode 100644 index 0000000..a2836e6 --- /dev/null +++ b/functionality/file_io.py @@ -0,0 +1,118 @@ +""" +This file contains functions for reading and writing different types of files. + +Author(s): David Marchant +""" + +import json +import yaml + +from os import makedirs, remove, rmdir, walk +from os.path import exists, isfile, join +from typing import Any, Dict, List + +from core.correctness.validation import valid_path + + +def make_dir(path:str, can_exist:bool=True, ensure_clean:bool=False): + """ + Creates a new directory at the given path. + + :param path: (str) The directory path. + + :param can_exist: (boolean) [optional] A toggle for if a previously + existing directory at the path will throw an error or not. Default is + true (e.g. no error is thrown if the path already exists) + + :param ensure_clean: (boolean) [optional] A toggle for if a previously + existing directory at the path will be replaced with a new emtpy directory. + Default is False. + + :return: No return + """ + if exists(path): + if isfile(path): + raise ValueError( + f"Cannot make directory in {path} as it already exists and is " + "a file") + if ensure_clean: + rmtree(path) + + makedirs(path, exist_ok=can_exist) + +def rmtree(directory:str): + """ + Remove a directory and all its contents. + Should be faster than shutil.rmtree + + :param: (str) The firectory to empty and remove + + :return: No return + """ + if not exists(directory): + return + for root, dirs, files in walk(directory, topdown=False): + for file in files: + remove(join(root, file)) + for dir in dirs: + rmtree(join(root, dir)) + rmdir(directory) + +def read_file(filepath:str): + with open(filepath, 'r') as file: + return file.read() + +def read_file_lines(filepath:str): + with open(filepath, 'r') as file: + return file.readlines() + +def write_file(source:str, filename:str): + with open(filename, 'w') as file: + file.write(source) + +def read_yaml(filepath:str): + """ + Reads a file path as a yaml object. + + :param filepath: (str) The file to read. + + :return: (object) An object read from the file. + """ + with open(filepath, 'r') as yaml_file: + return yaml.load(yaml_file, Loader=yaml.Loader) + +def write_yaml(source:Any, filename:str): + """ + Writes a given objcet to a yaml file. + + :param source: (any) A python object to be written. + + :param filename: (str) The filename to be written to. + + :return: No return + """ + with open(filename, 'w') as param_file: + yaml.dump(source, param_file, default_flow_style=False) + +def read_notebook(filepath:str): + valid_path(filepath, extension="ipynb") + with open(filepath, 'r') as read_file: + return json.load(read_file) + +def write_notebook(source:Dict[str,Any], filename:str): + """ + Writes the given notebook source code to a given filename. + + :param source: (dict) The notebook source dictionary. + + :param filename: (str) The filename to write to. + + :return: No return + """ + with open(filename, 'w') as job_file: + json.dump(source, job_file) + +def lines_to_string(lines:List[str])->str: + """Function to convert a list of str lines, into one continuous string + separated by newline characters""" + return "\n".join(lines) diff --git a/functionality/hashing.py b/functionality/hashing.py new file mode 100644 index 0000000..7bcfe1a --- /dev/null +++ b/functionality/hashing.py @@ -0,0 +1,36 @@ +""" +This file contains functions for taking hashes of data and files. + +Author(s): David Marchant +""" + +from hashlib import sha256 + +from core.correctness.vars import HASH_BUFFER_SIZE, SHA256 +from core.correctness.validation import check_type, valid_existing_file_path + +def _get_file_sha256(file_path): + sha256_hash = sha256() + + with open(file_path, 'rb') as file_to_hash: + while True: + buffer = file_to_hash.read(HASH_BUFFER_SIZE) + if not buffer: + break + sha256_hash.update(buffer) + + return sha256_hash.hexdigest() + +def get_file_hash(file_path:str, hash:str, hint:str=""): + check_type(hash, str, hint=hint) + + valid_existing_file_path(file_path) + + valid_hashes = { + SHA256: _get_file_sha256 + } + if hash not in valid_hashes: + raise KeyError(f"Cannot use hash '{hash}'. Valid are " + f"'{list(valid_hashes.keys())}") + + return valid_hashes[hash](file_path) diff --git a/functionality/meow.py b/functionality/meow.py new file mode 100644 index 0000000..a4c5386 --- /dev/null +++ b/functionality/meow.py @@ -0,0 +1,103 @@ +""" +This file contains functions for meow specific functionality. + +Author(s): David Marchant +""" + +from datetime import datetime +from os.path import basename, dirname, relpath, splitext +from typing import Any, Dict + +from core.correctness.vars import EVENT_PATH, EVENT_RULE, EVENT_TYPE, \ + EVENT_TYPE_WATCHDOG, JOB_CREATE_TIME, JOB_EVENT, JOB_ID, JOB_PATTERN, \ + JOB_RECIPE, JOB_REQUIREMENTS, JOB_RULE, JOB_STATUS, JOB_TYPE, \ + STATUS_QUEUED, WATCHDOG_BASE, WATCHDOG_HASH +from functionality.naming import generate_id + + +# mig trigger keyword replacements +KEYWORD_PATH = "{PATH}" +KEYWORD_REL_PATH = "{REL_PATH}" +KEYWORD_DIR = "{DIR}" +KEYWORD_REL_DIR = "{REL_DIR}" +KEYWORD_FILENAME = "{FILENAME}" +KEYWORD_PREFIX = "{PREFIX}" +KEYWORD_BASE = "{VGRID}" +KEYWORD_EXTENSION = "{EXTENSION}" +KEYWORD_JOB = "{JOB}" + + +def replace_keywords(old_dict:Dict[str,str], job_id:str, src_path:str, + monitor_base:str)->Dict[str,str]: + """Function to replace all MEOW magic words in a dictionary with dynamic + values.""" + new_dict = {} + + filename = basename(src_path) + dir = dirname(src_path) + relativepath = relpath(src_path, monitor_base) + reldirname = dirname(relativepath) + (prefix, extension) = splitext(filename) + + for var, val in old_dict.items(): + if isinstance(val, str): + val = val.replace(KEYWORD_PATH, src_path) + val = val.replace(KEYWORD_REL_PATH, relativepath) + val = val.replace(KEYWORD_DIR, dir) + val = val.replace(KEYWORD_REL_DIR, reldirname) + val = val.replace(KEYWORD_FILENAME, filename) + val = val.replace(KEYWORD_PREFIX, prefix) + val = val.replace(KEYWORD_BASE, monitor_base) + val = val.replace(KEYWORD_EXTENSION, extension) + val = val.replace(KEYWORD_JOB, job_id) + + new_dict[var] = val + else: + new_dict[var] = val + + return new_dict + +def create_event(event_type:str, path:str, rule:Any, extras:Dict[Any,Any]={} + )->Dict[Any,Any]: + """Function to create a MEOW dictionary.""" + return { + **extras, + EVENT_PATH: path, + EVENT_TYPE: event_type, + EVENT_RULE: rule + } + +def create_watchdog_event(path:str, rule:Any, base:str, hash:str, + extras:Dict[Any,Any]={})->Dict[Any,Any]: + """Function to create a MEOW event dictionary.""" + return create_event( + EVENT_TYPE_WATCHDOG, + path, + rule, + extras={ + **extras, + **{ + WATCHDOG_HASH: hash, + WATCHDOG_BASE: base + } + } + ) + +def create_job(job_type:str, event:Dict[str,Any], extras:Dict[Any,Any]={} + )->Dict[Any,Any]: + """Function to create a MEOW job dictionary.""" + job_dict = { + #TODO compress event? + JOB_ID: generate_id(prefix="job_"), + JOB_EVENT: event, + JOB_TYPE: job_type, + JOB_PATTERN: event[EVENT_RULE].pattern.name, + JOB_RECIPE: event[EVENT_RULE].recipe.name, + JOB_RULE: event[EVENT_RULE].name, + JOB_STATUS: STATUS_QUEUED, + JOB_CREATE_TIME: datetime.now(), + JOB_REQUIREMENTS: event[EVENT_RULE].recipe.requirements + } + + return {**extras, **job_dict} + diff --git a/functionality/naming.py b/functionality/naming.py new file mode 100644 index 0000000..95c6073 --- /dev/null +++ b/functionality/naming.py @@ -0,0 +1,23 @@ +""" +This file contains functions for dynamic naming of objects. + +Author(s): David Marchant +""" + +from typing import List +from random import SystemRandom + +from core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE + + +#TODO Make this guaranteed unique +def generate_id(prefix:str="", length:int=16, existing_ids:List[str]=[], + charset:str=CHAR_UPPERCASE+CHAR_LOWERCASE, attempts:int=24): + random_length = max(length - len(prefix), 0) + for _ in range(attempts): + id = prefix + ''.join(SystemRandom().choice(charset) + for _ in range(random_length)) + if id not in existing_ids: + return id + raise ValueError(f"Could not generate ID unique from '{existing_ids}' " + f"using values '{charset}' and length of '{length}'.") diff --git a/functionality/parameterisation.py b/functionality/parameterisation.py new file mode 100644 index 0000000..a23fefe --- /dev/null +++ b/functionality/parameterisation.py @@ -0,0 +1,121 @@ +""" +This file contains functions for parameterising code in various formats. + +Author(s): David Marchant +""" + +from copy import deepcopy +from nbformat import validate +from os import getenv +from papermill.translators import papermill_translators +from typing import Any, Dict, List + +from core.correctness.validation import check_script, check_type + +# Adapted from: https://github.com/rasmunk/notebook_parameterizer +def parameterize_jupyter_notebook(jupyter_notebook:Dict[str,Any], + parameters:Dict[str,Any], expand_env_values:bool=False)->Dict[str,Any]: + validate(jupyter_notebook) + check_type(parameters, Dict, + hint="parameterize_jupyter_notebook.parameters") + + if jupyter_notebook["nbformat"] != 4: + raise Warning( + "Parameterization designed to work with nbformat version 4. " + f"Differing version of '{jupyter_notebook['nbformat']}' may " + "produce unexpeted results.") + + # Load input notebook + if "kernelspec" in jupyter_notebook["metadata"]: + kernel_name = jupyter_notebook["metadata"]["kernelspec"]["name"] + language = jupyter_notebook["metadata"]["kernelspec"]["language"] + if "language_info" in jupyter_notebook["metadata"]: + kernel_name = jupyter_notebook["metadata"]["language_info"]["name"] + language = jupyter_notebook["metadata"]["language_info"]["name"] + else: + raise AttributeError( + f"Notebook lacks key language and/or kernel_name attributes " + "within metadata") + + translator = papermill_translators.find_translator(kernel_name, language) + + output_notebook = deepcopy(jupyter_notebook) + + # Find each + cells = output_notebook["cells"] + code_cells = [ + (idx, cell) for idx, cell in enumerate(cells) \ + if cell["cell_type"] == "code" + ] + for idx, cell in code_cells: + cell_updated = False + source = cell["source"] + # Either single string or a list of strings + if isinstance(source, str): + lines = source.split("\n") + else: + lines = source + + for idy, line in enumerate(lines): + if "=" in line: + d_line = list(map(lambda x: x.replace(" ", ""), + line.split("="))) + # Matching parameter name + if len(d_line) == 2 and d_line[0] in parameters: + value = parameters[d_line[0]] + # Whether to expand value from os env + if ( + expand_env_values + and isinstance(value, str) + and value.startswith("ENV_") + ): + env_var = value.replace("ENV_", "") + value = getenv( + env_var, + "MISSING ENVIRONMENT VARIABLE: {}".format(env_var) + ) + lines[idy] = translator.assign( + d_line[0], translator.translate(value) + ) + + cell_updated = True + if cell_updated: + cells[idx]["source"] = "\n".join(lines) + + # Validate that the parameterized notebook is still valid + validate(output_notebook, version=4) + + return output_notebook + +def parameterize_python_script(script:List[str], parameters:Dict[str,Any], + expand_env_values:bool=False)->Dict[str,Any]: + check_script(script) + check_type(parameters, Dict + ,hint="parameterize_python_script.parameters") + + output_script = deepcopy(script) + + for i, line in enumerate(output_script): + if "=" in line: + d_line = list(map(lambda x: x.replace(" ", ""), + line.split("="))) + # Matching parameter name + if len(d_line) == 2 and d_line[0] in parameters: + value = parameters[d_line[0]] + # Whether to expand value from os env + if ( + expand_env_values + and isinstance(value, str) + and value.startswith("ENV_") + ): + env_var = value.replace("ENV_", "") + value = getenv( + env_var, + "MISSING ENVIRONMENT VARIABLE: {}".format(env_var) + ) + output_script[i] = f"{d_line[0]} = {repr(value)}" + + # Validate that the parameterized notebook is still valid + check_script(output_script) + + return output_script diff --git a/functionality/process_io.py b/functionality/process_io.py new file mode 100644 index 0000000..6cd4946 --- /dev/null +++ b/functionality/process_io.py @@ -0,0 +1,41 @@ +""" +This file contains functions for reading and writing between processes. + +Author(s): David Marchant +""" + +from os import name as osName +from typing import List + +from multiprocessing.connection import Connection, wait as multi_wait +# Need to import additional Connection type for Windows machines +if osName == 'nt': + from multiprocessing.connection import PipeConnection +from multiprocessing.queues import Queue +from core.correctness.vars import VALID_CHANNELS + + +def wait(inputs:List[VALID_CHANNELS])->List[VALID_CHANNELS]: + if osName == 'nt': + return wait_windows(inputs) + return wait_linux(inputs) + +def wait_windows(inputs:List[VALID_CHANNELS])->List[VALID_CHANNELS]: + all_connections = [i for i in inputs if type(i) is Connection] \ + + [i for i in inputs if type(i) is PipeConnection] \ + + [i._reader for i in inputs if type(i) is Queue] + ready = multi_wait(all_connections) + ready_inputs = [i for i in inputs if \ + (type(i) is Connection and i in ready) \ + or (type(i) is PipeConnection and i in ready) \ + or (type(i) is Queue and i._reader in ready)] + return ready_inputs + +def wait_linux(inputs:List[VALID_CHANNELS])->List[VALID_CHANNELS]: + all_connections = [i for i in inputs if type(i) is Connection] \ + + [i._reader for i in inputs if type(i) is Queue] + ready = multi_wait(all_connections) + ready_inputs = [i for i in inputs if \ + (type(i) is Connection and i in ready) \ + or (type(i) is Queue and i._reader in ready)] + return ready_inputs diff --git a/patterns/file_event_pattern.py b/patterns/file_event_pattern.py index fc93c6c..381997e 100644 --- a/patterns/file_event_pattern.py +++ b/patterns/file_event_pattern.py @@ -19,15 +19,16 @@ from watchdog.observers import Observer from watchdog.events import PatternMatchingEventHandler from core.correctness.validation import check_type, valid_string, \ - valid_dict, valid_list, valid_path, valid_dir_path, setup_debugging + valid_dict, valid_list, valid_path, valid_dir_path from core.correctness.vars import VALID_RECIPE_NAME_CHARS, \ VALID_VARIABLE_NAME_CHARS, FILE_EVENTS, FILE_CREATE_EVENT, \ FILE_MODIFY_EVENT, FILE_MOVED_EVENT, DEBUG_INFO, \ FILE_RETROACTIVE_EVENT, SHA256 -from core.functionality import print_debug, create_watchdog_event, \ - get_file_hash from core.meow import BasePattern, BaseMonitor, BaseRule, BaseRecipe, \ create_rule +from functionality.debug import setup_debugging, print_debug +from functionality.hashing import get_file_hash +from functionality.meow import create_watchdog_event # Events that are monitored by default _DEFAULT_MASK = [ diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index f5c6465..d9fb877 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -12,17 +12,18 @@ import sys from typing import Any, Tuple, Dict from core.correctness.validation import check_type, valid_string, \ - valid_dict, valid_path, valid_dir_path, setup_debugging, \ - valid_event + valid_dict, valid_path, valid_dir_path, valid_event from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, PYTHON_FUNC, \ DEBUG_INFO, EVENT_TYPE_WATCHDOG, JOB_HASH, DEFAULT_JOB_QUEUE_DIR, \ EVENT_PATH, JOB_TYPE_PAPERMILL, WATCHDOG_HASH, JOB_PARAMETERS, \ JOB_ID, WATCHDOG_BASE, META_FILE, \ PARAMS_FILE, JOB_STATUS, STATUS_QUEUED, EVENT_RULE, EVENT_TYPE, \ EVENT_RULE, get_base_file -from core.functionality import print_debug, create_job, replace_keywords, \ - make_dir, write_yaml, write_notebook, read_notebook from core.meow import BaseRecipe, BaseHandler +from functionality.debug import setup_debugging, print_debug +from functionality.file_io import make_dir, read_notebook, write_notebook, \ + write_yaml +from functionality.meow import create_job, replace_keywords class JupyterNotebookRecipe(BaseRecipe): @@ -185,13 +186,14 @@ def papermill_job_func(job_dir): import os import papermill from datetime import datetime - from core.functionality import write_yaml, read_yaml, write_notebook, \ - get_file_hash, parameterize_jupyter_notebook from core.correctness.vars import JOB_EVENT, JOB_ID, \ EVENT_PATH, META_FILE, PARAMS_FILE, \ JOB_STATUS, JOB_HASH, SHA256, STATUS_SKIPPED, JOB_END_TIME, \ JOB_ERROR, STATUS_FAILED, get_job_file, \ get_result_file + from functionality.file_io import read_yaml, write_notebook, write_yaml + from functionality.hashing import get_file_hash + from functionality.parameterisation import parameterize_jupyter_notebook # Identify job files diff --git a/recipes/python_recipe.py b/recipes/python_recipe.py index 35f60f5..639548e 100644 --- a/recipes/python_recipe.py +++ b/recipes/python_recipe.py @@ -11,16 +11,18 @@ import sys from typing import Any, Tuple, Dict, List from core.correctness.validation import check_script, valid_string, \ - valid_dict, valid_event, valid_dir_path, setup_debugging + valid_dict, valid_event, valid_dir_path from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, PYTHON_FUNC, \ DEBUG_INFO, EVENT_TYPE_WATCHDOG, JOB_HASH, DEFAULT_JOB_QUEUE_DIR, \ EVENT_RULE, EVENT_PATH, JOB_TYPE_PYTHON, WATCHDOG_HASH, JOB_PARAMETERS, \ JOB_ID, WATCHDOG_BASE, META_FILE, \ PARAMS_FILE, JOB_STATUS, STATUS_QUEUED, EVENT_TYPE, EVENT_RULE, \ get_base_file -from core.functionality import print_debug, create_job, replace_keywords, \ - make_dir, write_yaml, write_file, lines_to_string, read_file_lines from core.meow import BaseRecipe, BaseHandler +from functionality.debug import setup_debugging, print_debug +from functionality.file_io import make_dir, read_file_lines, write_file, \ + write_yaml, lines_to_string +from functionality.meow import create_job, replace_keywords class PythonRecipe(BaseRecipe): @@ -176,13 +178,14 @@ def python_job_func(job_dir): import os from datetime import datetime from io import StringIO - from core.functionality import write_yaml, read_yaml, \ - get_file_hash, parameterize_python_script from core.correctness.vars import JOB_EVENT, JOB_ID, \ EVENT_PATH, META_FILE, PARAMS_FILE, \ JOB_STATUS, JOB_HASH, SHA256, STATUS_SKIPPED, JOB_END_TIME, \ JOB_ERROR, STATUS_FAILED, get_base_file, \ get_job_file, get_result_file + from functionality.file_io import read_yaml, write_yaml + from functionality.hashing import get_file_hash + from functionality.parameterisation import parameterize_python_script # Identify job files meta_file = os.path.join(job_dir, META_FILE) diff --git a/tests/shared.py b/tests/shared.py index ef16f82..10793a2 100644 --- a/tests/shared.py +++ b/tests/shared.py @@ -6,7 +6,7 @@ Author(s): David Marchant import os from core.correctness.vars import DEFAULT_JOB_OUTPUT_DIR, DEFAULT_JOB_QUEUE_DIR -from core.functionality import make_dir, rmtree +from functionality.file_io import make_dir, rmtree # testing diff --git a/tests/test_conductors.py b/tests/test_conductors.py index f29fac7..7a90b2b 100644 --- a/tests/test_conductors.py +++ b/tests/test_conductors.py @@ -11,11 +11,12 @@ from core.correctness.vars import JOB_TYPE_PYTHON, SHA256, JOB_PARAMETERS, \ STATUS_DONE, JOB_TYPE_PAPERMILL, JOB_RECIPE, JOB_RULE, JOB_CREATE_TIME, \ JOB_REQUIREMENTS, EVENT_PATH, EVENT_RULE, EVENT_TYPE, \ EVENT_TYPE_WATCHDOG, get_base_file, get_result_file, get_job_file -from core.functionality import get_file_hash, create_watchdog_event, \ - create_job, make_dir, write_yaml, write_notebook, read_yaml, write_file, \ - lines_to_string, read_file from core.meow import create_rule from conductors import LocalPythonConductor +from functionality.file_io import read_file, read_yaml, write_file, \ + write_notebook, write_yaml, lines_to_string, make_dir +from functionality.hashing import get_file_hash +from functionality.meow import create_watchdog_event, create_job from patterns import FileEventPattern from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe, \ papermill_job_func @@ -24,7 +25,6 @@ from shared import setup, teardown, TEST_MONITOR_BASE, APPENDING_NOTEBOOK, \ TEST_JOB_OUTPUT, TEST_JOB_QUEUE, COMPLETE_PYTHON_SCRIPT, \ BAREBONES_PYTHON_SCRIPT, BAREBONES_NOTEBOOK - def failing_func(): raise Exception("bad function") diff --git a/tests/test_functionality.py b/tests/test_functionality.py index 3700f9b..d61b64b 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -13,15 +13,20 @@ from core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \ PYTHON_FUNC, JOB_ID, JOB_EVENT, \ JOB_TYPE, JOB_PATTERN, JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME, \ JOB_REQUIREMENTS, STATUS_QUEUED, JOB_TYPE_PAPERMILL -from core.functionality import generate_id, wait, get_file_hash, rmtree, \ - make_dir, parameterize_jupyter_notebook, create_event, create_job, \ - replace_keywords, write_yaml, write_notebook, read_yaml, read_notebook, \ - create_watchdog_event, lines_to_string, \ - parameterize_python_script, write_file, read_file, read_file_lines, \ - KEYWORD_PATH, KEYWORD_REL_PATH, KEYWORD_DIR, KEYWORD_REL_DIR, \ - KEYWORD_FILENAME, KEYWORD_PREFIX, KEYWORD_BASE, KEYWORD_EXTENSION, \ - KEYWORD_JOB from core.meow import create_rule +from functionality.file_io import lines_to_string, make_dir, read_file, \ + read_file_lines, read_notebook, read_yaml, rmtree, write_file, \ + write_notebook, write_yaml +from functionality.hashing import get_file_hash +from functionality.meow import create_event, create_job, \ + create_watchdog_event, replace_keywords, \ + KEYWORD_BASE, KEYWORD_DIR, KEYWORD_EXTENSION, KEYWORD_FILENAME, \ + KEYWORD_JOB, KEYWORD_PATH, KEYWORD_PREFIX, KEYWORD_REL_DIR, \ + KEYWORD_REL_PATH +from functionality.naming import generate_id +from functionality.parameterisation import parameterize_jupyter_notebook, \ + parameterize_python_script +from functionality.process_io import wait from patterns import FileEventPattern from recipes import JupyterNotebookRecipe from shared import setup, teardown, TEST_MONITOR_BASE, COMPLETE_NOTEBOOK, \ diff --git a/tests/test_patterns.py b/tests/test_patterns.py index ed59ccb..b6f9473 100644 --- a/tests/test_patterns.py +++ b/tests/test_patterns.py @@ -8,13 +8,14 @@ from multiprocessing import Pipe from core.correctness.vars import FILE_CREATE_EVENT, EVENT_TYPE, \ EVENT_RULE, WATCHDOG_BASE, EVENT_TYPE_WATCHDOG, EVENT_PATH, SWEEP_START, \ SWEEP_JUMP, SWEEP_STOP -from core.functionality import make_dir +from functionality.file_io import make_dir from patterns.file_event_pattern import FileEventPattern, WatchdogMonitor, \ _DEFAULT_MASK from recipes import JupyterNotebookRecipe from shared import setup, teardown, BAREBONES_NOTEBOOK, TEST_MONITOR_BASE + def patterns_equal(tester, pattern_one, pattern_two): tester.assertEqual(pattern_one.name, pattern_two.name) tester.assertEqual(pattern_one.recipe, pattern_two.recipe) diff --git a/tests/test_recipes.py b/tests/test_recipes.py index 31407d3..5276543 100644 --- a/tests/test_recipes.py +++ b/tests/test_recipes.py @@ -13,10 +13,11 @@ from core.correctness.vars import EVENT_TYPE, WATCHDOG_BASE, EVENT_RULE, \ PARAMS_FILE, SWEEP_STOP, SWEEP_JUMP, SWEEP_START, JOB_TYPE_PAPERMILL, \ get_base_file, get_job_file, get_result_file from core.correctness.validation import valid_job -from core.functionality import get_file_hash, create_job, \ - create_watchdog_event, make_dir, write_yaml, write_notebook, read_yaml, \ - write_file, lines_to_string from core.meow import create_rules, create_rule +from functionality.file_io import lines_to_string, make_dir, read_yaml, \ + write_file, write_notebook, write_yaml +from functionality.hashing import get_file_hash +from functionality.meow import create_job, create_watchdog_event from patterns.file_event_pattern import FileEventPattern from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe, \ PapermillHandler, papermill_job_func diff --git a/tests/test_runner.py b/tests/test_runner.py index b21644c..a65c219 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -8,9 +8,9 @@ from time import sleep from conductors import LocalPythonConductor from core.correctness.vars import get_result_file, \ JOB_TYPE_PAPERMILL, JOB_ERROR, META_FILE, JOB_TYPE_PYTHON, JOB_CREATE_TIME -from core.functionality import make_dir, read_notebook, read_yaml, read_file from core.meow import BaseMonitor, BaseHandler, BaseConductor from core.runner import MeowRunner +from functionality.file_io import make_dir, read_file, read_notebook, read_yaml from patterns.file_event_pattern import WatchdogMonitor, FileEventPattern from recipes.jupyter_notebook_recipe import PapermillHandler, \ JupyterNotebookRecipe diff --git a/tests/test_validation.py b/tests/test_validation.py index 602cf83..b2cd0d7 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -9,12 +9,13 @@ from typing import Any, Union from core.correctness.validation import check_type, check_implementation, \ valid_string, valid_dict, valid_list, valid_existing_file_path, \ valid_dir_path, valid_non_existing_path, valid_event, valid_job, \ - setup_debugging, valid_watchdog_event, check_callable + valid_watchdog_event, check_callable from core.correctness.vars import VALID_NAME_CHARS, SHA256, EVENT_TYPE, \ EVENT_PATH, JOB_TYPE, JOB_EVENT, JOB_ID, JOB_PATTERN, JOB_RECIPE, \ JOB_RULE, JOB_STATUS, JOB_CREATE_TIME, EVENT_RULE, WATCHDOG_BASE, \ WATCHDOG_HASH -from core.functionality import make_dir +from functionality.debug import setup_debugging +from functionality.file_io import make_dir from shared import setup, teardown, TEST_MONITOR_BASE class CorrectnessTests(unittest.TestCase):