From 636d70f4e885a11c995170da7f59175a369f3b54 Mon Sep 17 00:00:00 2001 From: PatchOfScotland Date: Wed, 1 Feb 2023 17:43:16 +0100 Subject: [PATCH] added python handler, and reworked handler and conductor event/job discovery to be more modular --- conductors/local_python_conductor.py | 18 +- core/correctness/validation.py | 7 +- core/correctness/vars.py | 8 +- core/functionality.py | 19 +- core/meow.py | 17 +- core/runner.py | 129 ++++++-------- patterns/file_event_pattern.py | 17 +- recipes/__init__.py | 1 + recipes/jupyter_notebook_recipe.py | 42 +++-- recipes/python_recipe.py | 258 +++++++++++++++++++++++++++ tests/test_conductors.py | 38 ++-- tests/test_functionality.py | 41 +++-- tests/test_meow.py | 4 +- tests/test_patterns.py | 50 +++--- tests/test_recipes.py | 43 +++-- tests/test_runner.py | 86 +++------ tests/test_validation.py | 18 +- 17 files changed, 537 insertions(+), 259 deletions(-) create mode 100644 recipes/python_recipe.py diff --git a/conductors/local_python_conductor.py b/conductors/local_python_conductor.py index 6b2ff9d..81312c9 100644 --- a/conductors/local_python_conductor.py +++ b/conductors/local_python_conductor.py @@ -11,20 +11,30 @@ import shutil from datetime import datetime from typing import Any -from core.correctness.vars import PYTHON_TYPE, PYTHON_FUNC, JOB_STATUS, \ +from core.correctness.vars import JOB_TYPE_PYTHON, PYTHON_FUNC, JOB_STATUS, \ STATUS_RUNNING, JOB_START_TIME, PYTHON_EXECUTION_BASE, JOB_ID, META_FILE, \ - STATUS_DONE, JOB_END_TIME, STATUS_FAILED, JOB_ERROR, PYTHON_OUTPUT_DIR + STATUS_DONE, JOB_END_TIME, STATUS_FAILED, JOB_ERROR, PYTHON_OUTPUT_DIR, \ + JOB_TYPE from core.correctness.validation import valid_job from core.functionality import read_yaml, write_yaml from core.meow import BaseConductor +# TODO add comments to me class LocalPythonConductor(BaseConductor): def __init__(self)->None: super().__init__() - def valid_job_types(self)->list[str]: - return [PYTHON_TYPE] + def valid_execute_criteria(self, job:dict[str,Any])->bool: + """Function to determine given an job defintion, if this conductor can + process it or not. This conductor will accept any Python job type""" + try: + valid_job(job) + if job[JOB_TYPE] == JOB_TYPE_PYTHON: + return True + except: + pass + return False def execute(self, job:dict[str,Any])->None: valid_job(job) diff --git a/core/correctness/validation.py b/core/correctness/validation.py index 7eefb2e..50b83c3 100644 --- a/core/correctness/validation.py +++ b/core/correctness/validation.py @@ -12,12 +12,15 @@ from typing import Any, _SpecialForm, Union, Tuple, get_origin, get_args from core.correctness.vars import VALID_PATH_CHARS, get_not_imp_msg, \ EVENT_TYPE, EVENT_PATH, JOB_EVENT, JOB_TYPE, JOB_ID, JOB_PATTERN, \ - JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME + JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME, EVENT_RULE # Required keys in event dict EVENT_KEYS = { EVENT_TYPE: str, - EVENT_PATH: str + EVENT_PATH: str, + # TODO sort this + # Should be a Rule but can't import here due to circular dependencies + EVENT_RULE: Any } # Required keys in job dict diff --git a/core/correctness/vars.py b/core/correctness/vars.py index 7956eff..61a0fd2 100644 --- a/core/correctness/vars.py +++ b/core/correctness/vars.py @@ -41,9 +41,11 @@ SHA256 = "sha256" # meow events EVENT_TYPE = "event_type" EVENT_PATH = "event_path" -WATCHDOG_TYPE = "watchdog" +EVENT_RULE = "rule" + +# watchdog events +EVENT_TYPE_WATCHDOG = "watchdog" WATCHDOG_BASE = "monitor_base" -WATCHDOG_RULE = "rule_name" WATCHDOG_HASH = "file_hash" # inotify events @@ -77,7 +79,7 @@ DIR_EVENTS = [ # meow jobs JOB_TYPE = "job_type" -PYTHON_TYPE = "python" +JOB_TYPE_PYTHON = "python" PYTHON_FUNC = "func" PYTHON_EXECUTION_BASE = "exection_base" PYTHON_OUTPUT_DIR = "output_dir" diff --git a/core/functionality.py b/core/functionality.py index 19963d4..33b3e86 100644 --- a/core/functionality.py +++ b/core/functionality.py @@ -19,7 +19,7 @@ from core.correctness.validation import check_type, valid_existing_file_path, \ from core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \ VALID_CHANNELS, HASH_BUFFER_SIZE, SHA256, DEBUG_WARNING, DEBUG_INFO, \ EVENT_TYPE, EVENT_PATH, JOB_EVENT, JOB_TYPE, JOB_ID, JOB_PATTERN, \ - JOB_RECIPE, JOB_RULE, WATCHDOG_RULE, JOB_STATUS, STATUS_QUEUED, \ + JOB_RECIPE, JOB_RULE, EVENT_RULE, JOB_STATUS, STATUS_QUEUED, \ JOB_CREATE_TIME, JOB_REQUIREMENTS # mig trigger keyword replacements @@ -283,9 +283,14 @@ def replace_keywords(old_dict:dict[str,str], job_id:str, src_path:str, return new_dict -def create_event(event_type:str, path:str, source:dict[Any,Any]={} +def create_event(event_type:str, path:str, rule:Any, source:dict[Any,Any]={} )->dict[Any,Any]: - return {**source, EVENT_PATH: path, EVENT_TYPE: event_type} + return { + **source, + EVENT_PATH: path, + EVENT_TYPE: event_type, + EVENT_RULE: rule + } def create_job(job_type:str, event:dict[str,Any], source:dict[Any,Any]={} )->dict[Any,Any]: @@ -294,12 +299,12 @@ def create_job(job_type:str, event:dict[str,Any], source:dict[Any,Any]={} JOB_ID: generate_id(prefix="job_"), JOB_EVENT: event, JOB_TYPE: job_type, - JOB_PATTERN: event[WATCHDOG_RULE].pattern.name, - JOB_RECIPE: event[WATCHDOG_RULE].recipe.name, - JOB_RULE: event[WATCHDOG_RULE].name, + JOB_PATTERN: event[EVENT_RULE].pattern.name, + JOB_RECIPE: event[EVENT_RULE].recipe.name, + JOB_RULE: event[EVENT_RULE].name, JOB_STATUS: STATUS_QUEUED, JOB_CREATE_TIME: datetime.now(), - JOB_REQUIREMENTS: event[WATCHDOG_RULE].recipe.requirements + JOB_REQUIREMENTS: event[EVENT_RULE].recipe.requirements } return {**source, **job_dict} diff --git a/core/meow.py b/core/meow.py index 3f2edd1..f4a80b4 100644 --- a/core/meow.py +++ b/core/meow.py @@ -313,7 +313,7 @@ class BaseHandler: """BaseHandler Constructor. This will check that any class inheriting from it implements its validation functions.""" check_implementation(type(self).handle, BaseHandler) - check_implementation(type(self).valid_event_types, BaseHandler) + check_implementation(type(self).valid_handle_criteria, BaseHandler) def __new__(cls, *args, **kwargs): """A check that this base class is not instantiated itself, only @@ -323,9 +323,10 @@ class BaseHandler: raise TypeError(msg) return object.__new__(cls) - def valid_event_types(self)->list[str]: - """Function to provide a list of the types of events this handler can - process. Must be implemented by any child process.""" + # TODO also implement something like me from conductor + def valid_handle_criteria(self, event:dict[str,Any])->bool: + """Function to determine given an event defintion, if this handler can + process it or not. Must be implemented by any child process.""" pass def handle(self, event:dict[str,Any])->None: @@ -339,7 +340,7 @@ class BaseConductor: """BaseConductor Constructor. This will check that any class inheriting from it implements its validation functions.""" check_implementation(type(self).execute, BaseConductor) - check_implementation(type(self).valid_job_types, BaseConductor) + check_implementation(type(self).valid_execute_criteria, BaseConductor) def __new__(cls, *args, **kwargs): """A check that this base class is not instantiated itself, only @@ -349,9 +350,9 @@ class BaseConductor: raise TypeError(msg) return object.__new__(cls) - def valid_job_types(self)->list[str]: - """Function to provide a list of the types of jobs this conductor can - process. Must be implemented by any child process.""" + def valid_execute_criteria(self, job:dict[str,Any])->bool: + """Function to determine given an job defintion, if this conductor can + process it or not. Must be implemented by any child process.""" pass def execute(self, job:dict[str,Any])->None: diff --git a/core/runner.py b/core/runner.py index c464843..ed4a2c3 100644 --- a/core/runner.py +++ b/core/runner.py @@ -27,9 +27,9 @@ class MeowRunner: # A collection of all monitors in the runner monitors:list[BaseMonitor] # A collection of all handlers in the runner - handlers:dict[str:BaseHandler] + handlers:list[BaseHandler] # A collection of all conductors in the runner - conductors:dict[str:BaseConductor] + conductors:list[BaseConductor] # A collection of all channels from each monitor from_monitors: list[VALID_CHANNELS] # A collection of all channels from each handler @@ -46,48 +46,19 @@ class MeowRunner: # If conductors isn't a list, make it one if not type(conductors) == list: conductors = [conductors] - self.conductors = {} - # Create a dictionary of conductors, keyed by job type, and valued by a - # list of conductors for that job type - for conductor in conductors: - conductor_jobs = conductor.valid_job_types() - if not conductor_jobs: - raise ValueError( - "Cannot start runner with conductor that does not " - f"implement '{BaseConductor.valid_job_types.__name__}" - f"({signature(BaseConductor.valid_job_types)})' and " - "return a list of at least one conductable job.") - for job in conductor_jobs: - if job in self.conductors.keys(): - self.conductors[job].append(conductor) - else: - self.conductors[job] = [conductor] + self.conductors = conductors self._is_valid_handlers(handlers) # If handlers isn't a list, make it one if not type(handlers) == list: handlers = [handlers] - self.handlers = {} self.from_handlers = [] - # Create a dictionary of handlers, keyed by event type, and valued by a - # list of handlers for that event type - for handler in handlers: - handler_events = handler.valid_event_types() - if not handler_events: - raise ValueError( - "Cannot start runner with handler that does not " - f"implement '{BaseHandler.valid_event_types.__name__}" - f"({signature(BaseHandler.valid_event_types)})' and " - "return a list of at least one handlable event.") - for event in handler_events: - if event in self.handlers.keys(): - self.handlers[event].append(handler) - else: - self.handlers[event] = [handler] + for handler in handlers: # Create a channel from the handler back to this runner handler_to_runner_reader, handler_to_runner_writer = Pipe() handler.to_runner = handler_to_runner_writer self.from_handlers.append(handler_to_runner_reader) + self.handlers = handlers self._is_valid_monitors(monitors) # If monitors isn't a list, make it one @@ -129,21 +100,30 @@ class MeowRunner: # Read event from the monitor channel message = from_monitor.recv() event = message - # Abort if we don't have a relevent handler. - if not self.handlers[event[EVENT_TYPE]]: - print_debug(self._print_target, self.debug_level, - "Could not process event as no relevent " - f"handler for '{event[EVENT_TYPE]}'", - DEBUG_INFO) - continue + + valid_handlers = [] + for handler in self.handlers: + try: + valid = handler.valid_handle_criteria(event) + if valid: + valid_handlers.append(handler) + except Exception as e: + print_debug( + self._print_target, + self.debug_level, + "Could not determine validity of event " + f"for handler. {e}", + DEBUG_INFO + ) + # If we've only one handler, use that - if len(self.handlers[event[EVENT_TYPE]]) == 1: - handler = self.handlers[event[EVENT_TYPE]][0] + if len(valid_handlers) == 1: + handler = valid_handlers[0] self.handle_event(handler, event) # If multiple handlers then randomly pick one else: - handler = self.handlers[event[EVENT_TYPE]][ - randrange(len(self.handlers[event[EVENT_TYPE]])) + handler = valid_handlers[ + randrange(len(valid_handlers)) ] self.handle_event(handler, event) @@ -172,20 +152,29 @@ class MeowRunner: f"job at '{job_dir}'. {e}", DEBUG_INFO) continue - # Abort if we don't have a relevent conductor. - if not self.conductors[job[JOB_TYPE]]: - print_debug(self._print_target, self.debug_level, - "Could not process job as no relevent " - f"conductor for '{job[JOB_TYPE]}'", DEBUG_INFO) - continue + valid_conductors = [] + for conductor in self.conductors: + try: + valid = conductor.valid_execute_criteria(job) + if valid: + valid_conductors.append(conductor) + except Exception as e: + print_debug( + self._print_target, + self.debug_level, + "Could not determine validity of job " + f"for conductor. {e}", + DEBUG_INFO + ) + # If we've only one conductor, use that - if len(self.conductors[job[JOB_TYPE]]) == 1: - conductor = self.conductors[job[JOB_TYPE]][0] + if len(valid_conductors) == 1: + conductor = valid_conductors[0] self.execute_job(conductor, job) - # If multiple conductors then randomly pick one + # If multiple handlers then randomly pick one else: - conductor = self.conductors[job[JOB_TYPE]][ - randrange(len(self.conductors[job[JOB_TYPE]])) + conductor = valid_conductors[ + randrange(len(valid_conductors)) ] self.execute_job(conductor, job) @@ -227,15 +216,13 @@ class MeowRunner: monitor.start() startable = [] # Start all handlers, if they need it - for handler_list in self.handlers.values(): - for handler in handler_list: - if hasattr(handler, "start") and handler not in startable: - startable.append() + for handler in self.handlers: + if hasattr(handler, "start") and handler not in startable: + startable.append() # Start all conductors, if they need it - for conductor_list in self.conductors.values(): - for conductor in conductor_list: - if hasattr(conductor, "start") and conductor not in startable: - startable.append() + for conductor in self.conductors: + if hasattr(conductor, "start") and conductor not in startable: + startable.append() for starting in startable: starting.start() @@ -283,15 +270,13 @@ class MeowRunner: stopable = [] # Stop all handlers, if they need it - for handler_list in self.handlers.values(): - for handler in handler_list: - if hasattr(handler, "stop") and handler not in stopable: - stopable.append() + for handler in self.handlers: + if hasattr(handler, "stop") and handler not in stopable: + stopable.append() # Stop all conductors, if they need it - for conductor_list in self.conductors.values(): - for conductor in conductor_list: - if hasattr(conductor, "stop") and conductor not in stopable: - stopable.append() + for conductor in self.conductors: + if hasattr(conductor, "stop") and conductor not in stopable: + stopable.append() for stopping in stopable: stopping.stop() diff --git a/patterns/file_event_pattern.py b/patterns/file_event_pattern.py index f338768..2136458 100644 --- a/patterns/file_event_pattern.py +++ b/patterns/file_event_pattern.py @@ -23,8 +23,8 @@ from core.correctness.validation import check_type, valid_string, \ setup_debugging from core.correctness.vars import VALID_RECIPE_NAME_CHARS, \ VALID_VARIABLE_NAME_CHARS, FILE_EVENTS, FILE_CREATE_EVENT, \ - FILE_MODIFY_EVENT, FILE_MOVED_EVENT, DEBUG_INFO, WATCHDOG_TYPE, \ - WATCHDOG_RULE, WATCHDOG_BASE, FILE_RETROACTIVE_EVENT, WATCHDOG_HASH, SHA256 + FILE_MODIFY_EVENT, FILE_MOVED_EVENT, DEBUG_INFO, EVENT_TYPE_WATCHDOG, \ + WATCHDOG_BASE, FILE_RETROACTIVE_EVENT, WATCHDOG_HASH, SHA256 from core.functionality import print_debug, create_event, get_file_hash from core.meow import BasePattern, BaseMonitor, BaseRule, BaseRecipe, \ create_rule @@ -234,14 +234,14 @@ class WatchdogMonitor(BaseMonitor): recursive_hit = match(recursive_regexp, handle_path) direct_hit = match(direct_regexp, handle_path) - # If matched, thte create a watchdog event + # If matched, the create a watchdog event if direct_hit or recursive_hit: meow_event = create_event( - WATCHDOG_TYPE, - event.src_path, + EVENT_TYPE_WATCHDOG, + event.src_path, + rule, { WATCHDOG_BASE: self.base_dir, - WATCHDOG_RULE: rule, WATCHDOG_HASH: get_file_hash( event.src_path, SHA256 @@ -535,9 +535,10 @@ class WatchdogMonitor(BaseMonitor): for globble in globbed: meow_event = create_event( - WATCHDOG_TYPE, + EVENT_TYPE_WATCHDOG, globble, - { WATCHDOG_BASE: self.base_dir, WATCHDOG_RULE: rule } + rule, + { WATCHDOG_BASE: self.base_dir } ) print_debug(self._print_target, self.debug_level, f"Retroactive event for file at at {globble} hit rule " diff --git a/recipes/__init__.py b/recipes/__init__.py index abae3c0..9f07fe8 100644 --- a/recipes/__init__.py +++ b/recipes/__init__.py @@ -1,3 +1,4 @@ from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe, \ PapermillHandler +from recipes.python_recipe import PythonRecipe, PythonHandler \ No newline at end of file diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index 4cf8c5e..6d7ff11 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -1,7 +1,7 @@ """ -This file contains definitions for a MEOW recipe based off of jupyter notebooks, -along with an appropriate handler for said events. +This file contains definitions for a MEOW recipe based off of jupyter +notebooks, along with an appropriate handler for said events. Author(s): David Marchant """ @@ -13,12 +13,13 @@ import sys from typing import Any from core.correctness.validation import check_type, valid_string, \ - valid_dict, valid_path, valid_existing_dir_path, setup_debugging + valid_dict, valid_path, valid_existing_dir_path, setup_debugging, \ + valid_event from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, PYTHON_FUNC, \ - DEBUG_INFO, WATCHDOG_TYPE, JOB_HASH, PYTHON_EXECUTION_BASE, \ - WATCHDOG_RULE, EVENT_PATH, PYTHON_TYPE, WATCHDOG_HASH, JOB_PARAMETERS, \ + DEBUG_INFO, EVENT_TYPE_WATCHDOG, JOB_HASH, PYTHON_EXECUTION_BASE, \ + EVENT_PATH, JOB_TYPE_PYTHON, WATCHDOG_HASH, JOB_PARAMETERS, \ PYTHON_OUTPUT_DIR, JOB_ID, WATCHDOG_BASE, META_FILE, BASE_FILE, \ - PARAMS_FILE, JOB_STATUS, STATUS_QUEUED + PARAMS_FILE, JOB_STATUS, STATUS_QUEUED, EVENT_RULE, EVENT_TYPE, EVENT_RULE from core.functionality import print_debug, create_job, replace_keywords, \ make_dir, write_yaml, write_notebook from core.meow import BaseRecipe, BaseHandler @@ -91,7 +92,7 @@ class PapermillHandler(BaseHandler): print_debug(self._print_target, self.debug_level, f"Handling event {event[EVENT_PATH]}", DEBUG_INFO) - rule = event[WATCHDOG_RULE] + rule = event[EVENT_RULE] # Assemble job parameters dict from pattern variables yaml_dict = {} @@ -122,10 +123,19 @@ class PapermillHandler(BaseHandler): yaml_dict[value[0]] = value[1] self.setup_job(event, yaml_dict) - def valid_event_types(self)->list[str]: - """Function to provide a list of the types of events this handler can - process.""" - return [WATCHDOG_TYPE] + def valid_handle_criteria(self, event:dict[str,Any])->bool: + """Function to determine given an event defintion, if this handler can + process it or not. This handler accepts events from watchdog with + jupyter notebook recipes.""" + try: + valid_event(event) + if type(event[EVENT_RULE].recipe) == JupyterNotebookRecipe \ + and event[EVENT_TYPE] == EVENT_TYPE_WATCHDOG: + return True + except: + pass + return False + def _is_valid_handler_base(self, handler_base)->None: """Validation check for 'handler_base' variable from main @@ -141,7 +151,7 @@ class PapermillHandler(BaseHandler): """Function to set up new job dict and send it to the runner to be executed.""" meow_job = create_job( - PYTHON_TYPE, + JOB_TYPE_PYTHON, event, { JOB_PARAMETERS:yaml_dict, @@ -153,7 +163,7 @@ class PapermillHandler(BaseHandler): ) print_debug(self._print_target, self.debug_level, f"Creating job from event at {event[EVENT_PATH]} of type " - f"{PYTHON_TYPE}.", DEBUG_INFO) + f"{JOB_TYPE_PYTHON}.", DEBUG_INFO) # replace MEOW keyworks within variables dict yaml_dict = replace_keywords( @@ -174,7 +184,7 @@ class PapermillHandler(BaseHandler): # write an executable notebook to the job directory base_file = os.path.join(job_dir, BASE_FILE) - write_notebook(event[WATCHDOG_RULE].recipe.recipe, base_file) + write_notebook(event[EVENT_RULE].recipe.recipe, base_file) # write a parameter file to the job directory param_file = os.path.join(job_dir, PARAMS_FILE) @@ -196,7 +206,7 @@ def job_func(job): from datetime import datetime from core.functionality import write_yaml, read_yaml, write_notebook, \ get_file_hash, parameterize_jupyter_notebook - from core.correctness.vars import JOB_EVENT, WATCHDOG_RULE, JOB_ID, \ + from core.correctness.vars import JOB_EVENT, EVENT_RULE, JOB_ID, \ EVENT_PATH, META_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE, \ JOB_STATUS, JOB_HASH, SHA256, STATUS_SKIPPED, JOB_END_TIME, \ JOB_ERROR, STATUS_FAILED, PYTHON_EXECUTION_BASE @@ -235,7 +245,7 @@ def job_func(job): # Create a parameterised version of the executable notebook try: job_notebook = parameterize_jupyter_notebook( - event[WATCHDOG_RULE].recipe.recipe, yaml_dict + event[EVENT_RULE].recipe.recipe, yaml_dict ) write_notebook(job_notebook, job_file) except Exception as e: diff --git a/recipes/python_recipe.py b/recipes/python_recipe.py new file mode 100644 index 0000000..df71cb4 --- /dev/null +++ b/recipes/python_recipe.py @@ -0,0 +1,258 @@ + +""" +This file contains definitions for a MEOW recipe based off of python code, +along with an appropriate handler for said events. + +Author(s): David Marchant +""" +import os +import itertools +import nbformat +import sys + +from typing import Any + +from core.correctness.validation import check_type, valid_string, \ + valid_dict, valid_event, valid_existing_dir_path, setup_debugging +from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, PYTHON_FUNC, \ + DEBUG_INFO, EVENT_TYPE_WATCHDOG, JOB_HASH, PYTHON_EXECUTION_BASE, \ + EVENT_RULE, EVENT_PATH, JOB_TYPE_PYTHON, WATCHDOG_HASH, JOB_PARAMETERS, \ + PYTHON_OUTPUT_DIR, JOB_ID, WATCHDOG_BASE, META_FILE, BASE_FILE, \ + PARAMS_FILE, JOB_STATUS, STATUS_QUEUED, EVENT_TYPE, EVENT_RULE +from core.functionality import print_debug, create_job, replace_keywords, \ + make_dir, write_yaml, write_notebook +from core.meow import BaseRecipe, BaseHandler +from patterns.file_event_pattern import SWEEP_START, SWEEP_STOP, SWEEP_JUMP + + +class PythonRecipe(BaseRecipe): + def __init__(self, name:str, recipe:Any, parameters:dict[str,Any]={}, + requirements:dict[str,Any]={}): + """PythonRecipe Constructor. This is used to execute python analysis + code.""" + super().__init__(name, recipe, parameters, requirements) + + def _is_valid_recipe(self, recipe:dict[str,Any])->None: + """Validation check for 'recipe' variable from main constructor. + Called within parent BaseRecipe constructor.""" + check_type(recipe, dict) + nbformat.validate(recipe) + + def _is_valid_parameters(self, parameters:dict[str,Any])->None: + """Validation check for 'parameters' variable from main constructor. + Called within parent BaseRecipe constructor.""" + valid_dict(parameters, str, Any, strict=False, min_length=0) + for k in parameters.keys(): + valid_string(k, VALID_VARIABLE_NAME_CHARS) + + def _is_valid_requirements(self, requirements:dict[str,Any])->None: + """Validation check for 'requirements' variable from main constructor. + Called within parent BaseRecipe constructor.""" + valid_dict(requirements, str, Any, strict=False, min_length=0) + for k in requirements.keys(): + valid_string(k, VALID_VARIABLE_NAME_CHARS) + +class PythonHandler(BaseHandler): + # TODO move me to base handler + # handler directory to setup jobs in + handler_base:str + # TODO move me to conductor? + # Final location for job output to be placed + output_dir:str + # Config option, above which debug messages are ignored + debug_level:int + # Where print messages are sent + _print_target:Any + def __init__(self, handler_base:str, output_dir:str, print:Any=sys.stdout, + logging:int=0)->None: + """PythonHandler Constructor. This creates jobs to be executed as + python functions. This does not run as a continuous thread to + handle execution, but is invoked according to a factory pattern using + the handle function.""" + super().__init__() + self._is_valid_handler_base(handler_base) + self.handler_base = handler_base + self._is_valid_output_dir(output_dir) + self.output_dir = output_dir + self._print_target, self.debug_level = setup_debugging(print, logging) + print_debug(self._print_target, self.debug_level, + "Created new PythonHandler instance", DEBUG_INFO) + + def handle(self, event:dict[str,Any])->None: + """Function called to handle a given event.""" + print_debug(self._print_target, self.debug_level, + f"Handling event {event[EVENT_PATH]}", DEBUG_INFO) + + rule = event[EVENT_RULE] + + # Assemble job parameters dict from pattern variables + yaml_dict = {} + for var, val in rule.pattern.parameters.items(): + yaml_dict[var] = val + for var, val in rule.pattern.outputs.items(): + yaml_dict[var] = val + yaml_dict[rule.pattern.triggering_file] = event[EVENT_PATH] + + # If no parameter sweeps, then one job will suffice + if not rule.pattern.sweep: + self.setup_job(event, yaml_dict) + else: + # If parameter sweeps, then many jobs created + values_dict = {} + for var, val in rule.pattern.sweep.items(): + values_dict[var] = [] + par_val = val[SWEEP_START] + while par_val <= val[SWEEP_STOP]: + values_dict[var].append((var, par_val)) + par_val += val[SWEEP_JUMP] + + # combine all combinations of sweep values + values_list = list(itertools.product( + *[v for v in values_dict.values()])) + for values in values_list: + for value in values: + yaml_dict[value[0]] = value[1] + self.setup_job(event, yaml_dict) + + def valid_handle_criteria(self, event:dict[str,Any])->bool: + """Function to determine given an event defintion, if this handler can + process it or not. This handler accepts events from watchdog with + Python recipes""" + try: + valid_event(event) + if event[EVENT_TYPE] == EVENT_TYPE_WATCHDOG \ + and type(event[EVENT_RULE].recipe) == PythonRecipe: + return True + except: + pass + return False + + def _is_valid_handler_base(self, handler_base)->None: + """Validation check for 'handler_base' variable from main + constructor.""" + valid_existing_dir_path(handler_base) + + def _is_valid_output_dir(self, output_dir)->None: + """Validation check for 'output_dir' variable from main + constructor.""" + valid_existing_dir_path(output_dir, allow_base=True) + + def setup_job(self, event:dict[str,Any], yaml_dict:dict[str,Any])->None: + """Function to set up new job dict and send it to the runner to be + executed.""" + meow_job = create_job( + JOB_TYPE_PYTHON, + event, + { + JOB_PARAMETERS:yaml_dict, + JOB_HASH: event[WATCHDOG_HASH], + PYTHON_FUNC:job_func, + PYTHON_OUTPUT_DIR:self.output_dir, + PYTHON_EXECUTION_BASE:self.handler_base + } + ) + print_debug(self._print_target, self.debug_level, + f"Creating job from event at {event[EVENT_PATH]} of type " + f"{JOB_TYPE_PYTHON}.", DEBUG_INFO) + + # replace MEOW keyworks within variables dict + yaml_dict = replace_keywords( + meow_job[JOB_PARAMETERS], + meow_job[JOB_ID], + event[EVENT_PATH], + event[WATCHDOG_BASE] + ) + + # Create a base job directory + job_dir = os.path.join( + meow_job[PYTHON_EXECUTION_BASE], meow_job[JOB_ID]) + make_dir(job_dir) + + # write a status file to the job directory + meta_file = os.path.join(job_dir, META_FILE) + write_yaml(meow_job, meta_file) + + # write an executable notebook to the job directory + base_file = os.path.join(job_dir, BASE_FILE) + write_notebook(event[EVENT_RULE].recipe.recipe, base_file) + + # write a parameter file to the job directory + param_file = os.path.join(job_dir, PARAMS_FILE) + write_yaml(yaml_dict, param_file) + + meow_job[JOB_STATUS] = STATUS_QUEUED + + # update the status file with queued status + write_yaml(meow_job, meta_file) + + # Send job directory, as actual definitons will be read from within it + self.to_runner.send(job_dir) + +# Papermill job execution code, to be run within the conductor +def job_func(job): + # Requires own imports as will be run in its own execution environment + import os + import papermill + from datetime import datetime + from core.functionality import write_yaml, read_yaml, write_notebook, \ + get_file_hash, parameterize_jupyter_notebook + from core.correctness.vars import JOB_EVENT, EVENT_RULE, JOB_ID, \ + EVENT_PATH, META_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE, \ + JOB_STATUS, JOB_HASH, SHA256, STATUS_SKIPPED, JOB_END_TIME, \ + JOB_ERROR, STATUS_FAILED, PYTHON_EXECUTION_BASE + + event = job[JOB_EVENT] + + # Identify job files + job_dir = os.path.join(job[PYTHON_EXECUTION_BASE], job[JOB_ID]) + meta_file = os.path.join(job_dir, META_FILE) + job_file = os.path.join(job_dir, JOB_FILE) + result_file = os.path.join(job_dir, RESULT_FILE) + param_file = os.path.join(job_dir, PARAMS_FILE) + + yaml_dict = read_yaml(param_file) + + # Check the hash of the triggering file, if present. This addresses + # potential race condition as file could have been modified since + # triggering event + if JOB_HASH in job: + # get current hash + triggerfile_hash = get_file_hash(job[JOB_EVENT][EVENT_PATH], SHA256) + # If hash doesn't match, then abort the job. If its been modified, then + # another job will have been scheduled anyway. + if not triggerfile_hash \ + or triggerfile_hash != job[JOB_HASH]: + job[JOB_STATUS] = STATUS_SKIPPED + job[JOB_END_TIME] = datetime.now() + msg = "Job was skipped as triggering file " + \ + f"'{job[JOB_EVENT][EVENT_PATH]}' has been modified since " + \ + "scheduling. Was expected to have hash " + \ + f"'{job[JOB_HASH]}' but has '{triggerfile_hash}'." + job[JOB_ERROR] = msg + write_yaml(job, meta_file) + return + + # Create a parameterised version of the executable notebook + try: + job_notebook = parameterize_jupyter_notebook( + event[EVENT_RULE].recipe.recipe, yaml_dict + ) + write_notebook(job_notebook, job_file) + except Exception as e: + job[JOB_STATUS] = STATUS_FAILED + job[JOB_END_TIME] = datetime.now() + msg = f"Job file {job[JOB_ID]} was not created successfully. {e}" + job[JOB_ERROR] = msg + write_yaml(job, meta_file) + return + + # Execute the parameterised notebook + try: + papermill.execute_notebook(job_file, result_file, {}) + except Exception as e: + job[JOB_STATUS] = STATUS_FAILED + job[JOB_END_TIME] = datetime.now() + msg = f"Result file {result_file} was not created successfully. {e}" + job[JOB_ERROR] = msg + write_yaml(job, meta_file) + return diff --git a/tests/test_conductors.py b/tests/test_conductors.py index 55c5989..8e2ac2a 100644 --- a/tests/test_conductors.py +++ b/tests/test_conductors.py @@ -2,8 +2,8 @@ import os import unittest -from core.correctness.vars import PYTHON_TYPE, SHA256, WATCHDOG_TYPE, \ - WATCHDOG_BASE, WATCHDOG_RULE, WATCHDOG_HASH, JOB_PARAMETERS, JOB_HASH, \ +from core.correctness.vars import JOB_TYPE_PYTHON, SHA256, EVENT_TYPE_WATCHDOG, \ + WATCHDOG_BASE, EVENT_RULE, WATCHDOG_HASH, JOB_PARAMETERS, JOB_HASH, \ PYTHON_FUNC, PYTHON_OUTPUT_DIR, PYTHON_EXECUTION_BASE, JOB_ID, META_FILE, \ BASE_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE from core.functionality import get_file_hash, create_event, create_job, \ @@ -31,11 +31,9 @@ class MeowTests(unittest.TestCase): # Test LocalPythonConductor creation and job types def testLocalPythonConductorCreation(self)->None: - lpc = LocalPythonConductor() + LocalPythonConductor() - valid_jobs = lpc.valid_job_types() - - self.assertEqual(valid_jobs, [PYTHON_TYPE]) + #TODO Test LocalPythonConductor execution criteria # Test LocalPythonConductor executes valid jobs def testLocalPythonConductorValidJob(self)->None: @@ -70,13 +68,14 @@ class MeowTests(unittest.TestCase): } job_dict = create_job( - PYTHON_TYPE, + JOB_TYPE_PYTHON, create_event( - WATCHDOG_TYPE, + EVENT_TYPE_WATCHDOG, file_path, + rule, { WATCHDOG_BASE: TEST_MONITOR_BASE, - WATCHDOG_RULE: rule, + EVENT_RULE: rule, WATCHDOG_HASH: file_hash } ), @@ -146,13 +145,14 @@ class MeowTests(unittest.TestCase): } bad_job_dict = create_job( - PYTHON_TYPE, + JOB_TYPE_PYTHON, create_event( - WATCHDOG_TYPE, + EVENT_TYPE_WATCHDOG, file_path, + rule, { WATCHDOG_BASE: TEST_MONITOR_BASE, - WATCHDOG_RULE: rule, + EVENT_RULE: rule, WATCHDOG_HASH: file_hash } ), @@ -177,13 +177,14 @@ class MeowTests(unittest.TestCase): # Ensure execution can continue after one failed job good_job_dict = create_job( - PYTHON_TYPE, + JOB_TYPE_PYTHON, create_event( - WATCHDOG_TYPE, + EVENT_TYPE_WATCHDOG, file_path, + rule, { WATCHDOG_BASE: TEST_MONITOR_BASE, - WATCHDOG_RULE: rule, + EVENT_RULE: rule, WATCHDOG_HASH: file_hash } ), @@ -247,13 +248,14 @@ class MeowTests(unittest.TestCase): rule = create_rule(pattern, recipe) job_dict = create_job( - PYTHON_TYPE, + JOB_TYPE_PYTHON, create_event( - WATCHDOG_TYPE, + EVENT_TYPE_WATCHDOG, file_path, + rule, { WATCHDOG_BASE: TEST_MONITOR_BASE, - WATCHDOG_RULE: rule, + EVENT_RULE: rule, WATCHDOG_HASH: file_hash } ), diff --git a/tests/test_functionality.py b/tests/test_functionality.py index 0e133d6..2b6ffee 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -8,8 +8,8 @@ from multiprocessing import Pipe, Queue from time import sleep from core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \ - SHA256, EVENT_TYPE, EVENT_PATH, WATCHDOG_TYPE, PYTHON_TYPE, \ - WATCHDOG_BASE, WATCHDOG_HASH, WATCHDOG_RULE, JOB_PARAMETERS, JOB_HASH, \ + SHA256, EVENT_TYPE, EVENT_PATH, EVENT_TYPE_WATCHDOG, JOB_TYPE_PYTHON, \ + WATCHDOG_BASE, WATCHDOG_HASH, EVENT_RULE, JOB_PARAMETERS, JOB_HASH, \ PYTHON_FUNC, PYTHON_OUTPUT_DIR, PYTHON_EXECUTION_BASE, JOB_ID, JOB_EVENT, \ JOB_TYPE, JOB_PATTERN, JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME, \ JOB_REQUIREMENTS, STATUS_QUEUED @@ -241,21 +241,41 @@ class CorrectnessTests(unittest.TestCase): # Test that create_event produces valid event dictionary def testCreateEvent(self)->None: - event = create_event("test", "path") + pattern = FileEventPattern( + "pattern", + "file_path", + "recipe_one", + "infile", + parameters={ + "extra":"A line from a test Pattern", + "outfile":"result_path" + }) + recipe = JupyterNotebookRecipe( + "recipe_one", APPENDING_NOTEBOOK) + + rule = create_rule(pattern, recipe) + + event = create_event("test", "path", rule) self.assertEqual(type(event), dict) + self.assertEqual(len(event.keys()), 3) self.assertTrue(EVENT_TYPE in event.keys()) - self.assertEqual(len(event.keys()), 2) + self.assertTrue(EVENT_PATH in event.keys()) + self.assertTrue(EVENT_RULE in event.keys()) self.assertEqual(event[EVENT_TYPE], "test") self.assertEqual(event[EVENT_PATH], "path") + self.assertEqual(event[EVENT_RULE], rule) - event2 = create_event("test2", "path2", {"a":1}) + event2 = create_event("test2", "path2", rule, {"a":1}) self.assertEqual(type(event2), dict) self.assertTrue(EVENT_TYPE in event2.keys()) - self.assertEqual(len(event2.keys()), 3) + self.assertTrue(EVENT_PATH in event.keys()) + self.assertTrue(EVENT_RULE in event.keys()) + self.assertEqual(len(event2.keys()), 4) self.assertEqual(event2[EVENT_TYPE], "test2") self.assertEqual(event2[EVENT_PATH], "path2") + self.assertEqual(event2[EVENT_RULE], rule) self.assertEqual(event2["a"], 1) # Test that create_job produces valid job dictionary @@ -275,17 +295,18 @@ class CorrectnessTests(unittest.TestCase): rule = create_rule(pattern, recipe) event = create_event( - WATCHDOG_TYPE, + EVENT_TYPE_WATCHDOG, "file_path", + rule, { WATCHDOG_BASE: TEST_MONITOR_BASE, - WATCHDOG_RULE: rule, + EVENT_RULE: rule, WATCHDOG_HASH: "file_hash" } ) job_dict = create_job( - PYTHON_TYPE, + JOB_TYPE_PYTHON, event, { JOB_PARAMETERS:{ @@ -306,7 +327,7 @@ class CorrectnessTests(unittest.TestCase): self.assertIn(JOB_EVENT, job_dict) self.assertEqual(job_dict[JOB_EVENT], event) self.assertIn(JOB_TYPE, job_dict) - self.assertEqual(job_dict[JOB_TYPE], PYTHON_TYPE) + self.assertEqual(job_dict[JOB_TYPE], JOB_TYPE_PYTHON) self.assertIn(JOB_PATTERN, job_dict) self.assertEqual(job_dict[JOB_PATTERN], pattern.name) self.assertIn(JOB_RECIPE, job_dict) diff --git a/tests/test_meow.py b/tests/test_meow.py index 5ae809b..54a6663 100644 --- a/tests/test_meow.py +++ b/tests/test_meow.py @@ -198,7 +198,7 @@ class MeowTests(unittest.TestCase): pass def _is_valid_inputs(self, inputs:Any)->None: pass - def valid_event_types(self)->list[str]: + def valid_handle_criteria(self, event:dict[str,Any])->bool: pass FullTestHandler() @@ -218,7 +218,7 @@ class MeowTests(unittest.TestCase): def execute(self, job:dict[str,Any])->None: pass - def valid_job_types(self)->list[str]: + def valid_execute_criteria(self, job:dict[str,Any])->bool: pass FullTestConductor() diff --git a/tests/test_patterns.py b/tests/test_patterns.py index b4b8826..eee2b74 100644 --- a/tests/test_patterns.py +++ b/tests/test_patterns.py @@ -6,7 +6,7 @@ import unittest from multiprocessing import Pipe from core.correctness.vars import FILE_CREATE_EVENT, EVENT_TYPE, \ - WATCHDOG_RULE, WATCHDOG_BASE, WATCHDOG_TYPE, EVENT_PATH + EVENT_RULE, WATCHDOG_BASE, EVENT_TYPE_WATCHDOG, EVENT_PATH from core.functionality import make_dir from patterns.file_event_pattern import FileEventPattern, WatchdogMonitor, \ _DEFAULT_MASK, SWEEP_START, SWEEP_STOP, SWEEP_JUMP @@ -15,24 +15,24 @@ from shared import setup, teardown, BAREBONES_NOTEBOOK, TEST_MONITOR_BASE def patterns_equal(tester, pattern_one, pattern_two): - tester.assertEqual(pattern_one.name, pattern_two.name) - tester.assertEqual(pattern_one.recipe, pattern_two.recipe) - tester.assertEqual(pattern_one.parameters, pattern_two.parameters) - tester.assertEqual(pattern_one.outputs, pattern_two.outputs) - tester.assertEqual(pattern_one.triggering_path, - pattern_two.triggering_path) - tester.assertEqual(pattern_one.triggering_file, - pattern_two.triggering_file) - tester.assertEqual(pattern_one.event_mask, pattern_two.event_mask) - tester.assertEqual(pattern_one.sweep, pattern_two.sweep) + tester.assertEqual(pattern_one.name, pattern_two.name) + tester.assertEqual(pattern_one.recipe, pattern_two.recipe) + tester.assertEqual(pattern_one.parameters, pattern_two.parameters) + tester.assertEqual(pattern_one.outputs, pattern_two.outputs) + tester.assertEqual(pattern_one.triggering_path, + pattern_two.triggering_path) + tester.assertEqual(pattern_one.triggering_file, + pattern_two.triggering_file) + tester.assertEqual(pattern_one.event_mask, pattern_two.event_mask) + tester.assertEqual(pattern_one.sweep, pattern_two.sweep) def recipes_equal(tester, recipe_one, recipe_two): - tester.assertEqual(recipe_one.name, recipe_two.name) - tester.assertEqual(recipe_one.recipe, recipe_two.recipe) - tester.assertEqual(recipe_one.parameters, recipe_two.parameters) - tester.assertEqual(recipe_one.requirements, recipe_two.requirements) - tester.assertEqual(recipe_one.source, recipe_two.source) + tester.assertEqual(recipe_one.name, recipe_two.name) + tester.assertEqual(recipe_one.recipe, recipe_two.recipe) + tester.assertEqual(recipe_one.parameters, recipe_two.parameters) + tester.assertEqual(recipe_one.requirements, recipe_two.requirements) + tester.assertEqual(recipe_one.source, recipe_two.source) class CorrectnessTests(unittest.TestCase): @@ -225,12 +225,12 @@ class CorrectnessTests(unittest.TestCase): self.assertTrue(EVENT_TYPE in event.keys()) self.assertTrue(EVENT_PATH in event.keys()) self.assertTrue(WATCHDOG_BASE in event.keys()) - self.assertTrue(WATCHDOG_RULE in event.keys()) - self.assertEqual(event[EVENT_TYPE], WATCHDOG_TYPE) + self.assertTrue(EVENT_RULE in event.keys()) + self.assertEqual(event[EVENT_TYPE], EVENT_TYPE_WATCHDOG) self.assertEqual(event[EVENT_PATH], os.path.join(TEST_MONITOR_BASE, "A")) self.assertEqual(event[WATCHDOG_BASE], TEST_MONITOR_BASE) - self.assertEqual(event[WATCHDOG_RULE].name, rule.name) + self.assertEqual(event[EVENT_RULE].name, rule.name) open(os.path.join(TEST_MONITOR_BASE, "B"), "w") if from_monitor_reader.poll(3): @@ -289,14 +289,14 @@ class CorrectnessTests(unittest.TestCase): self.assertEqual(type(message), dict) self.assertIn(EVENT_TYPE, message) - self.assertEqual(message[EVENT_TYPE], WATCHDOG_TYPE) + self.assertEqual(message[EVENT_TYPE], EVENT_TYPE_WATCHDOG) self.assertIn(WATCHDOG_BASE, message) self.assertEqual(message[WATCHDOG_BASE], TEST_MONITOR_BASE) self.assertIn(EVENT_PATH, message) self.assertEqual(message[EVENT_PATH], os.path.join(start_dir, "A.txt")) - self.assertIn(WATCHDOG_RULE, message) - self.assertEqual(message[WATCHDOG_RULE].name, rule.name) + self.assertIn(EVENT_RULE, message) + self.assertEqual(message[EVENT_RULE].name, rule.name) wm.stop() @@ -353,14 +353,14 @@ class CorrectnessTests(unittest.TestCase): self.assertEqual(type(message), dict) self.assertIn(EVENT_TYPE, message) - self.assertEqual(message[EVENT_TYPE], WATCHDOG_TYPE) + self.assertEqual(message[EVENT_TYPE], EVENT_TYPE_WATCHDOG) self.assertIn(WATCHDOG_BASE, message) self.assertEqual(message[WATCHDOG_BASE], TEST_MONITOR_BASE) self.assertIn(EVENT_PATH, message) self.assertEqual(message[EVENT_PATH], os.path.join(start_dir, "A.txt")) - self.assertIn(WATCHDOG_RULE, message) - self.assertEqual(message[WATCHDOG_RULE].name, rule.name) + self.assertIn(EVENT_RULE, message) + self.assertEqual(message[EVENT_RULE].name, rule.name) wm.stop() diff --git a/tests/test_recipes.py b/tests/test_recipes.py index 454bdfc..a2d4eb1 100644 --- a/tests/test_recipes.py +++ b/tests/test_recipes.py @@ -5,11 +5,11 @@ import unittest from multiprocessing import Pipe -from core.correctness.vars import EVENT_TYPE, WATCHDOG_BASE, WATCHDOG_RULE, \ - WATCHDOG_TYPE, EVENT_PATH, SHA256, WATCHDOG_HASH, JOB_ID, PYTHON_TYPE, \ - JOB_PARAMETERS, JOB_HASH, PYTHON_FUNC, PYTHON_OUTPUT_DIR, \ - PYTHON_EXECUTION_BASE, META_FILE, BASE_FILE, PARAMS_FILE, JOB_FILE, \ - RESULT_FILE +from core.correctness.vars import EVENT_TYPE, WATCHDOG_BASE, EVENT_RULE, \ + EVENT_TYPE_WATCHDOG, EVENT_PATH, SHA256, WATCHDOG_HASH, JOB_ID, \ + JOB_TYPE_PYTHON, JOB_PARAMETERS, JOB_HASH, PYTHON_FUNC, \ + PYTHON_OUTPUT_DIR, PYTHON_EXECUTION_BASE, META_FILE, BASE_FILE, \ + PARAMS_FILE, JOB_FILE, RESULT_FILE from core.correctness.validation import valid_job from core.functionality import get_file_hash, create_job, create_event, \ make_dir, write_yaml, write_notebook, read_yaml @@ -22,7 +22,7 @@ from rules.file_event_jupyter_notebook_rule import FileEventJupyterNotebookRule from shared import setup, teardown, TEST_HANDLER_BASE, TEST_MONITOR_BASE, \ TEST_JOB_OUTPUT, BAREBONES_NOTEBOOK, APPENDING_NOTEBOOK, COMPLETE_NOTEBOOK -class CorrectnessTests(unittest.TestCase): +class JupyterNotebookTests(unittest.TestCase): def setUp(self)->None: super().setUp() setup() @@ -144,10 +144,10 @@ class CorrectnessTests(unittest.TestCase): self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 0) event = { - EVENT_TYPE: WATCHDOG_TYPE, + EVENT_TYPE: EVENT_TYPE_WATCHDOG, EVENT_PATH: os.path.join(TEST_MONITOR_BASE, "A"), WATCHDOG_BASE: TEST_MONITOR_BASE, - WATCHDOG_RULE: rule, + EVENT_RULE: rule, WATCHDOG_HASH: get_file_hash( os.path.join(TEST_MONITOR_BASE, "A"), SHA256 ) @@ -198,10 +198,10 @@ class CorrectnessTests(unittest.TestCase): self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 0) event = { - EVENT_TYPE: WATCHDOG_TYPE, + EVENT_TYPE: EVENT_TYPE_WATCHDOG, EVENT_PATH: os.path.join(TEST_MONITOR_BASE, "A"), WATCHDOG_BASE: TEST_MONITOR_BASE, - WATCHDOG_RULE: rule, + EVENT_RULE: rule, WATCHDOG_HASH: get_file_hash( os.path.join(TEST_MONITOR_BASE, "A"), SHA256 ) @@ -271,10 +271,10 @@ class CorrectnessTests(unittest.TestCase): self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 0) event = { - EVENT_TYPE: WATCHDOG_TYPE, + EVENT_TYPE: EVENT_TYPE_WATCHDOG, EVENT_PATH: os.path.join(TEST_MONITOR_BASE, "A"), WATCHDOG_BASE: TEST_MONITOR_BASE, - WATCHDOG_RULE: rule, + EVENT_RULE: rule, WATCHDOG_HASH: get_file_hash( os.path.join(TEST_MONITOR_BASE, "A"), SHA256 ) @@ -350,13 +350,14 @@ class CorrectnessTests(unittest.TestCase): } job_dict = create_job( - PYTHON_TYPE, + JOB_TYPE_PYTHON, create_event( - WATCHDOG_TYPE, + EVENT_TYPE_WATCHDOG, file_path, + rule, { WATCHDOG_BASE: TEST_MONITOR_BASE, - WATCHDOG_RULE: rule, + EVENT_RULE: rule, WATCHDOG_HASH: file_hash } ), @@ -403,3 +404,15 @@ class CorrectnessTests(unittest.TestCase): self.assertEqual(len(os.listdir(TEST_HANDLER_BASE)), 0) self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 0) + + #TODO Test handling criteria function + +# TODO implement me +class PythonTests(unittest.TestCase): + def setUp(self)->None: + super().setUp() + setup() + + def tearDown(self)->None: + super().tearDown() + teardown() diff --git a/tests/test_runner.py b/tests/test_runner.py index 29b809e..57b6620 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -58,20 +58,13 @@ class MeowTests(unittest.TestCase): self.assertIsNotNone(message) self.assertEqual(message, "monitor test message") - self.assertIsInstance(runner.handlers, dict) - for handler_list in runner.handlers.values(): - for h in handler_list: - self.assertIsInstance(h, BaseHandler) - self.assertEqual( - len(runner.handlers.keys()), len(handler_one.valid_event_types())) - for event_type in handler_one.valid_event_types(): - self.assertIn(event_type, runner.handlers.keys()) - self.assertEqual(len(runner.handlers[event_type]), 1) - self.assertEqual(runner.handlers[event_type][0], handler_one) + self.assertIsInstance(runner.handlers, list) + for handler in runner.handlers: + self.assertIsInstance(handler, BaseHandler) self.assertIsInstance(runner.from_handlers, list) self.assertEqual(len(runner.from_handlers), 1) - runner.handlers[handler_one.valid_event_types()[0]][0].to_runner.send( + runner.handlers[0].to_runner.send( "handler test message") message = None if runner.from_handlers[0].poll(3): @@ -79,16 +72,9 @@ class MeowTests(unittest.TestCase): self.assertIsNotNone(message) self.assertEqual(message, "handler test message") - self.assertIsInstance(runner.conductors, dict) - for conductor_list in runner.conductors.values(): - for c in conductor_list: - self.assertIsInstance(c, BaseConductor) - self.assertEqual( - len(runner.conductors.keys()), len(conductor_one.valid_job_types())) - for job_type in conductor_one.valid_job_types(): - self.assertIn(job_type, runner.conductors.keys()) - self.assertEqual(len(runner.conductors[job_type]), 1) - self.assertEqual(runner.conductors[job_type][0], conductor_one) + self.assertIsInstance(runner.conductors, list) + for conductor in runner.conductors: + self.assertIsInstance(conductor, BaseConductor) runner = MeowRunner(monitors, handlers, conductors) @@ -111,35 +97,13 @@ class MeowTests(unittest.TestCase): self.assertIsNotNone(m) self.assertEqual(m, "monitor test message") - self.assertIsInstance(runner.handlers, dict) - for handler_list in runner.handlers.values(): - for h in handler_list: - self.assertIsInstance(h, BaseHandler) - all_events = [] - for h in handlers: - for e in h.valid_event_types(): - if e not in all_events: - all_events.append(e) - self.assertEqual(len(runner.handlers.keys()), len(all_events)) - for handler in handlers: - for event_type in handler.valid_event_types(): - relevent_handlers = [h for h in handlers - if event_type in h.valid_event_types()] - self.assertIn(event_type, runner.handlers.keys()) - self.assertEqual(len(runner.handlers[event_type]), - len(relevent_handlers)) - for rh in relevent_handlers: - self.assertIn(rh, runner.handlers[event_type]) + self.assertIsInstance(runner.handlers, list) + for handler in runner.handlers: + self.assertIsInstance(handler, BaseHandler) self.assertIsInstance(runner.from_handlers, list) self.assertEqual(len(runner.from_handlers), len(handlers)) - runner_handlers = [] - for handler_list in runner.handlers.values(): - for h in handler_list: - runner_handlers.append(h) - runner_handlers = [h for h in handler_list for - handler_list in runner.handlers.values()] - for rh in handler_list: + for rh in runner.handlers: rh.to_runner.send("handler test message") message = None if runner.from_handlers[0].poll(3): @@ -147,25 +111,9 @@ class MeowTests(unittest.TestCase): self.assertIsNotNone(message) self.assertEqual(message, "handler test message") - self.assertIsInstance(runner.conductors, dict) - for conductor_list in runner.conductors.values(): - for c in conductor_list: - self.assertIsInstance(c, BaseConductor) - all_jobs = [] - for c in conductors: - for j in c.valid_job_types(): - if j not in all_jobs: - all_jobs.append(j) - self.assertEqual(len(runner.conductors.keys()), len(all_jobs)) - for conductor in conductors: - for job_type in conductor.valid_job_types(): - relevent_conductors = [c for c in conductors - if job_type in c.valid_job_types()] - self.assertIn(job_type, runner.conductors.keys()) - self.assertEqual(len(runner.conductors[job_type]), - len(relevent_conductors)) - for rc in relevent_conductors: - self.assertIn(rc, runner.conductors[job_type]) + self.assertIsInstance(runner.conductors, list) + for conductor in runner.conductors: + self.assertIsInstance(conductor, BaseConductor) # Test single meow job execution def testMeowRunnerExecution(self)->None: @@ -361,3 +309,9 @@ class MeowTests(unittest.TestCase): # TODO adding tests with numpy # TODO test getting job cannot handle # TODO test getting event cannot handle + # TODO test with several matched monitors + # TODO test with several mismatched monitors + # TODO test with several matched handlers + # TODO test with several mismatched handlers + # TODO test with several matched conductors + # TODO test with several mismatched conductors diff --git a/tests/test_validation.py b/tests/test_validation.py index c995dbf..5482ac1 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -12,7 +12,7 @@ from core.correctness.validation import check_type, check_implementation, \ setup_debugging from core.correctness.vars import VALID_NAME_CHARS, SHA256, EVENT_TYPE, \ EVENT_PATH, JOB_TYPE, JOB_EVENT, JOB_ID, JOB_PATTERN, JOB_RECIPE, \ - JOB_RULE, JOB_STATUS, JOB_CREATE_TIME + JOB_RULE, JOB_STATUS, JOB_CREATE_TIME, EVENT_RULE from core.functionality import make_dir from shared import setup, teardown, TEST_MONITOR_BASE @@ -242,10 +242,20 @@ class CorrectnessTests(unittest.TestCase): with self.assertRaises(ValueError): valid_non_existing_path("first/second") + # TODO modify so tests for actual rule values # Test valid_event can check given event dictionary def testEventValidation(self)->None: - valid_event({EVENT_TYPE: "test", EVENT_PATH: "path"}) - valid_event({EVENT_TYPE: "anything", EVENT_PATH: "path", "a": 1}) + valid_event({ + EVENT_TYPE: "test", + EVENT_PATH: "path", + EVENT_RULE: "rule" + }) + valid_event({ + EVENT_TYPE: "anything", + EVENT_PATH: "path", + EVENT_RULE: "rule", + "a": 1 + }) with self.assertRaises(KeyError): valid_event({EVENT_TYPE: "test"}) @@ -292,3 +302,5 @@ class CorrectnessTests(unittest.TestCase): with self.assertRaises(TypeError): setup_debugging(stream, "1") + + #TODO test watchdog event dict