From b95042c5ca1b3fc2109217288f570c390905ea51 Mon Sep 17 00:00:00 2001 From: PatchOfScotland Date: Tue, 31 Jan 2023 14:36:38 +0100 Subject: [PATCH] added comments throughout --- conductors/local_python_conductor.py | 10 +- core/correctness/validation.py | 99 ++++++++----- core/correctness/vars.py | 7 + core/meow.py | 164 +++++++++++++++++++++- core/runner.py | 92 +++++++++++- patterns/file_event_pattern.py | 153 ++++++++++++++++++-- recipes/jupyter_notebook_recipe.py | 76 ++++++++-- rules/file_event_jupyter_notebook_rule.py | 12 ++ tests/test_all.sh | 2 + tests/test_meow.py | 2 +- 10 files changed, 545 insertions(+), 72 deletions(-) diff --git a/conductors/local_python_conductor.py b/conductors/local_python_conductor.py index 9d48b0d..9282093 100644 --- a/conductors/local_python_conductor.py +++ b/conductors/local_python_conductor.py @@ -1,4 +1,10 @@ +""" +This file contains definitions for the LocalPythonConductor, in order to +execute Python jobs on the local resource. + +Author(s): David Marchant +""" from typing import Any from core.correctness.vars import PYTHON_TYPE, PYTHON_FUNC @@ -18,8 +24,6 @@ class LocalPythonConductor(BaseConductor): valid_job(job) job_function = job[PYTHON_FUNC] - job_arguments = job - - job_function(job_arguments) + job_function(job) return diff --git a/core/correctness/validation.py b/core/correctness/validation.py index 0df0ed2..7eefb2e 100644 --- a/core/correctness/validation.py +++ b/core/correctness/validation.py @@ -1,4 +1,10 @@ +""" +This file contains various validation functions to be used throughout the +package. 
+ +Author(s): David Marchant +""" from datetime import datetime from inspect import signature from os.path import sep, exists, isfile, isdir, dirname @@ -8,11 +14,13 @@ from core.correctness.vars import VALID_PATH_CHARS, get_not_imp_msg, \ EVENT_TYPE, EVENT_PATH, JOB_EVENT, JOB_TYPE, JOB_ID, JOB_PATTERN, \ JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME +# Required keys in event dict EVENT_KEYS = { EVENT_TYPE: str, EVENT_PATH: str } +# Required keys in job dict JOB_KEYS = { JOB_TYPE: str, JOB_EVENT: dict, @@ -26,28 +34,16 @@ JOB_KEYS = { def check_type(variable:Any, expected_type:type, alt_types:list[type]=[], or_none:bool=False)->None: - """ - Checks if a given variable is of the expected type. Raises TypeError or - ValueError as appropriate if any issues are encountered. - - :param variable: (any) variable to check type of - - :param expected_type: (type) expected type of the provided variable - - :param alt_types: (optional)(list) additional types that are also - acceptable - - :param or_none: (optional) boolean of if the variable can be unset. - Default value is False. - - :return: No return. - """ + """Checks if a given variable is of the expected type. 
Raises TypeError or + ValueError as appropriate if any issues are encountered.""" + # Get a list of all allowed types type_list = [expected_type] if get_origin(expected_type) is Union: type_list = list(get_args(expected_type)) type_list = type_list + alt_types + # Only accept None if explicitly allowed if variable is None: if or_none == False: raise TypeError( @@ -56,55 +52,53 @@ def check_type(variable:Any, expected_type:type, alt_types:list[type]=[], else: return + # If any type is allowed, then we can stop checking if expected_type == Any: return + # Check that variable type is within the accepted type list if not isinstance(variable, tuple(type_list)): - print("egh") - raise TypeError( 'Expected type(s) are %s, got %s' % (get_args(expected_type), type(variable)) ) def check_implementation(child_func, parent_class): + """Checks if the given function has been overridden from the one inherited + from the parent class. Raises a NotImplementedError if this is the case.""" + # Check parent first implements func to measure against if not hasattr(parent_class, child_func.__name__): raise AttributeError( f"Parent class {parent_class} does not implement base function " f"{child_func.__name__} for children to override.") parent_func = getattr(parent_class, child_func.__name__) + + # Check child implements function with correct name if (child_func == parent_func): msg = get_not_imp_msg(parent_class, parent_func) raise NotImplementedError(msg) + + # Check that child implements function with correct signature child_sig = signature(child_func).parameters parent_sig = signature(parent_func).parameters - if child_sig.keys() != parent_sig.keys(): msg = get_not_imp_msg(parent_class, parent_func) raise NotImplementedError(msg) def valid_string(variable:str, valid_chars:str, min_length:int=1)->None: - """ - Checks that all characters in a given string are present in a provided + """Checks that all characters in a given string are present in a provided list of characters. 
Will raise an ValueError if unexpected character is - encountered. - - :param variable: (str) variable to check. - - :param valid_chars: (str) collection of valid characters. - - :param min_length: (int) minimum length of variable. - - :return: No return. - """ + encountered.""" check_type(variable, str) check_type(valid_chars, str) + # Check string is long enough if len(variable) < min_length: raise ValueError ( f"String '{variable}' is too short. Minimum length is {min_length}" ) + # Check each char is acceptable for char in variable: if char not in valid_chars: raise ValueError( @@ -115,6 +109,10 @@ def valid_string(variable:str, valid_chars:str, min_length:int=1)->None: def valid_dict(variable:dict[Any, Any], key_type:type, value_type:type, required_keys:list[Any]=[], optional_keys:list[Any]=[], strict:bool=True, min_length:int=1)->None: + """Checks that a given dictionary is valid. Key and Value types are + enforced, as are required and optional keys. Will raise ValueError, + TypeError or KeyError depending on the problem encountered.""" + # Validate inputs check_type(variable, dict) check_type(key_type, type, alt_types=[_SpecialForm]) check_type(value_type, type, alt_types=[_SpecialForm]) @@ -122,10 +120,12 @@ def valid_dict(variable:dict[Any, Any], key_type:type, value_type:type, check_type(optional_keys, list) check_type(strict, bool) + # Check dict meets minimum length if len(variable) < min_length: raise ValueError(f"Dictionary '{variable}' is below minimum length of " f"{min_length}") + # Check key and value types for k, v in variable.items(): if key_type != Any and not isinstance(k, key_type): raise TypeError(f"Key {k} had unexpected type '{type(k)}' " @@ -134,11 +134,14 @@ def valid_dict(variable:dict[Any, Any], key_type:type, value_type:type, raise TypeError(f"Value {v} had unexpected type '{type(v)}' " f"rather than expected '{value_type}' in dict '{variable}'") + # Check all required keys present for rk in required_keys: if rk not in 
variable.keys(): raise KeyError(f"Missing required key '{rk}' from dict " f"'{variable}'") + # If strict checking, enforce that only required and optional keys are + # present if strict: for k in variable.keys(): if k not in required_keys and k not in optional_keys: @@ -147,50 +150,75 @@ def valid_dict(variable:dict[Any, Any], key_type:type, value_type:type, def valid_list(variable:list[Any], entry_type:type, alt_types:list[type]=[], min_length:int=1)->None: + """Checks that a given list is valid. Value types are checked and a + ValueError or TypeError is raised if a problem is encountered.""" check_type(variable, list) + + # Check length meets minimum if len(variable) < min_length: raise ValueError(f"List '{variable}' is too short. Should be at least " f"of length {min_length}") + + # Check type of each value for entry in variable: check_type(entry, entry_type, alt_types=alt_types) def valid_path(variable:str, allow_base:bool=False, extension:str="", min_length:int=1): + """Check that a given string expresses a valid path.""" valid_string(variable, VALID_PATH_CHARS, min_length=min_length) + + # Check we aren't given a root path if not allow_base and variable.startswith(sep): raise ValueError(f"Cannot accept path '{variable}'. 
Must be relative.") + + # Check path contains a valid extension if extension and not variable.endswith(extension): raise ValueError(f"Path '{variable}' does not have required " f"extension '{extension}'.") def valid_existing_file_path(variable:str, allow_base:bool=False, extension:str=""): + """Check the given string is a path to an existing file.""" + # Check that the string is a path valid_path(variable, allow_base=allow_base, extension=extension) + # Check the path exists if not exists(variable): raise FileNotFoundError( f"Requested file path '{variable}' does not exist.") + # Check it is a file if not isfile(variable): raise ValueError( f"Requested file '{variable}' is not a file.") def valid_existing_dir_path(variable:str, allow_base:bool=False): + """Check the given string is a path to an existing directory.""" + # Check that the string is a path valid_path(variable, allow_base=allow_base, extension="") + # Check the path exists if not exists(variable): raise FileNotFoundError( f"Requested dir path '{variable}' does not exist.") + # Check it is a directory if not isdir(variable): raise ValueError( f"Requested dir '{variable}' is not a directory.") def valid_non_existing_path(variable:str, allow_base:bool=False): + """Check the given string is a path to something that does not exist.""" + # Check that the string is a path valid_path(variable, allow_base=allow_base, extension="") + # Check the path does not exist if exists(variable): raise ValueError(f"Requested path '{variable}' already exists.") + # Check that any intermediate directories exist if dirname(variable) and not exists(dirname(variable)): raise ValueError( f"Route to requested path '{variable}' does not exist.") def setup_debugging(print:Any=None, logging:int=0)->Tuple[Any,int]: + """Create a place for debug messages to be sent. 
Always returns a place, + along with a logging level.""" check_type(logging, int) if print is None: return None, 0 @@ -204,15 +232,22 @@ def setup_debugging(print:Any=None, logging:int=0)->Tuple[Any,int]: return print, logging -def valid_meow_dict(meow_dict:dict[str,Any], msg:str, keys:dict[str,type])->None: +def valid_meow_dict(meow_dict:dict[str,Any], msg:str, + keys:dict[str,type])->None: + """Check given dictionary expresses a meow construct. This won't do much + directly, but is called by more specific validation functions.""" check_type(meow_dict, dict) + # Check we have all the required keys, and they are all of the expected + # type for key, value_type in keys.items(): if not key in meow_dict.keys(): raise KeyError(f"{msg} require key '{key}'") check_type(meow_dict[key], value_type) def valid_event(event:dict[str,Any])->None: + """Check that a given dict expresses a meow event.""" valid_meow_dict(event, "Event", EVENT_KEYS) def valid_job(job:dict[str,Any])->None: + """Check that a given dict expresses a meow job.""" valid_meow_dict(job, "Job", JOB_KEYS) diff --git a/core/correctness/vars.py b/core/correctness/vars.py index 647780b..52fb666 100644 --- a/core/correctness/vars.py +++ b/core/correctness/vars.py @@ -1,4 +1,11 @@ +""" +This file contains a variety of constants used throughout the package. +Constants specific to only one file should be stored there, and only shared +here. + +Author(s): David Marchant +""" import os from multiprocessing import Queue diff --git a/core/meow.py b/core/meow.py index a11c6cb..3f2edd1 100644 --- a/core/meow.py +++ b/core/meow.py @@ -1,4 +1,12 @@ +""" +This file contains the core MEOW defintions, used throughout this package. +It is intended that these base definitions are what should be inherited from in +order to create an extendable framework for event-based scheduling and +processing. 
+ +Author(s): David Marchant +""" import inspect import sys @@ -14,12 +22,19 @@ from core.functionality import generate_id class BaseRecipe: + # A unique identifier for the recipe name:str + # Actual code to run recipe:Any + # Possible parameters that could be overridden by a Pattern parameters:dict[str, Any] + # Additional configuration options requirements:dict[str, Any] def __init__(self, name:str, recipe:Any, parameters:dict[str,Any]={}, requirements:dict[str,Any]={}): + """BaseRecipe Constructor. This will check that any class inheriting + from it implements its validation functions. It will then call these on + the input parameters.""" check_implementation(type(self)._is_valid_recipe, BaseRecipe) check_implementation(type(self)._is_valid_parameters, BaseRecipe) check_implementation(type(self)._is_valid_requirements, BaseRecipe) @@ -33,31 +48,49 @@ class BaseRecipe: self.requirements = requirements def __new__(cls, *args, **kwargs): + """A check that this base class is not instantiated itself, only + inherited from""" if cls is BaseRecipe: msg = get_drt_imp_msg(BaseRecipe) raise TypeError(msg) return object.__new__(cls) def _is_valid_name(self, name:str)->None: + """Validation check for 'name' variable from main constructor. Is + automatically called during initialisation. This does not need to be + overridden by child classes.""" valid_string(name, VALID_RECIPE_NAME_CHARS) def _is_valid_recipe(self, recipe:Any)->None: + """Validation check for 'recipe' variable from main constructor. Must + be implemented by any child class.""" pass def _is_valid_parameters(self, parameters:Any)->None: + """Validation check for 'parameters' variable from main constructor. + Must be implemented by any child class.""" pass def _is_valid_requirements(self, requirements:Any)->None: + """Validation check for 'requirements' variable from main constructor. 
+ Must be implemented by any child class.""" pass class BasePattern: + # A unique identifier for the pattern name:str + # An identifier of a recipe recipe:str + # Parameters to be overridden in the recipe parameters:dict[str,Any] + # Parameters showing the potential outputs of a recipe outputs:dict[str,Any] def __init__(self, name:str, recipe:str, parameters:dict[str,Any]={}, outputs:dict[str,Any]={}): + """BasePattern Constructor. This will check that any class inheriting + from it implements its validation functions. It will then call these on + the input parameters.""" check_implementation(type(self)._is_valid_recipe, BasePattern) check_implementation(type(self)._is_valid_parameters, BasePattern) check_implementation(type(self)._is_valid_output, BasePattern) @@ -71,31 +104,50 @@ class BasePattern: self.outputs = outputs def __new__(cls, *args, **kwargs): + """A check that this base class is not instantiated itself, only + inherited from""" if cls is BasePattern: msg = get_drt_imp_msg(BasePattern) raise TypeError(msg) return object.__new__(cls) def _is_valid_name(self, name:str)->None: + """Validation check for 'name' variable from main constructor. Is + automatically called during initialisation. This does not need to be + overridden by child classes.""" valid_string(name, VALID_PATTERN_NAME_CHARS) def _is_valid_recipe(self, recipe:Any)->None: + """Validation check for 'recipe' variable from main constructor. Must + be implemented by any child class.""" pass def _is_valid_parameters(self, parameters:Any)->None: + """Validation check for 'parameters' variable from main constructor. + Must be implemented by any child class.""" pass def _is_valid_output(self, outputs:Any)->None: + """Validation check for 'outputs' variable from main constructor. 
Must + be implemented by any child class.""" pass class BaseRule: + # A unique identifier for the rule name:str + # A pattern to be used in rule triggering pattern:BasePattern + # A recipe to be used in rule execution recipe:BaseRecipe + # The string name of the pattern class that can be used to create this rule pattern_type:str="" + # The string name of the recipe class that can be used to create this rule recipe_type:str="" def __init__(self, name:str, pattern:BasePattern, recipe:BaseRecipe): + """BaseRule Constructor. This will check that any class inheriting + from it implements its validation functions. It will then call these on + the input parameters.""" check_implementation(type(self)._is_valid_pattern, BaseRule) check_implementation(type(self)._is_valid_recipe, BaseRule) self._is_valid_name(name) @@ -107,21 +159,32 @@ class BaseRule: self.__check_types_set() def __new__(cls, *args, **kwargs): + """A check that this base class is not instantiated itself, only + inherited from""" if cls is BaseRule: msg = get_drt_imp_msg(BaseRule) raise TypeError(msg) return object.__new__(cls) def _is_valid_name(self, name:str)->None: + """Validation check for 'name' variable from main constructor. Is + automatically called during initialisation. This does not need to be + overridden by child classes.""" valid_string(name, VALID_RULE_NAME_CHARS) def _is_valid_pattern(self, pattern:Any)->None: + """Validation check for 'pattern' variable from main constructor. Must + be implemented by any child class.""" pass def _is_valid_recipe(self, recipe:Any)->None: + """Validation check for 'recipe' variable from main constructor. 
Must + be implemented by any child class.""" pass def __check_types_set(self)->None: + """Validation check that the self.pattern_type and self.recipe_type + attributes have been set in a child class.""" if self.pattern_type == "": raise AttributeError(f"Rule Class '{self.__class__.__name__}' " "does not set a pattern_type.") @@ -131,11 +194,21 @@ class BaseRule: class BaseMonitor: + # A collection of patterns _patterns: dict[str, BasePattern] + # A collection of recipes _recipes: dict[str, BaseRecipe] + # A collection of rules derived from _patterns and _recipes _rules: dict[str, BaseRule] + # A channel for sending messages to the runner. Note that this is not + # initialised within the constructor, but within the runner when passed the + # monitor is passed to it. to_runner: VALID_CHANNELS - def __init__(self, patterns:dict[str,BasePattern], recipes:dict[str,BaseRecipe])->None: + def __init__(self, patterns:dict[str,BasePattern], + recipes:dict[str,BaseRecipe])->None: + """BaseMonitor Constructor. This will check that any class inheriting + from it implements its validation functions. 
It will then call these on + the input parameters.""" check_implementation(type(self).start, BaseMonitor) check_implementation(type(self).stop, BaseMonitor) check_implementation(type(self)._is_valid_patterns, BaseMonitor) @@ -151,98 +224,161 @@ class BaseMonitor: check_implementation(type(self).remove_recipe, BaseMonitor) check_implementation(type(self).get_recipes, BaseMonitor) check_implementation(type(self).get_rules, BaseMonitor) + # Ensure that patterns and recipes cannot be trivially modified from + # outside the monitor, as this will cause internal consistency issues self._patterns = deepcopy(patterns) self._recipes = deepcopy(recipes) self._rules = create_rules(patterns, recipes) def __new__(cls, *args, **kwargs): + """A check that this base class is not instantiated itself, only + inherited from""" if cls is BaseMonitor: msg = get_drt_imp_msg(BaseMonitor) raise TypeError(msg) return object.__new__(cls) def _is_valid_patterns(self, patterns:dict[str,BasePattern])->None: + """Validation check for 'patterns' variable from main constructor. Must + be implemented by any child class.""" pass def _is_valid_recipes(self, recipes:dict[str,BaseRecipe])->None: + """Validation check for 'recipes' variable from main constructor. Must + be implemented by any child class.""" pass def start(self)->None: + """Function to start the monitor as an ongoing process/thread. Must be + implemented by any child process""" pass def stop(self)->None: + """Function to stop the monitor as an ongoing process/thread. Must be + implemented by any child process""" pass def add_pattern(self, pattern:BasePattern)->None: + """Function to add a pattern to the current definitions. Must be + implemented by any child process.""" pass def update_pattern(self, pattern:BasePattern)->None: + """Function to update a pattern in the current definitions. 
Must be + implemented by any child process.""" pass def remove_pattern(self, pattern:Union[str,BasePattern])->None: + """Function to remove a pattern from the current definitions. Must be + implemented by any child process.""" pass - def get_patterns(self)->None: + def get_patterns(self)->dict[str,BasePattern]: + """Function to get a dictionary of all current pattern definitions. + Must be implemented by any child process.""" pass def add_recipe(self, recipe:BaseRecipe)->None: + """Function to add a recipe to the current definitions. Must be + implemented by any child process.""" pass def update_recipe(self, recipe:BaseRecipe)->None: + """Function to update a recipe in the current definitions. Must be + implemented by any child process.""" pass def remove_recipe(self, recipe:Union[str,BaseRecipe])->None: + """Function to remove a recipe from the current definitions. Must be + implemented by any child process.""" pass - def get_recipes(self)->None: + def get_recipes(self)->dict[str,BaseRecipe]: + """Function to get a dictionary of all current recipe definitions. + Must be implemented by any child process.""" pass - def get_rules(self)->None: + def get_rules(self)->dict[str,BaseRule]: + """Function to get a dictionary of all current rule definitions. + Must be implemented by any child process.""" pass class BaseHandler: + # A channel for sending messages to the runner. Note that this is not + # initialised within the constructor, but within the runner when passed the + # handler is passed to it. to_runner: VALID_CHANNELS def __init__(self)->None: + """BaseHandler Constructor. 
This will check that any class inheriting + from it implements its validation functions.""" check_implementation(type(self).handle, BaseHandler) check_implementation(type(self).valid_event_types, BaseHandler) def __new__(cls, *args, **kwargs): + """A check that this base class is not instantiated itself, only + inherited from""" if cls is BaseHandler: msg = get_drt_imp_msg(BaseHandler) raise TypeError(msg) return object.__new__(cls) def valid_event_types(self)->list[str]: + """Function to provide a list of the types of events this handler can + process. Must be implemented by any child process.""" pass def handle(self, event:dict[str,Any])->None: + """Function to handle a given event. Must be implemented by any child + process.""" pass class BaseConductor: def __init__(self)->None: + """BaseConductor Constructor. This will check that any class inheriting + from it implements its validation functions.""" check_implementation(type(self).execute, BaseConductor) check_implementation(type(self).valid_job_types, BaseConductor) + def __new__(cls, *args, **kwargs): + """A check that this base class is not instantiated itself, only + inherited from""" + if cls is BaseConductor: + msg = get_drt_imp_msg(BaseConductor) + raise TypeError(msg) + return object.__new__(cls) + def valid_job_types(self)->list[str]: + """Function to provide a list of the types of jobs this conductor can + process. Must be implemented by any child process.""" pass def execute(self, job:dict[str,Any])->None: + """Function to execute a given job. Must be implemented by any child + process.""" pass def create_rules(patterns:Union[dict[str,BasePattern],list[BasePattern]], recipes:Union[dict[str,BaseRecipe],list[BaseRecipe]], new_rules:list[BaseRule]=[])->dict[str,BaseRule]: + """Function to create any valid rules from a given collection of patterns + and recipes. All inbuilt rule types are considered, with additional + definitions provided through the 'new_rules' variable. 
Note that any + provided pattern and recipe dictionaries must be keyed with the + corresponding pattern and recipe names.""" + # Validation of inputs check_type(patterns, dict, alt_types=[list]) check_type(recipes, dict, alt_types=[list]) valid_list(new_rules, BaseRule, min_length=0) + # Convert a pattern list to a dictionary if isinstance(patterns, list): valid_list(patterns, BasePattern, min_length=0) patterns = {pattern.name:pattern for pattern in patterns} else: + # Validate the pattern dictionary valid_dict(patterns, str, BasePattern, strict=False, min_length=0) for k, v in patterns.items(): if k != v.name: @@ -251,10 +387,12 @@ def create_rules(patterns:Union[dict[str,BasePattern],list[BasePattern]], "Pattern dictionaries must be keyed with the name of the " "Pattern.") + # Convert a recipe list into a dictionary if isinstance(recipes, list): valid_list(recipes, BaseRecipe, min_length=0) recipes = {recipe.name:recipe for recipe in recipes} else: + # Validate the recipe dictionary valid_dict(recipes, str, BaseRecipe, strict=False, min_length=0) for k, v in recipes.items(): if k != v.name: @@ -263,25 +401,38 @@ def create_rules(patterns:Union[dict[str,BasePattern],list[BasePattern]], "Recipe dictionaries must be keyed with the name of the " "Recipe.") + # Try to create a rule for each rule in turn generated_rules = {} for pattern in patterns.values(): if pattern.recipe in recipes: - rule = create_rule(pattern, recipes[pattern.recipe]) - generated_rules[rule.name] = rule + try: + rule = create_rule(pattern, recipes[pattern.recipe]) + generated_rules[rule.name] = rule + except TypeError: + pass return generated_rules def create_rule(pattern:BasePattern, recipe:BaseRecipe, new_rules:list[BaseRule]=[])->BaseRule: + """Function to create a valid rule from a given pattern and recipe. 
All + inbuilt rule types are considered, with additional definitions provided + through the 'new_rules' variable.""" check_type(pattern, BasePattern) check_type(recipe, BaseRecipe) valid_list(new_rules, BaseRule, min_length=0) # Imported here to avoid circular imports at top of file import rules + # Get a dictionary of all inbuilt rules all_rules ={(r.pattern_type, r.recipe_type):r for r in [r[1] \ for r in inspect.getmembers(sys.modules["rules"], inspect.isclass) \ if (issubclass(r[1], BaseRule))]} + # Add in new rules + for rule in new_rules: + all_rules[(rule.pattern_type, rule.recipe_type)] = rule + + # Find appropriate rule type from pattern and recipe types key = (type(pattern).__name__, type(recipe).__name__) if (key) in all_rules: return all_rules[key]( @@ -289,5 +440,6 @@ def create_rule(pattern:BasePattern, recipe:BaseRecipe, pattern, recipe ) + # Raise error if not valid rule type can be found raise TypeError(f"No valid rule for Pattern '{pattern}' and Recipe " f"'{recipe}' could be found.") diff --git a/core/runner.py b/core/runner.py index 0815997..8579ccb 100644 --- a/core/runner.py +++ b/core/runner.py @@ -1,4 +1,11 @@ +""" +This file contains the defintion for the MeowRunner, the main construct used +for actually orchestration MEOW analysis. It is intended as a modular system, +with monitors, handlers, and conductors being swappable at initialisation. 
+ +Author(s): David Marchant +""" import sys import threading @@ -16,20 +23,31 @@ from core.meow import BaseHandler, BaseMonitor, BaseConductor class MeowRunner: + # A collection of all monitors in the runner monitors:list[BaseMonitor] + # A collection of all handlers in the runner handlers:dict[str:BaseHandler] + # A collection of all conductors in the runner conductors:dict[str:BaseConductor] + # A collection of all channels from each monitor from_monitors: list[VALID_CHANNELS] + # A collection of all channels from each handler from_handlers: list[VALID_CHANNELS] def __init__(self, monitors:Union[BaseMonitor,list[BaseMonitor]], handlers:Union[BaseHandler,list[BaseHandler]], conductors:Union[BaseConductor,list[BaseConductor]], print:Any=sys.stdout, logging:int=0)->None: + """MeowRunner constructor. This connects all provided monitors, + handlers and conductors according to what events and jobs they produce + or consume.""" self._is_valid_conductors(conductors) + # If conductors isn't a list, make it one if not type(conductors) == list: conductors = [conductors] self.conductors = {} + # Create a dictionary of conductors, keyed by job type, and valued by a + # list of conductors for that job type for conductor in conductors: conductor_jobs = conductor.valid_job_types() if not conductor_jobs: @@ -45,10 +63,13 @@ class MeowRunner: self.conductors[job] = [conductor] self._is_valid_handlers(handlers) + # If handlers isn't a list, make it one if not type(handlers) == list: handlers = [handlers] self.handlers = {} self.from_handlers = [] + # Create a dictionary of handlers, keyed by event type, and valued by a + # list of handlers for that event type for handler in handlers: handler_events = handler.valid_event_types() if not handler_events: @@ -62,80 +83,121 @@ class MeowRunner: self.handlers[event].append(handler) else: self.handlers[event] = [handler] + # Create a channel from the handler back to this runner handler_to_runner_reader, handler_to_runner_writer = Pipe() 
handler.to_runner = handler_to_runner_writer self.from_handlers.append(handler_to_runner_reader) self._is_valid_monitors(monitors) + # If monitors isn't a list, make it one if not type(monitors) == list: monitors = [monitors] self.monitors = monitors self.from_monitors = [] for monitor in self.monitors: + # Create a channel from the monitor back to this runner monitor_to_runner_reader, monitor_to_runner_writer = Pipe() monitor.to_runner = monitor_to_runner_writer self.from_monitors.append(monitor_to_runner_reader) + # Create channel to send stop messages to monitor/handler thread self._stop_mon_han_pipe = Pipe() self._mon_han_worker = None + # Create channel to send stop messages to handler/conductor thread self._stop_han_con_pipe = Pipe() self._han_con_worker = None + # Setup debugging self._print_target, self.debug_level = setup_debugging(print, logging) def run_monitor_handler_interaction(self)->None: + """Function to be run in its own thread, to handle any inbound messages + from monitors. These will be events, which should be matched to an + appropriate handler and handled.""" all_inputs = self.from_monitors + [self._stop_mon_han_pipe[0]] while True: ready = wait(all_inputs) + # If we get a message from the stop channel, then finish if self._stop_mon_han_pipe[0] in ready: return else: for from_monitor in self.from_monitors: if from_monitor in ready: + # Read event from the monitor channel message = from_monitor.recv() event = message + # Abort if we don't have a relevent handler. 
if not self.handlers[event[EVENT_TYPE]]: print_debug(self._print_target, self.debug_level, "Could not process event as no relevent " f"handler for '{event[EVENT_TYPE]}'", DEBUG_INFO) return + # If we've only one handler, use that if len(self.handlers[event[EVENT_TYPE]]) == 1: - self.handlers[event[EVENT_TYPE]][0].handle(event) + handler = self.handlers[event[EVENT_TYPE]][0] + self.handle_event(handler, event) + # If multiple handlers then randomly pick one else: - self.handlers[event[EVENT_TYPE]][ + handler = self.handlers[event[EVENT_TYPE]][ randrange(len(self.handlers[event[EVENT_TYPE]])) - ].handle(event) + ] + self.handle_event(handler, event) def run_handler_conductor_interaction(self)->None: + """Function to be run in its own thread, to handle any inbound messages + from handlers. These will be jobs, which should be matched to an + appropriate conductor and executed.""" all_inputs = self.from_handlers + [self._stop_han_con_pipe[0]] while True: ready = wait(all_inputs) + # If we get a message from the stop channel, then finish if self._stop_han_con_pipe[0] in ready: return else: for from_handler in self.from_handlers: if from_handler in ready: + # Read event from the handler channel message = from_handler.recv() job = message + # Abort if we don't have a relevent conductor. 
if not self.conductors[job[JOB_TYPE]]: print_debug(self._print_target, self.debug_level, "Could not process job as no relevent " f"conductor for '{job[JOB_TYPE]}'", DEBUG_INFO) return + # If we've only one conductor, use that if len(self.conductors[job[JOB_TYPE]]) == 1: conductor = self.conductors[job[JOB_TYPE]][0] self.execute_job(conductor, job) + # If multiple conductors then randomly pick one else: conductor = self.conductors[job[JOB_TYPE]][ randrange(len(self.conductors[job[JOB_TYPE]])) ] self.execute_job(conductor, job) + + def handle_event(self, handler:BaseHandler, event:dict[str:Any])->None: + """Function for a given handler to handle a given event, without + crashing the runner in the event of a problem.""" + print_debug(self._print_target, self.debug_level, + f"Starting handling for event: '{event[EVENT_TYPE]}'", DEBUG_INFO) + try: + handler.handle(event) + print_debug(self._print_target, self.debug_level, + f"Completed handling for event: '{event[EVENT_TYPE]}'", + DEBUG_INFO) + except Exception as e: + print_debug(self._print_target, self.debug_level, + "Something went wrong during handling for event " + f"'{event[EVENT_TYPE]}'. {e}", DEBUG_INFO) def execute_job(self, conductor:BaseConductor, job:dict[str:Any])->None: + """Function for a given conductor to execute a given job, without + crashing the runner in the event of a problem.""" print_debug(self._print_target, self.debug_level, f"Starting execution for job: '{job[JOB_ID]}'", DEBUG_INFO) try: @@ -148,20 +210,28 @@ class MeowRunner: f"'{job[JOB_ID]}'. 
{e}", DEBUG_INFO) def start(self)->None: + """Function to start the runner by starting all of the constituent + monitors, handlers and conductors, along with managing interaction + threads.""" + # Start all monitors for monitor in self.monitors: monitor.start() startable = [] + # Start all handlers, if they need it for handler_list in self.handlers.values(): for handler in handler_list: if hasattr(handler, "start") and handler not in startable: startable.append() + # Start all conductors, if they need it for conductor_list in self.conductors.values(): for conductor in conductor_list: if hasattr(conductor, "start") and conductor not in startable: startable.append() for starting in startable: starting.start() - + + # If we've not started the monitor/handler interaction thread yet, then + # do so if self._mon_han_worker is None: self._mon_han_worker = threading.Thread( target=self.run_monitor_handler_interaction, @@ -177,6 +247,8 @@ class MeowRunner: msg, DEBUG_WARNING) raise RuntimeWarning(msg) + # If we've not started the handler/conductor interaction thread yet, + # then do so if self._han_con_worker is None: self._han_con_worker = threading.Thread( target=self.run_handler_conductor_interaction, @@ -193,14 +265,20 @@ class MeowRunner: raise RuntimeWarning(msg) def stop(self)->None: + """Function to stop the runner by stopping all of the constituent + monitors, handlers and conductors, along with managing interaction + threads.""" + # Stop all the monitors for monitor in self.monitors: monitor.stop() stopable = [] + # Stop all handlers, if they need it for handler_list in self.handlers.values(): for handler in handler_list: if hasattr(handler, "stop") and handler not in stopable: stopable.append() + # Stop all conductors, if they need it for conductor_list in self.conductors.values(): for conductor in conductor_list: if hasattr(conductor, "stop") and conductor not in stopable: @@ -208,6 +286,7 @@ class MeowRunner: for stopping in stopable: stopping.stop() + # If 
we've started the monitor/handler interaction thread, then stop it if self._mon_han_worker is None: msg = "Cannot stop event handling thread that is not started." print_debug(self._print_target, self.debug_level, @@ -219,6 +298,8 @@ class MeowRunner: print_debug(self._print_target, self.debug_level, "Event handler thread stopped", DEBUG_INFO) + # If we've started the handler/conductor interaction thread, then stop + # it if self._han_con_worker is None: msg = "Cannot stop job conducting thread that is not started." print_debug(self._print_target, self.debug_level, @@ -232,18 +313,21 @@ class MeowRunner: def _is_valid_monitors(self, monitors:Union[BaseMonitor,list[BaseMonitor]])->None: + """Validation check for 'monitors' variable from main constructor.""" check_type(monitors, BaseMonitor, alt_types=[list]) if type(monitors) == list: valid_list(monitors, BaseMonitor, min_length=1) def _is_valid_handlers(self, handlers:Union[BaseHandler,list[BaseHandler]])->None: + """Validation check for 'handlers' variable from main constructor.""" check_type(handlers, BaseHandler, alt_types=[list]) if type(handlers) == list: valid_list(handlers, BaseHandler, min_length=1) def _is_valid_conductors(self, conductors:Union[BaseConductor,list[BaseConductor]])->None: + """Validation check for 'conductors' variable from main constructor.""" check_type(conductors, BaseConductor, alt_types=[list]) if type(conductors) == list: valid_list(conductors, BaseConductor, min_length=1) diff --git a/patterns/file_event_pattern.py b/patterns/file_event_pattern.py index 26e2d05..9c2a3da 100644 --- a/patterns/file_event_pattern.py +++ b/patterns/file_event_pattern.py @@ -1,4 +1,10 @@ +""" +This file contains definitions for a MEOW pattern based off of file events, +along with an appropriate monitor for said events. 
+ +Author(s): David Marchant +""" import glob import threading import sys @@ -23,6 +29,7 @@ from core.functionality import print_debug, create_event, get_file_hash from core.meow import BasePattern, BaseMonitor, BaseRule, BaseRecipe, \ create_rule +# Events that are monitored by default _DEFAULT_MASK = [ FILE_CREATE_EVENT, FILE_MODIFY_EVENT, @@ -30,20 +37,28 @@ _DEFAULT_MASK = [ FILE_RETROACTIVE_EVENT ] +# Parameter sweep keys SWEEP_START = "start" SWEEP_STOP = "stop" SWEEP_JUMP = "jump" class FileEventPattern(BasePattern): + # The path at which events will trigger this pattern triggering_path:str + # The variable name given to the triggering file within recipe code triggering_file:str + # Which types of event the pattern responds to event_mask:list[str] + # TODO move me to BasePattern definition + # A collection of variables to be swept over for job scheduling sweep:dict[str,Any] def __init__(self, name:str, triggering_path:str, recipe:str, triggering_file:str, event_mask:list[str]=_DEFAULT_MASK, parameters:dict[str,Any]={}, outputs:dict[str,Any]={}, sweep:dict[str,Any]={}): + """FileEventPattern Constructor. 
This is used to match against file + system events, as caught by the python watchdog module.""" super().__init__(name, recipe, parameters, outputs) self._is_valid_triggering_path(triggering_path) self.triggering_path = triggering_path @@ -54,10 +69,9 @@ class FileEventPattern(BasePattern): self._is_valid_sweep(sweep) self.sweep = sweep - def _is_valid_recipe(self, recipe:str)->None: - valid_string(recipe, VALID_RECIPE_NAME_CHARS) - def _is_valid_triggering_path(self, triggering_path:str)->None: + """Validation check for 'triggering_path' variable from main + constructor.""" valid_path(triggering_path) if len(triggering_path) < 1: raise ValueError ( @@ -66,19 +80,31 @@ class FileEventPattern(BasePattern): ) def _is_valid_triggering_file(self, triggering_file:str)->None: + """Validation check for 'triggering_file' variable from main + constructor.""" valid_string(triggering_file, VALID_VARIABLE_NAME_CHARS) + def _is_valid_recipe(self, recipe:str)->None: + """Validation check for 'recipe' variable from main constructor. + Called within parent BasePattern constructor.""" + valid_string(recipe, VALID_RECIPE_NAME_CHARS) + def _is_valid_parameters(self, parameters:dict[str,Any])->None: + """Validation check for 'parameters' variable from main constructor. + Called within parent BasePattern constructor.""" valid_dict(parameters, str, Any, strict=False, min_length=0) for k in parameters.keys(): valid_string(k, VALID_VARIABLE_NAME_CHARS) def _is_valid_output(self, outputs:dict[str,str])->None: + """Validation check for 'output' variable from main constructor. 
+ Called within parent BasePattern constructor.""" valid_dict(outputs, str, str, strict=False, min_length=0) for k in outputs.keys(): valid_string(k, VALID_VARIABLE_NAME_CHARS) def _is_valid_event_mask(self, event_mask)->None: + """Validation check for 'event_mask' variable from main constructor.""" valid_list(event_mask, str, min_length=1) for mask in event_mask: if mask not in FILE_EVENTS: @@ -86,6 +112,7 @@ class FileEventPattern(BasePattern): f"{FILE_EVENTS}") def _is_valid_sweep(self, sweep)->None: + """Validation check for 'sweep' variable from main constructor.""" check_type(sweep, dict) if not sweep: return @@ -109,30 +136,42 @@ class FileEventPattern(BasePattern): elif v[SWEEP_JUMP] > 0: if not v[SWEEP_STOP] > v[SWEEP_START]: raise ValueError( - "Cannot create sweep with a positive '{SWEEP_JUMP}' " + f"Cannot create sweep with a positive '{SWEEP_JUMP}' " "value where the end point is smaller than the start." ) elif v[SWEEP_JUMP] < 0: if not v[SWEEP_STOP] < v[SWEEP_START]: raise ValueError( - "Cannot create sweep with a negative '{SWEEP_JUMP}' " + f"Cannot create sweep with a negative '{SWEEP_JUMP}' " "value where the end point is smaller than the start." ) class WatchdogMonitor(BaseMonitor): + # A handler object, to catch events event_handler:PatternMatchingEventHandler + # The watchdog observer object monitor:Observer + # The base monitored directory base_dir:str + # Config option, above which debug messages are ignored debug_level:int + # Where print messages are sent _print_target:Any + #A lock to solve race conditions on '_patterns' _patterns_lock:threading.Lock + #A lock to solve race conditions on '_recipes' _recipes_lock:threading.Lock + #A lock to solve race conditions on '_rules' _rules_lock:threading.Lock def __init__(self, base_dir:str, patterns:dict[str,FileEventPattern], recipes:dict[str,BaseRecipe], autostart=False, settletime:int=1, print:Any=sys.stdout, logging:int=0)->None: + """WatchdogEventHandler Constructor. 
This uses the watchdog module to + monitor a directory and all its sub-directories. Watchdog will provide + the monitor with any caught events, with the monitor comparing them + against its rules, and informing the runner of any match.""" super().__init__(patterns, recipes) self._is_valid_base_dir(base_dir) self.base_dir = base_dir @@ -155,22 +194,28 @@ class WatchdogMonitor(BaseMonitor): self.start() def start(self)->None: + """Function to start the monitor.""" print_debug(self._print_target, self.debug_level, "Starting WatchdogMonitor", DEBUG_INFO) self._apply_retroactive_rules() self.monitor.start() def stop(self)->None: + """Function to stop the monitor.""" print_debug(self._print_target, self.debug_level, "Stopping WatchdogMonitor", DEBUG_INFO) self.monitor.stop() def match(self, event)->None: + """Function to determine if a given event matches the current rules.""" src_path = event.src_path event_type = "dir_"+ event.event_type if event.is_directory \ else "file_" + event.event_type + # Remove the base dir from the path as trigger paths are given relative + # to that handle_path = src_path.replace(self.base_dir, '', 1) + # Also remove leading slashes, so we don't go off of the root directory while handle_path.startswith(os.path.sep): handle_path = handle_path[1:] @@ -178,15 +223,18 @@ class WatchdogMonitor(BaseMonitor): try: for rule in self._rules.values(): + # Skip events not within the event mask if event_type not in rule.pattern.event_mask: continue + # Use regex to match event paths against rule paths target_path = rule.pattern.triggering_path recursive_regexp = translate(target_path) direct_regexp = recursive_regexp.replace('.*', '[^/]*') recursive_hit = match(recursive_regexp, handle_path) direct_hit = match(direct_regexp, handle_path) + # If matched, then create a watchdog event if direct_hit or recursive_hit: meow_event = create_event( WATCHDOG_TYPE, @@ -203,6 +251,7 @@ class WatchdogMonitor(BaseMonitor): print_debug(self._print_target, self.debug_level, 
f"Event at {src_path} of type {event_type} hit rule " f"{rule.name}", DEBUG_INFO) + # Send the event to the runner self.to_runner.send(meow_event) except Exception as e: @@ -212,6 +261,9 @@ class WatchdogMonitor(BaseMonitor): self._rules_lock.release() def add_pattern(self, pattern:FileEventPattern)->None: + """Function to add a pattern to the current definitions. Any rules + that can be possibly created from that pattern will be automatically + created.""" check_type(pattern, FileEventPattern) self._patterns_lock.acquire() try: @@ -227,12 +279,16 @@ class WatchdogMonitor(BaseMonitor): self._identify_new_rules(new_pattern=pattern) def update_pattern(self, pattern:FileEventPattern)->None: + """Function to update a pattern in the current definitions. Any rules + created from that pattern will be automatically updated.""" check_type(pattern, FileEventPattern) self.remove_pattern(pattern.name) print(f"adding pattern w/ recipe {pattern.recipe}") self.add_pattern(pattern) def remove_pattern(self, pattern: Union[str,FileEventPattern])->None: + """Function to remove a pattern from the current definitions. Any rules + that will be no longer valid will be automatically removed.""" check_type(pattern, str, alt_types=[FileEventPattern]) lookup_key = pattern if isinstance(lookup_key, FileEventPattern): @@ -253,7 +309,10 @@ class WatchdogMonitor(BaseMonitor): else: self._identify_lost_rules(lost_pattern=pattern) - def get_patterns(self)->None: + def get_patterns(self)->dict[str,FileEventPattern]: + """Function to get a dict of the currently defined patterns of the + monitor. Note that the result is deep-copied, and so can be manipulated + without directly manipulating the internals of the monitor.""" to_return = {} self._patterns_lock.acquire() try: @@ -265,6 +324,9 @@ class WatchdogMonitor(BaseMonitor): return to_return def add_recipe(self, recipe: BaseRecipe)->None: + """Function to add a recipe to the current definitions. 
Any rules + that can be possibly created from that recipe will be automatically + created.""" check_type(recipe, BaseRecipe) self._recipes_lock.acquire() try: @@ -280,17 +342,22 @@ class WatchdogMonitor(BaseMonitor): self._identify_new_rules(new_recipe=recipe) def update_recipe(self, recipe: BaseRecipe)->None: + """Function to update a recipe in the current definitions. Any rules + created from that recipe will be automatically updated.""" check_type(recipe, BaseRecipe) self.remove_recipe(recipe.name) self.add_recipe(recipe) def remove_recipe(self, recipe:Union[str,BaseRecipe])->None: + """Function to remove a recipe from the current definitions. Any rules + that will be no longer valid will be automatically removed.""" check_type(recipe, str, alt_types=[BaseRecipe]) lookup_key = recipe if isinstance(lookup_key, BaseRecipe): lookup_key = recipe.name self._recipes_lock.acquire() try: + # Check that recipe has not already been deleted if lookup_key not in self._recipes: raise KeyError(f"Cannot remote Recipe '{lookup_key}' as it " "does not already exist") @@ -305,7 +372,10 @@ class WatchdogMonitor(BaseMonitor): else: self._identify_lost_rules(lost_recipe=recipe) - def get_recipes(self)->None: + def get_recipes(self)->dict[str,BaseRecipe]: + """Function to get a dict of the currently defined recipes of the + monitor. Note that the result is deep-copied, and so can be manipulated + without directly manipulating the internals of the monitor.""" to_return = {} self._recipes_lock.acquire() try: @@ -316,7 +386,10 @@ class WatchdogMonitor(BaseMonitor): self._recipes_lock.release() return to_return - def get_rules(self)->None: + def get_rules(self)->dict[str,BaseRule]: + """Function to get a dict of the currently defined rules of the + monitor. 
Note that the result is deep-copied, and so can be manipulated + without directly manipulating the internals of the monitor.""" to_return = {} self._rules_lock.acquire() try: @@ -329,15 +402,20 @@ class WatchdogMonitor(BaseMonitor): def _identify_new_rules(self, new_pattern:FileEventPattern=None, new_recipe:BaseRecipe=None)->None: + """Function to determine if a new rule can be created given a new + pattern or recipe, in light of other existing patterns or recipes in + the monitor.""" if new_pattern: self._patterns_lock.acquire() self._recipes_lock.acquire() try: + # Check in case pattern has been deleted since function called if new_pattern.name not in self._patterns: self._patterns_lock.release() self._recipes_lock.release() return + # If pattern specifies recipe that already exists, make a rule if new_pattern.recipe in self._recipes: self._create_new_rule( new_pattern, @@ -354,10 +432,12 @@ class WatchdogMonitor(BaseMonitor): self._patterns_lock.acquire() self._recipes_lock.acquire() try: + # Check in case recipe has been deleted since function called if new_recipe.name not in self._recipes: self._patterns_lock.release() self._recipes_lock.release() return + # If recipe is specified by existing pattern, make a rule for pattern in self._patterns.values(): if pattern.recipe == new_recipe.name: self._create_new_rule( @@ -373,14 +453,18 @@ class WatchdogMonitor(BaseMonitor): def _identify_lost_rules(self, lost_pattern:str=None, lost_recipe:str=None)->None: + """Function to remove rules that should be deleted in response to a + pattern or recipe having been deleted.""" to_delete = [] self._rules_lock.acquire() try: + # Identify any offending rules for name, rule in self._rules.items(): if lost_pattern and rule.pattern.name == lost_pattern: - to_delete.append(name) + to_delete.append(name) if lost_recipe and rule.recipe.name == lost_recipe: - to_delete.append(name) + to_delete.append(name) + # Now delete them for delete in to_delete: if delete in self._rules.keys(): 
self._rules.pop(delete) @@ -389,7 +473,12 @@ class WatchdogMonitor(BaseMonitor): raise e self._rules_lock.release() - def _create_new_rule(self, pattern:FileEventPattern, recipe:BaseRecipe)->None: + def _create_new_rule(self, pattern:FileEventPattern, + recipe:BaseRecipe)->None: + """Function to create a new rule from a given pattern and recipe. This + will only be called to create rules at runtime, as rules are + automatically created at initialisation using the same 'create_rule' + function called here.""" rule = create_rule(pattern, recipe) self._rules_lock.acquire() try: @@ -405,31 +494,45 @@ class WatchdogMonitor(BaseMonitor): self._apply_retroactive_rule(rule) def _is_valid_base_dir(self, base_dir:str)->None: + """Validation check for 'base_dir' variable from main constructor. Is + automatically called during initialisation.""" valid_existing_dir_path(base_dir) def _is_valid_patterns(self, patterns:dict[str,FileEventPattern])->None: + """Validation check for 'patterns' variable from main constructor. Is + automatically called during initialisation.""" valid_dict(patterns, str, FileEventPattern, min_length=0, strict=False) def _is_valid_recipes(self, recipes:dict[str,BaseRecipe])->None: + """Validation check for 'recipes' variable from main constructor. 
Is + automatically called during initialisation.""" valid_dict(recipes, str, BaseRecipe, min_length=0, strict=False) def _apply_retroactive_rules(self)->None: + """Function to determine if any rules should be applied to the existing + file structure, were the file structure created/modified now.""" for rule in self._rules.values(): self._apply_retroactive_rule(rule) def _apply_retroactive_rule(self, rule:BaseRule)->None: + """Function to determine if a rule should be applied to the existing + file structure, were the file structure created/modified now.""" self._rules_lock.acquire() try: + # Check in case rule deleted since this function first called if rule.name not in self._rules: self._rules_lock.release() return + if FILE_RETROACTIVE_EVENT in rule.pattern.event_mask: - + # Determine what paths are potentially triggerable and gather + # files at those paths testing_path = os.path.join( self.base_dir, rule.pattern.triggering_path) globbed = glob.glob(testing_path) + # For each file create a fake event. for globble in globbed: meow_event = create_event( @@ -440,6 +543,7 @@ class WatchdogMonitor(BaseMonitor): print_debug(self._print_target, self.debug_level, f"Retroactive event for file at at {globble} hit rule " f"{rule.name}", DEBUG_INFO) + # Send it to the runner self.to_runner.send(meow_event) except Exception as e: @@ -449,11 +553,19 @@ class WatchdogMonitor(BaseMonitor): class WatchdogEventHandler(PatternMatchingEventHandler): + # The monitor class running this handler monitor:WatchdogMonitor + # A time to wait per event path, during which extra events are discarded _settletime:int + # TODO clean this struct occasionally + # A dict of recent job timestamps _recent_jobs:dict[str, Any] + # A lock to solve race conditions on '_recent_jobs' _recent_jobs_lock:threading.Lock def __init__(self, monitor:WatchdogMonitor, settletime:int=1): + """WatchdogEventHandler Constructor. 
This inherits from watchdog + PatternMatchingEventHandler, and is used to catch events, then filter + out excessive events at the same location.""" super().__init__() self.monitor = monitor self._settletime = settletime @@ -461,12 +573,19 @@ class WatchdogEventHandler(PatternMatchingEventHandler): self._recent_jobs_lock = threading.Lock() def threaded_handler(self, event): + """Function to determine if the given event shall be sent on to the + monitor. After each event we wait for '_settletime', to catch + subsequent events at the same location, so as to not swamp the system + with repeated events.""" self._recent_jobs_lock.acquire() try: if event.src_path in self._recent_jobs: recent_timestamp = self._recent_jobs[event.src_path] difference = event.time_stamp - recent_timestamp + # Discard the event if we already have a recent event at this + # same path. Update the most recent time, so we can hopefully + # wait till events have stopped happening if difference <= self._settletime: self._recent_jobs[event.src_path] = \ max(recent_timestamp, event.time_stamp) @@ -481,9 +600,14 @@ class WatchdogEventHandler(PatternMatchingEventHandler): raise Exception(ex) self._recent_jobs_lock.release() + # If we did not have a recent event, then send it on to the monitor self.monitor.match(event) def handle_event(self, event): + """Handler function, called by all specific event functions. 
Will + attach a timestamp to the event immediately, and attempt to start a + threaded_handler so that the monitor can resume monitoring as soon as + possible.""" event.time_stamp = time() waiting_for_threaded_resources = True @@ -499,16 +623,21 @@ class WatchdogEventHandler(PatternMatchingEventHandler): sleep(1) def on_created(self, event): + """Function called when a file created event occurs.""" self.handle_event(event) def on_modified(self, event): + """Function called when a file modified event occurs.""" self.handle_event(event) def on_moved(self, event): + """Function called when a file moved event occurs.""" self.handle_event(event) def on_deleted(self, event): + """Function called when a file deleted event occurs.""" self.handle_event(event) def on_closed(self, event): + """Function called when a file closed event occurs.""" self.handle_event(event) diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index dd9090c..166f8f4 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -1,16 +1,19 @@ +""" +This file contains definitions for a MEOW recipe based off of jupyter notebooks, +along with an appropriate handler for said events. 
+ +Author(s): David Marchant +""" import nbformat import sys -import threading -from multiprocessing import Pipe from typing import Any from core.correctness.validation import check_type, valid_string, \ - valid_dict, valid_path, valid_list, valid_existing_dir_path, \ - setup_debugging -from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, VALID_CHANNELS, \ - PYTHON_FUNC, DEBUG_INFO, WATCHDOG_TYPE, JOB_HASH, PYTHON_EXECUTION_BASE, \ + valid_dict, valid_path, valid_existing_dir_path, setup_debugging +from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, PYTHON_FUNC, \ + DEBUG_INFO, WATCHDOG_TYPE, JOB_HASH, PYTHON_EXECUTION_BASE, \ WATCHDOG_RULE, EVENT_PATH, PYTHON_TYPE, WATCHDOG_HASH, JOB_PARAMETERS, \ PYTHON_OUTPUT_DIR from core.functionality import print_debug, create_job, replace_keywords @@ -19,57 +22,75 @@ from patterns.file_event_pattern import SWEEP_START, SWEEP_STOP, SWEEP_JUMP class JupyterNotebookRecipe(BaseRecipe): + # A path to the jupyter notebook used to create this recipe source:str def __init__(self, name:str, recipe:Any, parameters:dict[str,Any]={}, requirements:dict[str,Any]={}, source:str=""): + """JupyterNotebookRecipe Constructor. This is used to execute analysis + code using the papermill module.""" super().__init__(name, recipe, parameters, requirements) self._is_valid_source(source) self.source = source def _is_valid_source(self, source:str)->None: + """Validation check for 'source' variable from main constructor.""" if source: valid_path(source, extension=".ipynb", min_length=0) def _is_valid_recipe(self, recipe:dict[str,Any])->None: + """Validation check for 'recipe' variable from main constructor. + Called within parent BaseRecipe constructor.""" check_type(recipe, dict) nbformat.validate(recipe) def _is_valid_parameters(self, parameters:dict[str,Any])->None: + """Validation check for 'parameters' variable from main constructor. 
+ Called within parent BaseRecipe constructor.""" valid_dict(parameters, str, Any, strict=False, min_length=0) for k in parameters.keys(): valid_string(k, VALID_VARIABLE_NAME_CHARS) def _is_valid_requirements(self, requirements:dict[str,Any])->None: + """Validation check for 'requirements' variable from main constructor. + Called within parent BaseRecipe constructor.""" valid_dict(requirements, str, Any, strict=False, min_length=0) for k in requirements.keys(): valid_string(k, VALID_VARIABLE_NAME_CHARS) class PapermillHandler(BaseHandler): + # TODO move me to conductor + # Execution directory handler_base:str + # TODO possibly move me also to conductor? + # Final location for job output to be placed output_dir:str + # Config option, above which debug messages are ignored debug_level:int - _worker:threading.Thread - _stop_pipe:Pipe + # Where print messages are sent _print_target:Any def __init__(self, handler_base:str, output_dir:str, print:Any=sys.stdout, logging:int=0)->None: + """PapermillHandler Constructor. This creates jobs to be executed using + the papermill module. 
This does not run as a continuous thread to + handle execution, but is invoked according to a factory pattern using + the handle function.""" super().__init__() self._is_valid_handler_base(handler_base) self.handler_base = handler_base self._is_valid_output_dir(output_dir) self.output_dir = output_dir self._print_target, self.debug_level = setup_debugging(print, logging) - self._worker = None - self._stop_pipe = Pipe() print_debug(self._print_target, self.debug_level, "Created new PapermillHandler instance", DEBUG_INFO) def handle(self, event:dict[str,Any])->None: + """Function called to handle a given event.""" print_debug(self._print_target, self.debug_level, f"Handling event {event[EVENT_PATH]}", DEBUG_INFO) rule = event[WATCHDOG_RULE] + # Assemble job parameters dict from pattern variables yaml_dict = {} for var, val in rule.pattern.parameters.items(): yaml_dict[var] = val @@ -77,9 +98,11 @@ class PapermillHandler(BaseHandler): yaml_dict[var] = val yaml_dict[rule.pattern.triggering_file] = event[EVENT_PATH] + # If no parameter sweeps, then one job will suffice if not rule.pattern.sweep: self.setup_job(event, yaml_dict) else: + # If parameter sweeps, then many jobs created for var, val in rule.pattern.sweep.items(): values = [] par_val = rule.pattern.sweep[SWEEP_START] @@ -92,18 +115,25 @@ class PapermillHandler(BaseHandler): self.setup_job(event, yaml_dict) def valid_event_types(self)->list[str]: + """Function to provide a list of the types of events this handler can + process.""" return [WATCHDOG_TYPE] - def _is_valid_inputs(self, inputs:list[VALID_CHANNELS])->None: - valid_list(inputs, VALID_CHANNELS) - def _is_valid_handler_base(self, handler_base)->None: + """Validation check for 'handler_base' variable from main + constructor.""" valid_existing_dir_path(handler_base) def _is_valid_output_dir(self, output_dir)->None: + """Validation check for 'output_dir' variable from main + constructor.""" valid_existing_dir_path(output_dir, allow_base=True) def 
setup_job(self, event:dict[str,Any], yaml_dict:dict[str,Any])->None: + """Function to set up new job dict and send it to the runner to be + executed.""" + # TODO edit me to write job files to a local store in handler, then + # read those files within conductor meow_job = create_job(PYTHON_TYPE, event, { JOB_PARAMETERS:yaml_dict, JOB_HASH: event[WATCHDOG_HASH], @@ -115,7 +145,9 @@ class PapermillHandler(BaseHandler): f"{PYTHON_TYPE}.", DEBUG_INFO) self.to_runner.send(meow_job) +# Papermill job execution code, to be run within the conductor def job_func(job): + # Requires own imports as will be run in its own execution environment import os import shutil import papermill @@ -131,6 +163,7 @@ def job_func(job): event = job[JOB_EVENT] + # replace MEOW keyworks within variables dict yaml_dict = replace_keywords( job[JOB_PARAMETERS], job[JOB_ID], @@ -138,15 +171,19 @@ def job_func(job): event[WATCHDOG_BASE] ) + # Create a base job directory job_dir = os.path.join(job[PYTHON_EXECUTION_BASE], job[JOB_ID]) make_dir(job_dir) + # write a status file to the job directory meta_file = os.path.join(job_dir, META_FILE) write_yaml(job, meta_file) + # write an executable notebook to the job directory base_file = os.path.join(job_dir, BASE_FILE) write_notebook(event[WATCHDOG_RULE].recipe.recipe, base_file) + # write a parameter file to the job directory param_file = os.path.join(job_dir, PARAMS_FILE) write_yaml(yaml_dict, param_file) @@ -156,10 +193,17 @@ def job_func(job): job[JOB_STATUS] = STATUS_RUNNING job[JOB_START_TIME] = datetime.now() + # update the status file with running status write_yaml(job, meta_file) + # Check the hash of the triggering file, if present. This addresses + # potential race condition as file could have been modified since + # triggering event if JOB_HASH in job: + # get current hash triggerfile_hash = get_file_hash(job[JOB_EVENT][EVENT_PATH], SHA256) + # If hash doesn't match, then abort the job. 
If it's been modified, then + another job will have been scheduled anyway. if not triggerfile_hash \ or triggerfile_hash != job[JOB_HASH]: job[JOB_STATUS] = STATUS_SKIPPED @@ -172,6 +216,7 @@ def job_func(job): write_yaml(job, meta_file) return + # Create a parameterised version of the executable notebook try: job_notebook = parameterize_jupyter_notebook( event[WATCHDOG_RULE].recipe.recipe, yaml_dict @@ -185,6 +230,7 @@ def job_func(job): write_yaml(job, meta_file) return + # Execute the parameterised notebook try: papermill.execute_notebook(job_file, result_file, {}) except Exception as e: @@ -195,10 +241,12 @@ def job_func(job): write_yaml(job, meta_file) return + # Update the status file with the finalised status job[JOB_STATUS] = STATUS_DONE job[JOB_END_TIME] = datetime.now() write_yaml(job, meta_file) + # Move the contents of the execution directory to the final output + # directory. job_output_dir = os.path.join(job[PYTHON_OUTPUT_DIR], job[JOB_ID]) - shutil.move(job_dir, job_output_dir) diff --git a/rules/file_event_jupyter_notebook_rule.py b/rules/file_event_jupyter_notebook_rule.py index 4a9b729..c69b0b6 100644 --- a/rules/file_event_jupyter_notebook_rule.py +++ b/rules/file_event_jupyter_notebook_rule.py @@ -1,9 +1,17 @@ +""" +This file contains definitions for a MEOW rule connecting the FileEventPattern +and JupyterNotebookRecipe. + +Author(s): David Marchant +""" from core.correctness.validation import check_type from core.meow import BaseRule from patterns.file_event_pattern import FileEventPattern from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe +# TODO potentially remove this and just invoke BaseRule directly, as does not +# add any functionality other than some validation. 
class FileEventJupyterNotebookRule(BaseRule): pattern_type = "FileEventPattern" recipe_type = "JupyterNotebookRecipe" @@ -16,7 +24,11 @@ class FileEventJupyterNotebookRule(BaseRule): f"uses {pattern.recipe}") def _is_valid_pattern(self, pattern:FileEventPattern)->None: + """Validation check for 'pattern' variable from main constructor. Is + automatically called during initialisation.""" check_type(pattern, FileEventPattern) def _is_valid_recipe(self, recipe:JupyterNotebookRecipe)->None: + """Validation check for 'recipe' variable from main constructor. Is + automatically called during initialisation.""" check_type(recipe, JupyterNotebookRecipe) diff --git a/tests/test_all.sh b/tests/test_all.sh index ef0d403..ce7803c 100755 --- a/tests/test_all.sh +++ b/tests/test_all.sh @@ -1,4 +1,6 @@ #! /bin/bash +# A script to run all tests. This will automatically move to the tests +# directory and call all other files as pytest scripts # Need to more to local dir to run tests starting_working_dir=$(pwd) diff --git a/tests/test_meow.py b/tests/test_meow.py index 1b323ab..03dd6b5 100644 --- a/tests/test_meow.py +++ b/tests/test_meow.py @@ -200,7 +200,7 @@ class MeowTests(unittest.TestCase): FullTestHandler() def testBaseConductor(self)->None: - with self.assertRaises(NotImplementedError): + with self.assertRaises(TypeError): BaseConductor() class TestConductor(BaseConductor):