diff --git a/conductors/local_bash_conductor.py b/conductors/local_bash_conductor.py index 02f24c5..ae0e1b5 100644 --- a/conductors/local_bash_conductor.py +++ b/conductors/local_bash_conductor.py @@ -27,12 +27,13 @@ from meow_base.functionality.file_io import make_dir, write_file, \ class LocalBashConductor(BaseConductor): def __init__(self, job_queue_dir:str=DEFAULT_JOB_QUEUE_DIR, - job_output_dir:str=DEFAULT_JOB_OUTPUT_DIR, name:str="")->None: + job_output_dir:str=DEFAULT_JOB_OUTPUT_DIR, name:str="", + pause_time:int=5)->None: """LocalBashConductor Constructor. This should be used to execute Bash jobs, and will then pass any internal job runner files to the output directory. Note that if this handler is given to a MeowRunner object, the job_queue_dir and job_output_dir will be overwridden.""" - super().__init__(name=name) + super().__init__(name=name, pause_time=pause_time) self._is_valid_job_queue_dir(job_queue_dir) self.job_queue_dir = job_queue_dir self._is_valid_job_output_dir(job_output_dir) diff --git a/conductors/local_python_conductor.py b/conductors/local_python_conductor.py index 5fed3a2..13064ac 100644 --- a/conductors/local_python_conductor.py +++ b/conductors/local_python_conductor.py @@ -24,12 +24,13 @@ from meow_base.functionality.file_io import make_dir, write_file, \ class LocalPythonConductor(BaseConductor): def __init__(self, job_queue_dir:str=DEFAULT_JOB_QUEUE_DIR, - job_output_dir:str=DEFAULT_JOB_OUTPUT_DIR, name:str="")->None: + job_output_dir:str=DEFAULT_JOB_OUTPUT_DIR, name:str="", + pause_time:int=5)->None: """LocalPythonConductor Constructor. This should be used to execute Python jobs, and will then pass any internal job runner files to the output directory. 
Note that if this handler is given to a MeowRunner object, the job_queue_dir and job_output_dir will be overwridden.""" - super().__init__(name=name) + super().__init__(name=name, pause_time=pause_time) self._is_valid_job_queue_dir(job_queue_dir) self.job_queue_dir = job_queue_dir self._is_valid_job_output_dir(job_output_dir) diff --git a/core/base_conductor.py b/core/base_conductor.py index 5df2e42..f975387 100644 --- a/core/base_conductor.py +++ b/core/base_conductor.py @@ -6,12 +6,14 @@ from for all conductor instances. Author(s): David Marchant """ -from typing import Any, Tuple, Dict +from threading import Event, Thread +from time import sleep +from typing import Any, Tuple, Dict, Union from meow_base.core.vars import VALID_CONDUCTOR_NAME_CHARS, VALID_CHANNELS, \ get_drt_imp_msg from meow_base.functionality.validation import check_implementation, \ - valid_string + valid_string, valid_existing_dir_path, valid_natural from meow_base.functionality.naming import generate_conductor_id @@ -19,10 +21,11 @@ class BaseConductor: # An identifier for a conductor within the runner. Can be manually set in # the constructor, or autogenerated if no name provided. name:str - # A channel for sending messages to the runner. Note that this will be - # overridden by a MeowRunner, if a conductor instance is passed to it, and - # so does not need to be initialised within the conductor itself. - to_runner: VALID_CHANNELS + # A channel for sending messages to the runner job queue. Note that this + # will be overridden by a MeowRunner, if a conductor instance is passed to + # it, and so does not need to be initialised within the conductor itself, + # unless the conductor is running independently of a runner. + to_runner_job: VALID_CHANNELS # Directory where queued jobs are initially written to. Note that this # will be overridden by a MeowRunner, if a handler instance is passed to # it, and so does not need to be initialised within the handler itself. 
@@ -31,16 +34,20 @@ class BaseConductor: # will be overridden by a MeowRunner, if a handler instance is passed to # it, and so does not need to be initialised within the handler itself. job_output_dir:str - def __init__(self, name:str="")->None: + # A count, for how long a conductor will wait if told that there are no + # jobs in the runner, before polling again. Default is 5 seconds. + pause_time: int + def __init__(self, name:str="", pause_time:int=5)->None: """BaseConductor Constructor. This will check that any class inheriting from it implements its validation functions.""" check_implementation(type(self).execute, BaseConductor) check_implementation(type(self).valid_execute_criteria, BaseConductor) - check_implementation(type(self).prompt_runner_for_job, BaseConductor) if not name: name = generate_conductor_id() self._is_valid_name(name) self.name = name + self._is_valid_pause_time(pause_time) + self.pause_time = pause_time def __new__(cls, *args, **kwargs): """A check that this base class is not instantiated itself, only @@ -56,21 +63,67 @@ class BaseConductor: overridden by child classes.""" valid_string(name, VALID_CONDUCTOR_NAME_CHARS) - def prompt_runner_for_job(self): - pass + def _is_valid_pause_time(self, pause_time:int)->None: + """Validation check for 'pause_time' variable from main constructor. Is + automatically called during initialisation. This does not need to be + overridden by child classes.""" + valid_natural(pause_time, hint="BaseHandler.pause_time") + + def prompt_runner_for_job(self)->Union[Dict[str,Any],Any]: + self.to_runner_job.send(1) + + if self.to_runner_job.poll(self.pause_time): + return self.to_runner_job.recv() + return None def start(self)->None: - """Function to start the conductor as an ongoing process/thread. May be - overidden by any child process. 
Note that by default this will raise an - execption that is automatically handled within a runner instance.""" - raise NotImplementedError + """Function to start the conductor as an ongoing thread, as defined by + the main_loop function. Together, these will execute any code in a + implemented conductors execute function sequentially, but concurrently + to any other conductors running or other runner operations. This is + intended as a naive mmultiprocessing implementation, and any more in + depth parallelisation of execution must be implemented by a user by + overriding this function, and the stop function.""" + self._stop_event = Event() + self._handle_thread = Thread( + target=self.main_loop, + args=(self._stop_event,), + daemon=True, + name="conductor_thread" + ) + self._handle_thread.start() def stop(self)->None: - """Function to stop the conductor as an ongoing process/thread. May be - overidden by any child process. Note that by default this will raise an - execption that is automatically handled within a runner instance.""" - raise NotImplementedError - + """Function to stop the conductor as an ongoing thread. May be + overidden by any child class. This function should also be overriden if + the start function has been.""" + self._stop_event.set() + self._handle_thread.join() + + def main_loop(self, stop_event)->None: + """Function defining an ongoing thread, as started by the start + function and stoped by the stop function. 
""" + + while not stop_event.is_set(): + reply = self.prompt_runner_for_job() + + # If we have recieved 'None' then we have already timed out so skip + # this loop and start again + if reply is None: + continue + + try: + valid_existing_dir_path(reply) + except: + # Were not given a job dir, so sleep before trying again + sleep(self.pause_time) + + try: + self.execute(reply) + except: + # TODO some error reporting here + pass + def valid_execute_criteria(self, job:Dict[str,Any])->Tuple[bool,str]: """Function to determine given an job defintion, if this conductor can process it or not. Must be implemented by any child process.""" diff --git a/core/base_handler.py b/core/base_handler.py index 3b0873a..0c54d9b 100644 --- a/core/base_handler.py +++ b/core/base_handler.py @@ -6,37 +6,50 @@ from for all handler instances. Author(s): David Marchant """ -from typing import Any, Tuple, Dict + +from threading import Event, Thread +from typing import Any, Tuple, Dict, Union +from time import sleep from meow_base.core.vars import VALID_CHANNELS, \ VALID_HANDLER_NAME_CHARS, get_drt_imp_msg +from meow_base.core.meow import valid_event from meow_base.functionality.validation import check_implementation, \ - valid_string + valid_string, valid_natural from meow_base.functionality.naming import generate_handler_id class BaseHandler: # An identifier for a handler within the runner. Can be manually set in # the constructor, or autogenerated if no name provided. name:str - # A channel for sending messages to the runner. Note that this will be - # overridden by a MeowRunner, if a handler instance is passed to it, and so - # does not need to be initialised within the handler itself. - to_runner: VALID_CHANNELS + # A channel for sending messages to the runner event queue. 
Note that this + # will be overridden by a MeowRunner, if a handler instance is passed to + # it, and so does not need to be initialised within the handler itself, + # unless the handler is running independently of a runner. + to_runner_event: VALID_CHANNELS + # A channel for sending messages to the runner job queue. Note that this + # will be overridden by a MeowRunner, if a handler instance is passed to + # it, and so does not need to be initialised within the handler itself, + # unless the handler is running independently of a runner. + to_runner_job: VALID_CHANNELS # Directory where queued jobs are initially written to. Note that this # will be overridden by a MeowRunner, if a handler instance is passed to # it, and so does not need to be initialised within the handler itself. job_queue_dir:str - def __init__(self, name:str='')->None: + # A count, for how long a handler will wait if told that there are no + # events in the runner, before polling again. Default is 5 seconds. + pause_time: int + def __init__(self, name:str='', pause_time:int=5)->None: """BaseHandler Constructor. 
This will check that any class inheriting from it implements its validation functions.""" check_implementation(type(self).handle, BaseHandler) check_implementation(type(self).valid_handle_criteria, BaseHandler) - check_implementation(type(self).prompt_runner_for_event, BaseHandler) - check_implementation(type(self).send_job_to_runner, BaseHandler) if not name: name = generate_handler_id() self._is_valid_name(name) - self.name = name + self.name = name + self._is_valid_pause_time(pause_time) + self.pause_time = pause_time def __new__(cls, *args, **kwargs): """A check that this base class is not instantiated itself, only @@ -52,24 +65,71 @@ class BaseHandler: overridden by child classes.""" valid_string(name, VALID_HANDLER_NAME_CHARS) - def prompt_runner_for_event(self): - pass + def _is_valid_pause_time(self, pause_time:int)->None: + """Validation check for 'pause_time' variable from main constructor. Is + automatically called during initialisation. This does not need to be + overridden by child classes.""" + valid_natural(pause_time, hint="BaseHandler.pause_time") - def send_job_to_runner(self, msg): - #self.to_runner.send(msg) - pass + def prompt_runner_for_event(self)->Union[Dict[str,Any],Any]: + self.to_runner_event.send(1) + + if self.to_runner_event.poll(self.pause_time): + return self.to_runner_event.recv() + return None + + def send_job_to_runner(self, job_id:str)->None: + self.to_runner_job.send(job_id) def start(self)->None: - """Function to start the handler as an ongoing process/thread. May be - overidden by any child process. Note that by default this will raise an - execption that is automatically handled within a runner instance.""" - raise NotImplementedError + """Function to start the handler as an ongoing thread, as defined by + the main_loop function. Together, these will execute any code in a + implemented handlers handle function sequentially, but concurrently to + any other handlers running or other runner operations. 
This is intended + as a naive mmultiprocessing implementation, and any more in depth + parallelisation of execution must be implemented by a user by + overriding this function, and the stop function.""" + self._stop_event = Event() + self._handle_thread = Thread( + target=self.main_loop, + args=(self._stop_event,), + daemon=True, + name="handler_thread" + ) + self._handle_thread.start() def stop(self)->None: - """Function to stop the handler as an ongoing process/thread. May be - overidden by any child process. Note that by default this will raise an - execption that is automatically handled within a runner instance.""" - raise NotImplementedError + """Function to stop the handler as an ongoing thread. May be overidden + by any child class. This function should also be overriden if the start + function has been.""" + + self._stop_event.set() + self._handle_thread.join() + + def main_loop(self, stop_event)->None: + """Function defining an ongoing thread, as started by the start + function and stoped by the stop function. """ + + while not stop_event.is_set(): + reply = self.prompt_runner_for_event() + + # If we have recieved 'None' then we have already timed out so skip + # this loop and start again + if reply is None: + continue + + try: + valid_event(reply) + except Exception as e: + # Were not given an event, so sleep before trying again + sleep(self.pause_time) + + + try: + self.handle(reply) + except Exception as e: + # TODO some error reporting here + pass def valid_handle_criteria(self, event:Dict[str,Any])->Tuple[bool,str]: """Function to determine given an event defintion, if this handler can @@ -78,5 +138,7 @@ class BaseHandler: def handle(self, event:Dict[str,Any])->None: """Function to handle a given event. Must be implemented by any child - process.""" + process. 
Note that once any handling has occured, the + send_job_to_runner function should be called to inform the runner of + any resultant jobs.""" pass diff --git a/core/base_monitor.py b/core/base_monitor.py index 1d70ecb..8d35ae4 100644 --- a/core/base_monitor.py +++ b/core/base_monitor.py @@ -7,7 +7,8 @@ Author(s): David Marchant """ from copy import deepcopy -from typing import Union, Dict +from threading import Lock +from typing import Union, Dict, List from meow_base.core.base_pattern import BasePattern from meow_base.core.base_recipe import BaseRecipe @@ -15,8 +16,8 @@ from meow_base.core.rule import Rule from meow_base.core.vars import VALID_CHANNELS, \ VALID_MONITOR_NAME_CHARS, get_drt_imp_msg from meow_base.functionality.validation import check_implementation, \ - valid_string -from meow_base.functionality.meow import create_rules + valid_string, check_type, check_types, valid_dict_multiple_types +from meow_base.functionality.meow import create_rules, create_rule from meow_base.functionality.naming import generate_monitor_id @@ -30,10 +31,17 @@ class BaseMonitor: _recipes: Dict[str, BaseRecipe] # A collection of rules derived from _patterns and _recipes _rules: Dict[str, Rule] - # A channel for sending messages to the runner. Note that this is not - # initialised within the constructor, but within the runner when passed the - # monitor is passed to it. - to_runner: VALID_CHANNELS + # A channel for sending messages to the runner event queue. Note that this + # is not initialised within the constructor, but within the runner when the + # monitor is passed to it unless the monitor is running independently of a + # runner. 
+ to_runner_event: VALID_CHANNELS + #A lock to solve race conditions on '_patterns' + _patterns_lock:Lock + #A lock to solve race conditions on '_recipes' + _recipes_lock:Lock + #A lock to solve race conditions on '_rules' + _rules_lock:Lock def __init__(self, patterns:Dict[str,BasePattern], recipes:Dict[str,BaseRecipe], name:str="")->None: """BaseMonitor Constructor. This will check that any class inheriting @@ -41,20 +49,10 @@ class BaseMonitor: the input parameters.""" check_implementation(type(self).start, BaseMonitor) check_implementation(type(self).stop, BaseMonitor) - check_implementation(type(self)._is_valid_patterns, BaseMonitor) + check_implementation(type(self)._get_valid_pattern_types, BaseMonitor) self._is_valid_patterns(patterns) - check_implementation(type(self)._is_valid_recipes, BaseMonitor) + check_implementation(type(self)._get_valid_recipe_types, BaseMonitor) self._is_valid_recipes(recipes) - check_implementation(type(self).add_pattern, BaseMonitor) - check_implementation(type(self).update_pattern, BaseMonitor) - check_implementation(type(self).remove_pattern, BaseMonitor) - check_implementation(type(self).get_patterns, BaseMonitor) - check_implementation(type(self).add_recipe, BaseMonitor) - check_implementation(type(self).update_recipe, BaseMonitor) - check_implementation(type(self).remove_recipe, BaseMonitor) - check_implementation(type(self).get_recipes, BaseMonitor) - check_implementation(type(self).get_rules, BaseMonitor) - check_implementation(type(self).send_event_to_runner, BaseMonitor) # Ensure that patterns and recipes cannot be trivially modified from # outside the monitor, as this will cause internal consistency issues self._patterns = deepcopy(patterns) @@ -63,7 +61,10 @@ class BaseMonitor: if not name: name = generate_monitor_id() self._is_valid_name(name) - self.name = name + self.name = name + self._patterns_lock = Lock() + self._recipes_lock = Lock() + self._rules_lock = Lock() def __new__(cls, *args, **kwargs): """A check that 
this base class is not instantiated itself, only @@ -80,21 +81,145 @@ class BaseMonitor: valid_string(name, VALID_MONITOR_NAME_CHARS) def _is_valid_patterns(self, patterns:Dict[str,BasePattern])->None: - """Validation check for 'patterns' variable from main constructor. Must - be implemented by any child class.""" - pass + """Validation check for 'patterns' variable from main constructor.""" + valid_dict_multiple_types( + patterns, + str, + self._get_valid_pattern_types(), + min_length=0, + strict=False + ) + + def _get_valid_pattern_types(self)->List[type]: + """Validation check used throughout monitor to check that only + compatible patterns are used. Must be implmented by any child class.""" + raise NotImplementedError def _is_valid_recipes(self, recipes:Dict[str,BaseRecipe])->None: - """Validation check for 'recipes' variable from main constructor. Must - be implemented by any child class.""" + """Validation check for 'recipes' variable from main constructor.""" + valid_dict_multiple_types( + recipes, + str, + self._get_valid_recipe_types(), + min_length=0, + strict=False + ) + + def _get_valid_recipe_types(self)->List[type]: + """Validation check used throughout monitor to check that only + compatible recipes are used. 
Must be implmented by any child class.""" + raise NotImplementedError + + def _identify_new_rules(self, new_pattern:BasePattern=None, + new_recipe:BaseRecipe=None)->None: + """Function to determine if a new rule can be created given a new + pattern or recipe, in light of other existing patterns or recipes in + the monitor.""" + + if new_pattern: + self._patterns_lock.acquire() + self._recipes_lock.acquire() + try: + # Check in case pattern has been deleted since function called + if new_pattern.name not in self._patterns: + self._patterns_lock.release() + self._recipes_lock.release() + return + # If pattern specifies recipe that already exists, make a rule + if new_pattern.recipe in self._recipes: + self._create_new_rule( + new_pattern, + self._recipes[new_pattern.recipe], + ) + except Exception as e: + self._patterns_lock.release() + self._recipes_lock.release() + raise e + self._patterns_lock.release() + self._recipes_lock.release() + + if new_recipe: + self._patterns_lock.acquire() + self._recipes_lock.acquire() + try: + # Check in case recipe has been deleted since function called + if new_recipe.name not in self._recipes: + self._patterns_lock.release() + self._recipes_lock.release() + return + # If recipe is specified by existing pattern, make a rule + for pattern in self._patterns.values(): + if pattern.recipe == new_recipe.name: + self._create_new_rule( + pattern, + new_recipe, + ) + except Exception as e: + self._patterns_lock.release() + self._recipes_lock.release() + raise e + self._patterns_lock.release() + self._recipes_lock.release() + + def _identify_lost_rules(self, lost_pattern:str=None, + lost_recipe:str=None)->None: + """Function to remove rules that should be deleted in response to a + pattern or recipe having been deleted.""" + to_delete = [] + self._rules_lock.acquire() + try: + # Identify any offending rules + for name, rule in self._rules.items(): + if lost_pattern and rule.pattern.name == lost_pattern: + to_delete.append(name) + if 
lost_recipe and rule.recipe.name == lost_recipe: + to_delete.append(name) + # Now delete them + for delete in to_delete: + if delete in self._rules.keys(): + self._rules.pop(delete) + except Exception as e: + self._rules_lock.release() + raise e + self._rules_lock.release() + + def _create_new_rule(self, pattern:BasePattern, recipe:BaseRecipe)->None: + """Function to create a new rule from a given pattern and recipe. This + will only be called to create rules at runtime, as rules are + automatically created at initialisation using the same 'create_rule' + function called here.""" + rule = create_rule(pattern, recipe) + self._rules_lock.acquire() + try: + if rule.name in self._rules: + raise KeyError("Cannot create Rule with name of " + f"'{rule.name}' as already in use") + self._rules[rule.name] = rule + except Exception as e: + self._rules_lock.release() + raise e + self._rules_lock.release() + + self._apply_retroactive_rule(rule) + + def _apply_retroactive_rule(self, rule:Rule)->None: + """Function to determine if a rule should be applied to any existing + defintions, if possible. May be implemented by inherited classes.""" + pass + + def _apply_retroactive_rules(self)->None: + """Function to determine if any rules should be applied to any existing + defintions, if possible. May be implemented by inherited classes.""" pass def send_event_to_runner(self, msg): - self.to_runner.send(msg) + self.to_runner_event.send(msg) def start(self)->None: """Function to start the monitor as an ongoing process/thread. Must be - implemented by any child process""" + implemented by any child process. Depending on the nature of the + monitor, this may wish to directly call apply_retroactive_rules before + starting.""" pass def stop(self)->None: @@ -103,46 +228,162 @@ class BaseMonitor: pass def add_pattern(self, pattern:BasePattern)->None: - """Function to add a pattern to the current definitions. 
Must be - implemented by any child process.""" - pass + """Function to add a pattern to the current definitions. Any rules + that can be possibly created from that pattern will be automatically + created.""" + check_types( + pattern, + self._get_valid_pattern_types(), + hint="add_pattern.pattern" + ) + + self._patterns_lock.acquire() + try: + if pattern.name in self._patterns: + raise KeyError(f"An entry for Pattern '{pattern.name}' " + "already exists. Do you intend to update instead?") + self._patterns[pattern.name] = pattern + except Exception as e: + self._patterns_lock.release() + raise e + self._patterns_lock.release() + + self._identify_new_rules(new_pattern=pattern) def update_pattern(self, pattern:BasePattern)->None: - """Function to update a pattern in the current definitions. Must be - implemented by any child process.""" - pass + """Function to update a pattern in the current definitions. Any rules + created from that pattern will be automatically updated.""" + check_types( + pattern, + self._get_valid_pattern_types(), + hint="update_pattern.pattern" + ) - def remove_pattern(self, pattern:Union[str,BasePattern])->None: - """Function to remove a pattern from the current definitions. Must be - implemented by any child process.""" - pass + self.remove_pattern(pattern.name) + self.add_pattern(pattern) + def remove_pattern(self, pattern: Union[str,BasePattern])->None: + """Function to remove a pattern from the current definitions. 
Any rules + that will be no longer valid will be automatically removed.""" + check_type( + pattern, + str, + alt_types=[BasePattern], + hint="remove_pattern.pattern" + ) + lookup_key = pattern + if isinstance(lookup_key, BasePattern): + lookup_key = pattern.name + self._patterns_lock.acquire() + try: + if lookup_key not in self._patterns: + raise KeyError(f"Cannot remote Pattern '{lookup_key}' as it " + "does not already exist") + self._patterns.pop(lookup_key) + except Exception as e: + self._patterns_lock.release() + raise e + self._patterns_lock.release() + + if isinstance(pattern, BasePattern): + self._identify_lost_rules(lost_pattern=pattern.name) + else: + self._identify_lost_rules(lost_pattern=pattern) + def get_patterns(self)->Dict[str,BasePattern]: - """Function to get a dictionary of all current pattern definitions. - Must be implemented by any child process.""" - pass + """Function to get a dict of the currently defined patterns of the + monitor. Note that the result is deep-copied, and so can be manipulated + without directly manipulating the internals of the monitor.""" + to_return = {} + self._patterns_lock.acquire() + try: + to_return = deepcopy(self._patterns) + except Exception as e: + self._patterns_lock.release() + raise e + self._patterns_lock.release() + return to_return - def add_recipe(self, recipe:BaseRecipe)->None: - """Function to add a recipe to the current definitions. Must be - implemented by any child process.""" - pass + def add_recipe(self, recipe: BaseRecipe)->None: + """Function to add a recipe to the current definitions. Any rules + that can be possibly created from that recipe will be automatically + created.""" + check_type(recipe, BaseRecipe, hint="add_recipe.recipe") + self._recipes_lock.acquire() + try: + if recipe.name in self._recipes: + raise KeyError(f"An entry for Recipe '{recipe.name}' already " + "exists. 
Do you intend to update instead?") + self._recipes[recipe.name] = recipe + except Exception as e: + self._recipes_lock.release() + raise e + self._recipes_lock.release() - def update_recipe(self, recipe:BaseRecipe)->None: - """Function to update a recipe in the current definitions. Must be - implemented by any child process.""" - pass + self._identify_new_rules(new_recipe=recipe) + def update_recipe(self, recipe: BaseRecipe)->None: + """Function to update a recipe in the current definitions. Any rules + created from that recipe will be automatically updated.""" + check_type(recipe, BaseRecipe, hint="update_recipe.recipe") + self.remove_recipe(recipe.name) + self.add_recipe(recipe) + def remove_recipe(self, recipe:Union[str,BaseRecipe])->None: - """Function to remove a recipe from the current definitions. Must be - implemented by any child process.""" - pass + """Function to remove a recipe from the current definitions. Any rules + that will be no longer valid will be automatically removed.""" + check_type( + recipe, + str, + alt_types=[BaseRecipe], + hint="remove_recipe.recipe" + ) + lookup_key = recipe + if isinstance(lookup_key, BaseRecipe): + lookup_key = recipe.name + self._recipes_lock.acquire() + try: + # Check that recipe has not already been deleted + if lookup_key not in self._recipes: + raise KeyError(f"Cannot remote Recipe '{lookup_key}' as it " + "does not already exist") + self._recipes.pop(lookup_key) + except Exception as e: + self._recipes_lock.release() + raise e + self._recipes_lock.release() + + if isinstance(recipe, BaseRecipe): + self._identify_lost_rules(lost_recipe=recipe.name) + else: + self._identify_lost_rules(lost_recipe=recipe) def get_recipes(self)->Dict[str,BaseRecipe]: - """Function to get a dictionary of all current recipe definitions. - Must be implemented by any child process.""" - pass - + """Function to get a dict of the currently defined recipes of the + monitor. 
Note that the result is deep-copied, and so can be manipulated + without directly manipulating the internals of the monitor.""" + to_return = {} + self._recipes_lock.acquire() + try: + to_return = deepcopy(self._recipes) + except Exception as e: + self._recipes_lock.release() + raise e + self._recipes_lock.release() + return to_return + def get_rules(self)->Dict[str,Rule]: - """Function to get a dictionary of all current rule definitions. - Must be implemented by any child process.""" - pass + """Function to get a dict of the currently defined rules of the + monitor. Note that the result is deep-copied, and so can be manipulated + without directly manipulating the internals of the monitor.""" + to_return = {} + self._rules_lock.acquire() + try: + to_return = deepcopy(self._rules) + except Exception as e: + self._rules_lock.release() + raise e + self._rules_lock.release() + return to_return + + diff --git a/core/runner.py b/core/runner.py index 04b120e..3f301bb 100644 --- a/core/runner.py +++ b/core/runner.py @@ -11,15 +11,13 @@ import sys import threading from multiprocessing import Pipe -from random import randrange -from typing import Any, Union, Dict, List, Type +from typing import Any, Union, Dict, List, Type, Tuple from meow_base.core.base_conductor import BaseConductor from meow_base.core.base_handler import BaseHandler from meow_base.core.base_monitor import BaseMonitor from meow_base.core.vars import DEBUG_WARNING, DEBUG_INFO, \ - EVENT_TYPE, VALID_CHANNELS, META_FILE, DEFAULT_JOB_OUTPUT_DIR, \ - DEFAULT_JOB_QUEUE_DIR, EVENT_PATH + VALID_CHANNELS, META_FILE, DEFAULT_JOB_OUTPUT_DIR, DEFAULT_JOB_QUEUE_DIR from meow_base.functionality.validation import check_type, valid_list, \ valid_dir_path, check_implementation from meow_base.functionality.debug import setup_debugging, print_debug @@ -34,14 +32,18 @@ class MeowRunner: handlers:List[BaseHandler] # A collection of all conductors in the runner conductors:List[BaseConductor] - # A collection of all channels 
from each monitor - from_monitors: List[VALID_CHANNELS] - # A collection of all channels from each handler - from_handlers: List[VALID_CHANNELS] + # A collection of all inputs for the event queue + event_connections: List[Tuple[VALID_CHANNELS,Union[BaseMonitor,BaseHandler]]] + # A collection of all inputs for the job queue + job_connections: List[Tuple[VALID_CHANNELS,Union[BaseHandler,BaseConductor]]] # Directory where queued jobs are initially written to job_queue_dir:str # Directory where completed jobs are finally written to job_output_dir:str + # A queue of all events found by monitors, awaiting handling by handlers + event_queue:List[Dict[str,Any]] + # A queue of all jobs setup by handlers, awaiting execution by conductors + job_queue:List[str] def __init__(self, monitors:Union[BaseMonitor,List[BaseMonitor]], handlers:Union[BaseHandler,List[BaseHandler]], conductors:Union[BaseConductor,List[BaseConductor]], @@ -55,6 +57,37 @@ class MeowRunner: self._is_valid_job_queue_dir(job_queue_dir) self._is_valid_job_output_dir(job_output_dir) + self.job_connections = [] + self.event_connections = [] + + self._is_valid_monitors(monitors) + # If monitors isn't a list, make it one + if not type(monitors) == list: + monitors = [monitors] + self.monitors = monitors + for monitor in self.monitors: + # Create a channel from the monitor back to this runner + monitor_to_runner_reader, monitor_to_runner_writer = Pipe() + monitor.to_runner_event = monitor_to_runner_writer + self.event_connections.append((monitor_to_runner_reader, monitor)) + + self._is_valid_handlers(handlers) + # If handlers isn't a list, make it one + if not type(handlers) == list: + handlers = [handlers] + for handler in handlers: + handler.job_queue_dir = job_queue_dir + + # Create channels from the handler back to this runner + h_to_r_event_runner, h_to_r_event_handler = Pipe(duplex=True) + h_to_r_job_reader, h_to_r_job_writer = Pipe() + + handler.to_runner_event = h_to_r_event_handler + handler.to_runner_job 
= h_to_r_job_writer + self.event_connections.append((h_to_r_event_runner, handler)) + self.job_connections.append((h_to_r_job_reader, handler)) + self.handlers = handlers + self._is_valid_conductors(conductors) # If conductors isn't a list, make it one if not type(conductors) == list: @@ -63,33 +96,13 @@ class MeowRunner: conductor.job_output_dir = job_output_dir conductor.job_queue_dir = job_queue_dir + # Create a channel from the conductor back to this runner + c_to_r_job_runner, c_to_r_job_conductor = Pipe(duplex=True) + + conductor.to_runner_job = c_to_r_job_conductor + self.job_connections.append((c_to_r_job_runner, conductor)) self.conductors = conductors - self._is_valid_handlers(handlers) - # If handlers isn't a list, make it one - if not type(handlers) == list: - handlers = [handlers] - self.from_handlers = [] - for handler in handlers: - # Create a channel from the handler back to this runner - handler_to_runner_reader, handler_to_runner_writer = Pipe() - handler.to_runner = handler_to_runner_writer - handler.job_queue_dir = job_queue_dir - self.from_handlers.append(handler_to_runner_reader) - self.handlers = handlers - - self._is_valid_monitors(monitors) - # If monitors isn't a list, make it one - if not type(monitors) == list: - monitors = [monitors] - self.monitors = monitors - self.from_monitors = [] - for monitor in self.monitors: - # Create a channel from the monitor back to this runner - monitor_to_runner_reader, monitor_to_runner_writer = Pipe() - monitor.to_runner = monitor_to_runner_writer - self.from_monitors.append(monitor_to_runner_reader) - # Create channel to send stop messages to monitor/handler thread self._stop_mon_han_pipe = Pipe() self._mon_han_worker = None @@ -101,11 +114,16 @@ class MeowRunner: # Setup debugging self._print_target, self.debug_level = setup_debugging(print, logging) + # Setup queues + self.event_queue = [] + self.job_queue = [] + def run_monitor_handler_interaction(self)->None: """Function to be run in its own 
thread, to handle any inbound messages from monitors. These will be events, which should be matched to an appropriate handler and handled.""" - all_inputs = self.from_monitors + [self._stop_mon_han_pipe[0]] + all_inputs = [i[0] for i in self.event_connections] \ + + [self._stop_mon_han_pipe[0]] while True: ready = wait(all_inputs) @@ -113,56 +131,45 @@ class MeowRunner: if self._stop_mon_han_pipe[0] in ready: return else: - handled = False - for from_monitor in self.from_monitors: - if from_monitor in ready: - # Read event from the monitor channel - message = from_monitor.recv() - event = message + for connection, component in self.event_connections: + if connection not in ready: + continue + message = connection.recv() - valid_handlers = [] - for handler in self.handlers: + # Recieved an event + if isinstance(component, BaseMonitor): + self.event_queue.append(message) + continue + # Recieved a request for an event + if isinstance(component, BaseHandler): + valid = False + for event in self.event_queue: try: - valid, _ = handler.valid_handle_criteria(event) - if valid: - valid_handlers.append(handler) + valid, _ = component.valid_handle_criteria(event) except Exception as e: print_debug( self._print_target, self.debug_level, - "Could not determine validity of event " - f"for handler. {e}", + "Could not determine validity of " + f"event for handler {component.name}. 
{e}", DEBUG_INFO ) - - # If we've only one handler, use that - if len(valid_handlers) == 1: - handler = valid_handlers[0] - handled = True - self.handle_event(handler, event) - break - # If multiple handlers then randomly pick one - elif len(valid_handlers) > 1: - handler = valid_handlers[ - randrange(len(valid_handlers)) - ] - handled = True - self.handle_event(handler, event) - break - - if not handled: - print_debug( - self._print_target, - self.debug_level, - "Could not determine handler for event.", - DEBUG_INFO - ) + + if valid: + self.event_queue.remove(event) + connection.send(event) + break + + # If nothing valid then send a message + if not valid: + connection.send(1) def run_handler_conductor_interaction(self)->None: """Function to be run in its own thread, to handle any inbound messages from handlers. These will be jobs, which should be matched to an appropriate conductor and executed.""" - all_inputs = self.from_handlers + [self._stop_han_con_pipe[0]] + all_inputs = [i[0] for i in self.job_connections] \ + + [self._stop_han_con_pipe[0]] while True: ready = wait(all_inputs) @@ -170,102 +177,52 @@ class MeowRunner: if self._stop_han_con_pipe[0] in ready: return else: - executed = False - for from_handler in self.from_handlers: - if from_handler in ready: - # Read job directory from the handler channel - job_dir = from_handler.recv() - try: - metafile = os.path.join(job_dir, META_FILE) - job = threadsafe_read_status(metafile) - except Exception as e: - print_debug(self._print_target, self.debug_level, - "Could not load necessary job definitions for " - f"job at '{job_dir}'. 
{e}", DEBUG_INFO) - continue + for connection, component in self.job_connections: + if connection not in ready: + continue - valid_conductors = [] - for conductor in self.conductors: + message = connection.recv() + + # Recieved an event + if isinstance(component, BaseHandler): + self.job_queue.append(message) + continue + # Recieved a request for an event + if isinstance(component, BaseConductor): + valid = False + for job_dir in self.job_queue: try: - valid, _ = \ - conductor.valid_execute_criteria(job) - if valid: - valid_conductors.append(conductor) + metafile = os.path.join(job_dir, META_FILE) + job = threadsafe_read_status(metafile) except Exception as e: print_debug( self._print_target, self.debug_level, - "Could not determine validity of job " - f"for conductor. {e}", + "Could not load necessary job definitions " + f"for job at '{job_dir}'. {e}", DEBUG_INFO ) - # If we've only one conductor, use that - if len(valid_conductors) == 1: - conductor = valid_conductors[0] - executed = True - self.execute_job(conductor, job_dir) - break - # If multiple handlers then randomly pick one - elif len(valid_conductors) > 1: - conductor = valid_conductors[ - randrange(len(valid_conductors)) - ] - executed = True - self.execute_job(conductor, job_dir) - break - - # TODO determine something more useful to do here - if not executed: - print_debug( - self._print_target, - self.debug_level, - f"No conductor could be found for job {job_dir}", - DEBUG_INFO - ) - - def handle_event(self, handler:BaseHandler, event:Dict[str,Any])->None: - """Function for a given handler to handle a given event, without - crashing the runner in the event of a problem.""" - print_debug(self._print_target, self.debug_level, - f"Starting handling for {event[EVENT_TYPE]} event: " - f"'{event[EVENT_PATH]}'", DEBUG_INFO) - try: - handler.handle(event) - print_debug(self._print_target, self.debug_level, - f"Completed handling for {event[EVENT_TYPE]} event: " - f"'{event[EVENT_PATH]}'", DEBUG_INFO) - except 
Exception as e: - print_debug(self._print_target, self.debug_level, - f"Something went wrong during handling for {event[EVENT_TYPE]}" - f" event '{event[EVENT_PATH]}'. {e}", DEBUG_INFO) - - def execute_job(self, conductor:BaseConductor, job_dir:str)->None: - """Function for a given conductor to execute a given job, without - crashing the runner in the event of a problem.""" - job_id = os.path.basename(job_dir) - print_debug( - self._print_target, - self.debug_level, - f"Starting execution for job: '{job_id}'", - DEBUG_INFO - ) - try: - conductor.execute(job_dir) - print_debug( - self._print_target, - self.debug_level, - f"Completed execution for job: '{job_id}'", - DEBUG_INFO - ) - except Exception as e: - print_debug( - self._print_target, - self.debug_level, - f"Something went wrong in execution of job '{job_id}'. {e}", - DEBUG_INFO - ) + try: + valid, _ = component.valid_execute_criteria(job) + except Exception as e: + print_debug( + self._print_target, + self.debug_level, + "Could not determine validity of " + f"job for conductor {component.name}. 
{e}", + DEBUG_INFO + ) + + if valid: + self.job_queue.remove(job_dir) + connection.send(job_dir) + break + # If nothing valid then send a message + if not valid: + connection.send(1) + def start(self)->None: """Function to start the runner by starting all of the constituent monitors, handlers and conductors, along with managing interaction @@ -273,25 +230,14 @@ class MeowRunner: # Start all monitors for monitor in self.monitors: monitor.start() - startable = [] - # Start all handlers, if they need it + + # Start all handlers for handler in self.handlers: - try: - check_implementation(handler.start, BaseHandler) - if handler not in startable: - startable.append(handler) - except NotImplementedError: - pass - # Start all conductors, if they need it + handler.start() + + # Start all conductors for conductor in self.conductors: - try: - check_implementation(conductor.start, BaseConductor) - if conductor not in startable: - startable.append(conductor) - except NotImplementedError: - pass - for starting in startable: - starting.start() + conductor.start() # If we've not started the monitor/handler interaction thread yet, then # do so @@ -331,29 +277,18 @@ class MeowRunner: """Function to stop the runner by stopping all of the constituent monitors, handlers and conductors, along with managing interaction threads.""" + # Stop all the monitors for monitor in self.monitors: monitor.stop() - stopable = [] # Stop all handlers, if they need it for handler in self.handlers: - try: - check_implementation(handler.stop, BaseHandler) - if handler not in stopable: - stopable.append(handler) - except NotImplementedError: - pass + handler.stop() + # Stop all conductors, if they need it for conductor in self.conductors: - try: - check_implementation(conductor.stop, BaseConductor) - if conductor not in stopable: - stopable.append(conductor) - except NotImplementedError: - pass - for stopping in stopable: - stopping.stop() + conductor.stop() # If we've started the monitor/handler 
interaction thread, then stop it if self._mon_han_worker is None: diff --git a/functionality/validation.py b/functionality/validation.py index 34f0572..ad0bdcf 100644 --- a/functionality/validation.py +++ b/functionality/validation.py @@ -48,6 +48,13 @@ def check_type(variable:Any, expected_type:Type, alt_types:List[Type]=[], msg = f"Expected type(s) are '{type_list}', got {type(variable)}" raise TypeError(msg) +def check_types(variable:Any, expected_types:List[Type], or_none:bool=False, + hint:str="")->None: + """Checks if a given variable is one of the expected types. Raises + TypeError or ValueError as appropriate if any issues are encountered.""" + check_type(variable, expected_types[0], alt_types=expected_types[1:], + or_none=or_none, hint=hint) + def check_callable(call:Any, hint:str="")->None: """Checks if a given variable is a callable function. Raises TypeError if not.""" @@ -120,7 +127,6 @@ def valid_string(variable:str, valid_chars:str, min_length:int=1, hint:str="" f"are: {valid_chars}" raise ValueError(msg) - def valid_dict(variable:Dict[Any, Any], key_type:Type, value_type:Type, required_keys:List[Any]=[], optional_keys:List[Any]=[], strict:bool=True, min_length:int=1, hint:str="")->None: @@ -168,6 +174,54 @@ def valid_dict(variable:Dict[Any, Any], key_type:Type, value_type:Type, raise ValueError(f"Unexpected key '{k}' {hint}should not be " f"present in dict '{variable}'") +def valid_dict_multiple_types(variable:Dict[Any, Any], key_type:Type, + value_types:List[Type], required_keys:List[Any]=[], + optional_keys:List[Any]=[], strict:bool=True, min_length:int=1, + hint:str="")->None: + """Checks that a given dictionary is valid. Key and Value types are + enforced, as are required and optional keys. 
Will raise ValueError, + TypeError or KeyError depending on the problem encountered.""" + # Validate inputs + check_type(variable, Dict, hint=hint) + check_type(key_type, Type, alt_types=[_SpecialForm], hint=hint) + valid_list(value_types, Type, alt_types=[_SpecialForm], hint=hint) + check_type(required_keys, list, hint=hint) + check_type(optional_keys, list, hint=hint) + check_type(strict, bool, hint=hint) + + if hint: + hint = f"in '{hint}' " + + # Check dict meets minimum length + if len(variable) < min_length: + raise ValueError( + f"Dictionary '{variable}' {hint}is below minimum length of " + f"{min_length}" + ) + + # Check key and value types + for k, v in variable.items(): + if key_type != Any and not isinstance(k, key_type): + raise TypeError(f"Key {k} {hint}had unexpected type '{type(k)}' " + f"rather than expected '{key_type}' in dict '{variable}'") + if Any not in value_types and type(v) not in value_types: + raise TypeError(f"Value {v} {hint}had unexpected type '{type(v)}' " + f"rather than expected '{value_types}' in dict '{variable}'") + + # Check all required keys present + for rk in required_keys: + if rk not in variable.keys(): + raise KeyError(f"Missing required key '{rk}' from dict " + f"'{variable}' {hint}.") + + # If strict checking, enforce that only required and optional keys are + # present + if strict: + for k in variable.keys(): + if k not in required_keys and k not in optional_keys: + raise ValueError(f"Unexpected key '{k}' {hint}should not be " + f"present in dict '{variable}'") + def valid_list(variable:List[Any], entry_type:Type, alt_types:List[Type]=[], min_length:int=1, hint:str="")->None: """Checks that a given list is valid. Value types are checked and a @@ -302,4 +356,16 @@ def valid_non_existing_path(variable:str, allow_base:bool=False, hint:str="" msg = f"Route to requested path '{variable}' does not exist." raise ValueError(msg) +def valid_natural(num:int, hint:str="")->None: + """Check a given value is a natural number. 
Will raise a ValueError if not.""" + check_type(num, int, hint=hint) + + if num < 0: + if hint : + msg = f"Value {num} in {hint} is not a natural number." + else: + msg = f"Value {num} is not a natural number." + + raise ValueError(msg) + # TODO add validation for requirement functions \ No newline at end of file diff --git a/patterns/file_event_pattern.py b/patterns/file_event_pattern.py index b555920..73326ad 100644 --- a/patterns/file_event_pattern.py +++ b/patterns/file_event_pattern.py @@ -10,7 +10,6 @@ import threading import sys import os -from copy import deepcopy from fnmatch import translate from re import match from time import time, sleep @@ -31,7 +30,7 @@ from meow_base.core.vars import VALID_RECIPE_NAME_CHARS, \ DIR_RETROACTIVE_EVENT from meow_base.functionality.debug import setup_debugging, print_debug from meow_base.functionality.hashing import get_hash -from meow_base.functionality.meow import create_rule, create_watchdog_event +from meow_base.functionality.meow import create_watchdog_event # Events that are monitored by default _DEFAULT_MASK = [ @@ -161,13 +160,6 @@ class WatchdogMonitor(BaseMonitor): debug_level:int # Where print messages are sent _print_target:Any - #A lock to solve race conditions on '_patterns' - _patterns_lock:threading.Lock - #A lock to solve race conditions on '_recipes' - _recipes_lock:threading.Lock - #A lock to solve race conditions on '_rules' - _rules_lock:threading.Lock - def __init__(self, base_dir:str, patterns:Dict[str,FileEventPattern], recipes:Dict[str,BaseRecipe], autostart=False, settletime:int=1, name:str="", print:Any=sys.stdout, logging:int=0)->None: @@ -180,9 +172,6 @@ class WatchdogMonitor(BaseMonitor): self.base_dir = base_dir check_type(settletime, int, hint="WatchdogMonitor.settletime") self._print_target, self.debug_level = setup_debugging(print, logging) - self._patterns_lock = threading.Lock() - self._recipes_lock = threading.Lock() - self._rules_lock = threading.Lock() self.event_handler = 
WatchdogEventHandler(self, settletime=settletime) self.monitor = Observer() self.monitor.schedule( @@ -256,7 +245,7 @@ class WatchdogMonitor(BaseMonitor): f"Event at {src_path} hit rule {rule.name}", DEBUG_INFO) # Send the event to the runner - self.send_to_runner(meow_event) + self.send_event_to_runner(meow_event) except Exception as e: self._rules_lock.release() @@ -264,248 +253,6 @@ class WatchdogMonitor(BaseMonitor): self._rules_lock.release() - def add_pattern(self, pattern:FileEventPattern)->None: - """Function to add a pattern to the current definitions. Any rules - that can be possibly created from that pattern will be automatically - created.""" - check_type(pattern, FileEventPattern, hint="add_pattern.pattern") - self._patterns_lock.acquire() - try: - if pattern.name in self._patterns: - raise KeyError(f"An entry for Pattern '{pattern.name}' " - "already exists. Do you intend to update instead?") - self._patterns[pattern.name] = pattern - except Exception as e: - self._patterns_lock.release() - raise e - self._patterns_lock.release() - - self._identify_new_rules(new_pattern=pattern) - - def update_pattern(self, pattern:FileEventPattern)->None: - """Function to update a pattern in the current definitions. Any rules - created from that pattern will be automatically updated.""" - check_type(pattern, FileEventPattern, hint="update_pattern.pattern") - self.remove_pattern(pattern.name) - self.add_pattern(pattern) - - def remove_pattern(self, pattern: Union[str,FileEventPattern])->None: - """Function to remove a pattern from the current definitions. 
Any rules - that will be no longer valid will be automatically removed.""" - check_type( - pattern, - str, - alt_types=[FileEventPattern], - hint="remove_pattern.pattern" - ) - lookup_key = pattern - if isinstance(lookup_key, FileEventPattern): - lookup_key = pattern.name - self._patterns_lock.acquire() - try: - if lookup_key not in self._patterns: - raise KeyError(f"Cannot remote Pattern '{lookup_key}' as it " - "does not already exist") - self._patterns.pop(lookup_key) - except Exception as e: - self._patterns_lock.release() - raise e - self._patterns_lock.release() - - if isinstance(pattern, FileEventPattern): - self._identify_lost_rules(lost_pattern=pattern.name) - else: - self._identify_lost_rules(lost_pattern=pattern) - - def get_patterns(self)->Dict[str,FileEventPattern]: - """Function to get a dict of the currently defined patterns of the - monitor. Note that the result is deep-copied, and so can be manipulated - without directly manipulating the internals of the monitor.""" - to_return = {} - self._patterns_lock.acquire() - try: - to_return = deepcopy(self._patterns) - except Exception as e: - self._patterns_lock.release() - raise e - self._patterns_lock.release() - return to_return - - def add_recipe(self, recipe: BaseRecipe)->None: - """Function to add a recipe to the current definitions. Any rules - that can be possibly created from that recipe will be automatically - created.""" - check_type(recipe, BaseRecipe, hint="add_recipe.recipe") - self._recipes_lock.acquire() - try: - if recipe.name in self._recipes: - raise KeyError(f"An entry for Recipe '{recipe.name}' already " - "exists. Do you intend to update instead?") - self._recipes[recipe.name] = recipe - except Exception as e: - self._recipes_lock.release() - raise e - self._recipes_lock.release() - - self._identify_new_rules(new_recipe=recipe) - - def update_recipe(self, recipe: BaseRecipe)->None: - """Function to update a recipe in the current definitions. 
Any rules - created from that recipe will be automatically updated.""" - check_type(recipe, BaseRecipe, hint="update_recipe.recipe") - self.remove_recipe(recipe.name) - self.add_recipe(recipe) - - def remove_recipe(self, recipe:Union[str,BaseRecipe])->None: - """Function to remove a recipe from the current definitions. Any rules - that will be no longer valid will be automatically removed.""" - check_type( - recipe, - str, - alt_types=[BaseRecipe], - hint="remove_recipe.recipe" - ) - lookup_key = recipe - if isinstance(lookup_key, BaseRecipe): - lookup_key = recipe.name - self._recipes_lock.acquire() - try: - # Check that recipe has not already been deleted - if lookup_key not in self._recipes: - raise KeyError(f"Cannot remote Recipe '{lookup_key}' as it " - "does not already exist") - self._recipes.pop(lookup_key) - except Exception as e: - self._recipes_lock.release() - raise e - self._recipes_lock.release() - - if isinstance(recipe, BaseRecipe): - self._identify_lost_rules(lost_recipe=recipe.name) - else: - self._identify_lost_rules(lost_recipe=recipe) - - def get_recipes(self)->Dict[str,BaseRecipe]: - """Function to get a dict of the currently defined recipes of the - monitor. Note that the result is deep-copied, and so can be manipulated - without directly manipulating the internals of the monitor.""" - to_return = {} - self._recipes_lock.acquire() - try: - to_return = deepcopy(self._recipes) - except Exception as e: - self._recipes_lock.release() - raise e - self._recipes_lock.release() - return to_return - - def get_rules(self)->Dict[str,Rule]: - """Function to get a dict of the currently defined rules of the - monitor. 
Note that the result is deep-copied, and so can be manipulated - without directly manipulating the internals of the monitor.""" - to_return = {} - self._rules_lock.acquire() - try: - to_return = deepcopy(self._rules) - except Exception as e: - self._rules_lock.release() - raise e - self._rules_lock.release() - return to_return - - def _identify_new_rules(self, new_pattern:FileEventPattern=None, - new_recipe:BaseRecipe=None)->None: - """Function to determine if a new rule can be created given a new - pattern or recipe, in light of other existing patterns or recipes in - the monitor.""" - - if new_pattern: - self._patterns_lock.acquire() - self._recipes_lock.acquire() - try: - # Check in case pattern has been deleted since function called - if new_pattern.name not in self._patterns: - self._patterns_lock.release() - self._recipes_lock.release() - return - # If pattern specifies recipe that already exists, make a rule - if new_pattern.recipe in self._recipes: - self._create_new_rule( - new_pattern, - self._recipes[new_pattern.recipe], - ) - except Exception as e: - self._patterns_lock.release() - self._recipes_lock.release() - raise e - self._patterns_lock.release() - self._recipes_lock.release() - - if new_recipe: - self._patterns_lock.acquire() - self._recipes_lock.acquire() - try: - # Check in case recipe has been deleted since function called - if new_recipe.name not in self._recipes: - self._patterns_lock.release() - self._recipes_lock.release() - return - # If recipe is specified by existing pattern, make a rule - for pattern in self._patterns.values(): - if pattern.recipe == new_recipe.name: - self._create_new_rule( - pattern, - new_recipe, - ) - except Exception as e: - self._patterns_lock.release() - self._recipes_lock.release() - raise e - self._patterns_lock.release() - self._recipes_lock.release() - - def _identify_lost_rules(self, lost_pattern:str=None, - lost_recipe:str=None)->None: - """Function to remove rules that should be deleted in response to a - 
pattern or recipe having been deleted.""" - to_delete = [] - self._rules_lock.acquire() - try: - # Identify any offending rules - for name, rule in self._rules.items(): - if lost_pattern and rule.pattern.name == lost_pattern: - to_delete.append(name) - if lost_recipe and rule.recipe.name == lost_recipe: - to_delete.append(name) - # Now delete them - for delete in to_delete: - if delete in self._rules.keys(): - self._rules.pop(delete) - except Exception as e: - self._rules_lock.release() - raise e - self._rules_lock.release() - - def _create_new_rule(self, pattern:FileEventPattern, - recipe:BaseRecipe)->None: - """Function to create a new rule from a given pattern and recipe. This - will only be called to create rules at runtime, as rules are - automatically created at initialisation using the same 'create_rule' - function called here.""" - rule = create_rule(pattern, recipe) - self._rules_lock.acquire() - try: - if rule.name in self._rules: - raise KeyError("Cannot create Rule with name of " - f"'{rule.name}' as already in use") - self._rules[rule.name] = rule - except Exception as e: - self._rules_lock.release() - raise e - self._rules_lock.release() - - self._apply_retroactive_rule(rule) - def _is_valid_base_dir(self, base_dir:str)->None: """Validation check for 'base_dir' variable from main constructor. 
Is automatically called during initialisation.""" @@ -521,11 +268,11 @@ class WatchdogMonitor(BaseMonitor): automatically called during initialisation.""" valid_dict(recipes, str, BaseRecipe, min_length=0, strict=False) - def _apply_retroactive_rules(self)->None: - """Function to determine if any rules should be applied to the existing - file structure, were the file structure created/modified now.""" - for rule in self._rules.values(): - self._apply_retroactive_rule(rule) + def _get_valid_pattern_types(self)->List[type]: + return [FileEventPattern] + + def _get_valid_recipe_types(self)->List[type]: + return [BaseRecipe] def _apply_retroactive_rule(self, rule:Rule)->None: """Function to determine if a rule should be applied to the existing @@ -559,13 +306,19 @@ class WatchdogMonitor(BaseMonitor): f"Retroactive event for file at at {globble} hit rule " f"{rule.name}", DEBUG_INFO) # Send it to the runner - self.send_to_runner(meow_event) + self.send_event_to_runner(meow_event) except Exception as e: self._rules_lock.release() raise e self._rules_lock.release() + def _apply_retroactive_rules(self)->None: + """Function to determine if any rules should be applied to the existing + file structure, were the file structure created/modified now.""" + for rule in self._rules.values(): + self._apply_retroactive_rule(rule) + class WatchdogEventHandler(PatternMatchingEventHandler): # The monitor class running this handler diff --git a/recipes/bash_recipe.py b/recipes/bash_recipe.py index 9086e73..68bdeba 100644 --- a/recipes/bash_recipe.py +++ b/recipes/bash_recipe.py @@ -65,13 +65,13 @@ class BashHandler(BaseHandler): # Where print messages are sent _print_target:Any def __init__(self, job_queue_dir:str=DEFAULT_JOB_QUEUE_DIR, name:str="", - print:Any=sys.stdout, logging:int=0)->None: + print:Any=sys.stdout, logging:int=0, pause_time:int=5)->None: """BashHandler Constructor. This creates jobs to be executed as bash scripts. 
This does not run as a continuous thread to handle execution, but is invoked according to a factory pattern using the handle function. Note that if this handler is given to a MeowRunner object, the job_queue_dir will be overwridden by its""" - super().__init__(name=name) + super().__init__(name=name, pause_time=pause_time) self._is_valid_job_queue_dir(job_queue_dir) self.job_queue_dir = job_queue_dir self._print_target, self.debug_level = setup_debugging(print, logging) @@ -180,7 +180,7 @@ class BashHandler(BaseHandler): threadsafe_write_status(meow_job, meta_file) # Send job directory, as actual definitons will be read from within it - self.send_to_runner(job_dir) + self.send_job_to_runner(job_dir) def assemble_bash_job_script()->List[str]: diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index cf4e368..5d53f44 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -71,13 +71,13 @@ class PapermillHandler(BaseHandler): # Where print messages are sent _print_target:Any def __init__(self, job_queue_dir:str=DEFAULT_JOB_QUEUE_DIR, name:str="", - print:Any=sys.stdout, logging:int=0)->None: + print:Any=sys.stdout, logging:int=0, pause_time:int=5)->None: """PapermillHandler Constructor. This creats jobs to be executed using the papermill module. This does not run as a continuous thread to handle execution, but is invoked according to a factory pattern using the handle function. 
Note that if this handler is given to a MeowRunner object, the job_queue_dir will be overwridden.""" - super().__init__(name=name) + super().__init__(name=name, pause_time=pause_time) self._is_valid_job_queue_dir(job_queue_dir) self.job_queue_dir = job_queue_dir self._print_target, self.debug_level = setup_debugging(print, logging) @@ -185,7 +185,7 @@ class PapermillHandler(BaseHandler): ) # Send job directory, as actual definitons will be read from within it - self.send_to_runner(job_dir) + self.send_job_to_runner(job_dir) def get_recipe_from_notebook(name:str, notebook_filename:str, parameters:Dict[str,Any]={}, requirements:Dict[str,Any]={} diff --git a/recipes/python_recipe.py b/recipes/python_recipe.py index e4c3269..4db1dbf 100644 --- a/recipes/python_recipe.py +++ b/recipes/python_recipe.py @@ -61,13 +61,13 @@ class PythonHandler(BaseHandler): # Where print messages are sent _print_target:Any def __init__(self, job_queue_dir:str=DEFAULT_JOB_QUEUE_DIR, name:str="", - print:Any=sys.stdout, logging:int=0)->None: + print:Any=sys.stdout, logging:int=0, pause_time:int=5)->None: """PythonHandler Constructor. This creates jobs to be executed as python functions. This does not run as a continuous thread to handle execution, but is invoked according to a factory pattern using the handle function. 
Note that if this handler is given to a MeowRunner object, the job_queue_dir will be overwridden by its""" - super().__init__(name=name) + super().__init__(name=name, pause_time=pause_time) self._is_valid_job_queue_dir(job_queue_dir) self.job_queue_dir = job_queue_dir self._print_target, self.debug_level = setup_debugging(print, logging) @@ -173,7 +173,7 @@ class PythonHandler(BaseHandler): ) # Send job directory, as actual definitons will be read from within it - self.send_to_runner(job_dir) + self.send_job_to_runner(job_dir) # Papermill job execution code, to be run within the conductor diff --git a/tests/test_base.py b/tests/test_base.py index e8b0f5f..e2304cd 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -1,7 +1,7 @@ import unittest -from typing import Any, Union, Tuple, Dict +from typing import Any, Union, Tuple, Dict, List from meow_base.core.base_conductor import BaseConductor from meow_base.core.base_handler import BaseHandler @@ -146,6 +146,7 @@ class BasePatternTests(unittest.TestCase): self.assertEqual(len(values), 0) +# TODO test for base functions class BaseMonitorTests(unittest.TestCase): def setUp(self)->None: super().setUp() @@ -171,32 +172,15 @@ class BaseMonitorTests(unittest.TestCase): pass def stop(self): pass - def _is_valid_patterns(self, patterns:Dict[str,BasePattern])->None: - pass - def _is_valid_recipes(self, recipes:Dict[str,BaseRecipe])->None: - pass - def add_pattern(self, pattern:BasePattern)->None: - pass - def update_pattern(self, pattern:BasePattern)->None: - pass - def remove_pattern(self, pattern:Union[str,BasePattern])->None: - pass - def get_patterns(self)->None: - pass - def add_recipe(self, recipe:BaseRecipe)->None: - pass - def update_recipe(self, recipe:BaseRecipe)->None: - pass - def remove_recipe(self, recipe:Union[str,BaseRecipe])->None: - pass - def get_recipes(self)->None: - pass - def get_rules(self)->None: - pass + def _get_valid_pattern_types(self)->List[type]: + return [BasePattern] + def 
_get_valid_recipe_types(self)->List[type]: + return [BaseRecipe] FullTestMonitor({}, {}) +# TODO test for base functions class BaseHandleTests(unittest.TestCase): def setUp(self)->None: super().setUp() @@ -220,12 +204,6 @@ class BaseHandleTests(unittest.TestCase): class FullTestHandler(BaseHandler): def handle(self, event): pass - def start(self): - pass - def stop(self): - pass - def _is_valid_inputs(self, inputs:Any)->None: - pass def valid_handle_criteria(self, event:Dict[str,Any] )->Tuple[bool,str]: pass @@ -233,6 +211,7 @@ class BaseHandleTests(unittest.TestCase): FullTestHandler() +# TODO test for base functions class BaseConductorTests(unittest.TestCase): def setUp(self)->None: super().setUp() diff --git a/tests/test_patterns.py b/tests/test_patterns.py index 7788f58..4044838 100644 --- a/tests/test_patterns.py +++ b/tests/test_patterns.py @@ -227,7 +227,7 @@ class WatchdogMonitorTests(unittest.TestCase): } wm = WatchdogMonitor(TEST_MONITOR_BASE, patterns, recipes) - wm.to_runner = from_monitor_writer + wm.to_runner_event = from_monitor_writer rules = wm.get_rules() @@ -291,7 +291,7 @@ class WatchdogMonitorTests(unittest.TestCase): rule = rules[list(rules.keys())[0]] from_monitor_reader, from_monitor_writer = Pipe() - wm.to_runner = from_monitor_writer + wm.to_runner_event = from_monitor_writer wm.start() @@ -356,7 +356,7 @@ class WatchdogMonitorTests(unittest.TestCase): rule = rules[list(rules.keys())[0]] from_monitor_reader, from_monitor_writer = Pipe() - wm.to_runner = from_monitor_writer + wm.to_runner_event = from_monitor_writer wm.start() @@ -437,7 +437,7 @@ class WatchdogMonitorTests(unittest.TestCase): rule = rules[list(rules.keys())[0]] from_monitor_reader, from_monitor_writer = Pipe() - wm.to_runner = from_monitor_writer + wm.to_runner_event = from_monitor_writer wm.start() @@ -508,7 +508,7 @@ class WatchdogMonitorTests(unittest.TestCase): rule = rules[list(rules.keys())[0]] from_monitor_reader, from_monitor_writer = Pipe() - wm.to_runner = 
from_monitor_writer + wm.to_runner_event = from_monitor_writer wm.start() diff --git a/tests/test_recipes.py b/tests/test_recipes.py index f09dbe1..141d924 100644 --- a/tests/test_recipes.py +++ b/tests/test_recipes.py @@ -142,9 +142,9 @@ class PapermillHandlerTests(unittest.TestCase): # Test PapermillHandler will handle given events def testPapermillHandlerHandling(self)->None: - from_handler_reader, from_handler_writer = Pipe() + from_handler_to_job_reader, from_handler_to_job_writer = Pipe() ph = PapermillHandler(job_queue_dir=TEST_JOB_QUEUE) - ph.to_runner = from_handler_writer + ph.to_runner_job = from_handler_to_job_writer with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: f.write("Data") @@ -180,8 +180,8 @@ class PapermillHandlerTests(unittest.TestCase): ph.handle(event) - if from_handler_reader.poll(3): - job_dir = from_handler_reader.recv() + if from_handler_to_job_reader.poll(3): + job_dir = from_handler_to_job_reader.recv() self.assertIsInstance(job_dir, str) self.assertTrue(os.path.exists(job_dir)) @@ -191,9 +191,9 @@ class PapermillHandlerTests(unittest.TestCase): # Test PapermillHandler will create enough jobs from single sweep def testPapermillHandlerHandlingSingleSweep(self)->None: - from_handler_reader, from_handler_writer = Pipe() + from_handler_to_job_reader, from_handler_to_job_writer = Pipe() ph = PapermillHandler(job_queue_dir=TEST_JOB_QUEUE) - ph.to_runner = from_handler_writer + ph.to_runner_job = from_handler_to_job_writer with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: f.write("Data") @@ -234,8 +234,8 @@ class PapermillHandlerTests(unittest.TestCase): jobs = [] recieving = True while recieving: - if from_handler_reader.poll(3): - jobs.append(from_handler_reader.recv()) + if from_handler_to_job_reader.poll(3): + jobs.append(from_handler_to_job_reader.recv()) else: recieving = False @@ -256,9 +256,9 @@ class PapermillHandlerTests(unittest.TestCase): # Test PapermillHandler will create enough jobs from multiple sweeps def 
testPapermillHandlerHandlingMultipleSweep(self)->None: - from_handler_reader, from_handler_writer = Pipe() + from_handler_to_job_reader, from_handler_to_job_writer = Pipe() ph = PapermillHandler(job_queue_dir=TEST_JOB_QUEUE) - ph.to_runner = from_handler_writer + ph.to_runner_job = from_handler_to_job_writer with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: f.write("Data") @@ -304,8 +304,8 @@ class PapermillHandlerTests(unittest.TestCase): jobs = [] recieving = True while recieving: - if from_handler_reader.poll(3): - jobs.append(from_handler_reader.recv()) + if from_handler_to_job_reader.poll(3): + jobs.append(from_handler_to_job_reader.recv()) else: recieving = False @@ -477,6 +477,90 @@ class PapermillHandlerTests(unittest.TestCase): self.assertEqual(recipe.name, "name") self.assertEqual(recipe.recipe, COMPLETE_NOTEBOOK) + # Test handler starts and stops appropriatly + def testPapermillHandlerStartStop(self)->None: + ph = PapermillHandler(job_queue_dir=TEST_JOB_QUEUE) + from_handler_to_event_reader, from_handler_to_event_writer = Pipe() + ph.to_runner_event = from_handler_to_event_writer + + with self.assertRaises(AttributeError): + self.assertFalse(ph._handle_thread.is_alive()) + + ph.start() + if from_handler_to_event_reader.poll(3): + msg = from_handler_to_event_reader.recv() + + self.assertTrue(ph._handle_thread.is_alive()) + self.assertEqual(msg, 1) + + ph.stop() + + self.assertFalse(ph._handle_thread.is_alive()) + + # Test handler handles given events + def testPapermillHandlerOngoingHandling(self)->None: + ph = PapermillHandler(job_queue_dir=TEST_JOB_QUEUE) + handler_to_event_us, handler_to_event_them = Pipe(duplex=True) + handler_to_job_us, handler_to_job_them = Pipe() + ph.to_runner_event = handler_to_event_them + ph.to_runner_job = handler_to_job_them + + with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: + f.write("Data") + + pattern_one = FileEventPattern( + "pattern_one", "A", "recipe_one", "file_one") + recipe = 
JupyterNotebookRecipe( + "recipe_one", COMPLETE_NOTEBOOK) + + patterns = { + pattern_one.name: pattern_one, + } + recipes = { + recipe.name: recipe, + } + + rules = create_rules(patterns, recipes) + self.assertEqual(len(rules), 1) + _, rule = rules.popitem() + self.assertIsInstance(rule, Rule) + + event = { + EVENT_TYPE: EVENT_TYPE_WATCHDOG, + EVENT_PATH: os.path.join(TEST_MONITOR_BASE, "A"), + WATCHDOG_BASE: TEST_MONITOR_BASE, + EVENT_RULE: rule, + WATCHDOG_HASH: get_hash( + os.path.join(TEST_MONITOR_BASE, "A"), SHA256 + ) + } + + with self.assertRaises(AttributeError): + self.assertFalse(ph._handle_thread.is_alive()) + + ph.start() + if handler_to_event_us.poll(3): + msg = handler_to_event_us.recv() + self.assertEqual(msg, 1) + + handler_to_event_us.send(event) + + if handler_to_job_us.poll(3): + job_dir = handler_to_job_us.recv() + + if handler_to_event_us.poll(3): + msg = handler_to_event_us.recv() + self.assertEqual(msg, 1) + + ph.stop() + + self.assertIsInstance(job_dir, str) + self.assertTrue(os.path.exists(job_dir)) + + job = read_yaml(os.path.join(job_dir, META_FILE)) + valid_job(job) + + class PythonTests(unittest.TestCase): def setUp(self)->None: super().setUp() @@ -560,9 +644,9 @@ class PythonHandlerTests(unittest.TestCase): # Test PythonHandler will handle given events def testPythonHandlerHandling(self)->None: - from_handler_reader, from_handler_writer = Pipe() + from_handler_to_job_reader, from_handler_to_job_writer = Pipe() ph = PythonHandler(job_queue_dir=TEST_JOB_QUEUE) - ph.to_runner = from_handler_writer + ph.to_runner_job = from_handler_to_job_writer with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: f.write("Data") @@ -598,8 +682,8 @@ class PythonHandlerTests(unittest.TestCase): ph.handle(event) - if from_handler_reader.poll(3): - job_dir = from_handler_reader.recv() + if from_handler_to_job_reader.poll(3): + job_dir = from_handler_to_job_reader.recv() self.assertIsInstance(job_dir, str) self.assertTrue(os.path.exists(job_dir)) @@ 
-609,9 +693,9 @@ class PythonHandlerTests(unittest.TestCase): # Test PythonHandler will create enough jobs from single sweep def testPythonHandlerHandlingSingleSweep(self)->None: - from_handler_reader, from_handler_writer = Pipe() + from_handler_to_job_reader, from_handler_to_job_writer = Pipe() ph = PythonHandler(job_queue_dir=TEST_JOB_QUEUE) - ph.to_runner = from_handler_writer + ph.to_runner_job = from_handler_to_job_writer with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: f.write("Data") @@ -652,8 +736,8 @@ class PythonHandlerTests(unittest.TestCase): jobs = [] recieving = True while recieving: - if from_handler_reader.poll(3): - jobs.append(from_handler_reader.recv()) + if from_handler_to_job_reader.poll(3): + jobs.append(from_handler_to_job_reader.recv()) else: recieving = False @@ -674,9 +758,9 @@ class PythonHandlerTests(unittest.TestCase): # Test PythonHandler will create enough jobs from multiple sweeps def testPythonHandlerHandlingMultipleSweep(self)->None: - from_handler_reader, from_handler_writer = Pipe() + from_handler_to_job_reader, from_handler_to_job_writer = Pipe() ph = PythonHandler(job_queue_dir=TEST_JOB_QUEUE) - ph.to_runner = from_handler_writer + ph.to_runner_job = from_handler_to_job_writer with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: f.write("Data") @@ -722,8 +806,8 @@ class PythonHandlerTests(unittest.TestCase): jobs = [] recieving = True while recieving: - if from_handler_reader.poll(3): - jobs.append(from_handler_reader.recv()) + if from_handler_to_job_reader.poll(3): + jobs.append(from_handler_to_job_reader.recv()) else: recieving = False @@ -840,7 +924,6 @@ class PythonHandlerTests(unittest.TestCase): self.assertEqual(result, "124937.5") - # Test jobFunc doesn't execute with no args def testJobFuncBadArgs(self)->None: try: @@ -890,6 +973,90 @@ class PythonHandlerTests(unittest.TestCase): }) self.assertTrue(status) + # Test handler starts and stops appropriatly + def testPythonHandlerStartStop(self)->None: + ph = 
PythonHandler(job_queue_dir=TEST_JOB_QUEUE) + from_handler_to_event_reader, from_handler_to_event_writer = Pipe() + ph.to_runner_event = from_handler_to_event_writer + + with self.assertRaises(AttributeError): + self.assertFalse(ph._handle_thread.is_alive()) + + ph.start() + if from_handler_to_event_reader.poll(3): + msg = from_handler_to_event_reader.recv() + + self.assertTrue(ph._handle_thread.is_alive()) + self.assertEqual(msg, 1) + + ph.stop() + + self.assertFalse(ph._handle_thread.is_alive()) + + # Test handler handles given events + def testPythonHandlerOngoingHandling(self)->None: + ph = PythonHandler(job_queue_dir=TEST_JOB_QUEUE) + handler_to_event_us, handler_to_event_them = Pipe(duplex=True) + handler_to_job_us, handler_to_job_them = Pipe() + ph.to_runner_event = handler_to_event_them + ph.to_runner_job = handler_to_job_them + + with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: + f.write("Data") + + pattern_one = FileEventPattern( + "pattern_one", "A", "recipe_one", "file_one") + recipe = PythonRecipe( + "recipe_one", COMPLETE_PYTHON_SCRIPT) + + patterns = { + pattern_one.name: pattern_one, + } + recipes = { + recipe.name: recipe, + } + + rules = create_rules(patterns, recipes) + self.assertEqual(len(rules), 1) + _, rule = rules.popitem() + self.assertIsInstance(rule, Rule) + + event = { + EVENT_TYPE: EVENT_TYPE_WATCHDOG, + EVENT_PATH: os.path.join(TEST_MONITOR_BASE, "A"), + WATCHDOG_BASE: TEST_MONITOR_BASE, + EVENT_RULE: rule, + WATCHDOG_HASH: get_hash( + os.path.join(TEST_MONITOR_BASE, "A"), SHA256 + ) + } + + with self.assertRaises(AttributeError): + self.assertFalse(ph._handle_thread.is_alive()) + + ph.start() + if handler_to_event_us.poll(3): + msg = handler_to_event_us.recv() + self.assertEqual(msg, 1) + + handler_to_event_us.send(event) + + if handler_to_job_us.poll(3): + job_dir = handler_to_job_us.recv() + + if handler_to_event_us.poll(3): + msg = handler_to_event_us.recv() + self.assertEqual(msg, 1) + + ph.stop() + + 
self.assertIsInstance(job_dir, str) + self.assertTrue(os.path.exists(job_dir)) + + job = read_yaml(os.path.join(job_dir, META_FILE)) + valid_job(job) + + class BashTests(unittest.TestCase): def setUp(self)->None: super().setUp() @@ -973,9 +1140,9 @@ class BashHandlerTests(unittest.TestCase): # Test BashHandler will handle given events def testBashHandlerHandling(self)->None: - from_handler_reader, from_handler_writer = Pipe() + from_handler_to_job_reader, from_handler_to_job_writer = Pipe() ph = BashHandler(job_queue_dir=TEST_JOB_QUEUE) - ph.to_runner = from_handler_writer + ph.to_runner_job = from_handler_to_job_writer with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: f.write("Data") @@ -1011,8 +1178,8 @@ class BashHandlerTests(unittest.TestCase): ph.handle(event) - if from_handler_reader.poll(3): - job_dir = from_handler_reader.recv() + if from_handler_to_job_reader.poll(3): + job_dir = from_handler_to_job_reader.recv() self.assertIsInstance(job_dir, str) self.assertTrue(os.path.exists(job_dir)) @@ -1022,9 +1189,9 @@ class BashHandlerTests(unittest.TestCase): # Test BashHandler will create enough jobs from single sweep def testBashHandlerHandlingSingleSweep(self)->None: - from_handler_reader, from_handler_writer = Pipe() + from_handler_to_job_reader, from_handler_to_job_writer = Pipe() ph = BashHandler(job_queue_dir=TEST_JOB_QUEUE) - ph.to_runner = from_handler_writer + ph.to_runner_job = from_handler_to_job_writer with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: f.write("Data") @@ -1065,8 +1232,8 @@ class BashHandlerTests(unittest.TestCase): jobs = [] recieving = True while recieving: - if from_handler_reader.poll(3): - jobs.append(from_handler_reader.recv()) + if from_handler_to_job_reader.poll(3): + jobs.append(from_handler_to_job_reader.recv()) else: recieving = False @@ -1087,9 +1254,9 @@ class BashHandlerTests(unittest.TestCase): # Test BashHandler will create enough jobs from multiple sweeps def 
testBashHandlerHandlingMultipleSweep(self)->None: - from_handler_reader, from_handler_writer = Pipe() + from_handler_to_job_reader, from_handler_to_job_writer = Pipe() ph = BashHandler(job_queue_dir=TEST_JOB_QUEUE) - ph.to_runner = from_handler_writer + ph.to_runner_job = from_handler_to_job_writer with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: f.write("Data") @@ -1135,8 +1302,8 @@ class BashHandlerTests(unittest.TestCase): jobs = [] recieving = True while recieving: - if from_handler_reader.poll(3): - jobs.append(from_handler_reader.recv()) + if from_handler_to_job_reader.poll(3): + jobs.append(from_handler_to_job_reader.recv()) else: recieving = False @@ -1299,3 +1466,86 @@ class BashHandlerTests(unittest.TestCase): EVENT_RULE: rule }) self.assertTrue(status) + + # Test handler starts and stops appropriatly + def testBashHandlerStartStop(self)->None: + ph = BashHandler(job_queue_dir=TEST_JOB_QUEUE) + from_handler_to_event_reader, from_handler_to_event_writer = Pipe() + ph.to_runner_event = from_handler_to_event_writer + + with self.assertRaises(AttributeError): + self.assertFalse(ph._handle_thread.is_alive()) + + ph.start() + if from_handler_to_event_reader.poll(3): + msg = from_handler_to_event_reader.recv() + + self.assertTrue(ph._handle_thread.is_alive()) + self.assertEqual(msg, 1) + + ph.stop() + + self.assertFalse(ph._handle_thread.is_alive()) + + # Test handler handles given events + def testBashHandlerOngoingHandling(self)->None: + ph = BashHandler(job_queue_dir=TEST_JOB_QUEUE) + handler_to_event_us, handler_to_event_them = Pipe(duplex=True) + handler_to_job_us, handler_to_job_them = Pipe() + ph.to_runner_event = handler_to_event_them + ph.to_runner_job = handler_to_job_them + + with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: + f.write("Data") + + pattern_one = FileEventPattern( + "pattern_one", "A", "recipe_one", "file_one") + recipe = BashRecipe( + "recipe_one", COMPLETE_BASH_SCRIPT) + + patterns = { + pattern_one.name: 
pattern_one, + } + recipes = { + recipe.name: recipe, + } + + rules = create_rules(patterns, recipes) + self.assertEqual(len(rules), 1) + _, rule = rules.popitem() + self.assertIsInstance(rule, Rule) + + event = { + EVENT_TYPE: EVENT_TYPE_WATCHDOG, + EVENT_PATH: os.path.join(TEST_MONITOR_BASE, "A"), + WATCHDOG_BASE: TEST_MONITOR_BASE, + EVENT_RULE: rule, + WATCHDOG_HASH: get_hash( + os.path.join(TEST_MONITOR_BASE, "A"), SHA256 + ) + } + + with self.assertRaises(AttributeError): + self.assertFalse(ph._handle_thread.is_alive()) + + ph.start() + if handler_to_event_us.poll(3): + msg = handler_to_event_us.recv() + self.assertEqual(msg, 1) + + handler_to_event_us.send(event) + + if handler_to_job_us.poll(3): + job_dir = handler_to_job_us.recv() + + if handler_to_event_us.poll(3): + msg = handler_to_event_us.recv() + self.assertEqual(msg, 1) + + ph.stop() + + self.assertIsInstance(job_dir, str) + self.assertTrue(os.path.exists(job_dir)) + + job = read_yaml(os.path.join(job_dir, META_FILE)) + valid_job(job) diff --git a/tests/test_runner.py b/tests/test_runner.py index 2fe2857..3ca3a25 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -4,6 +4,7 @@ import importlib import os import unittest +from multiprocessing import Pipe from random import shuffle from shutil import copy from time import sleep @@ -134,12 +135,12 @@ class MeowTests(unittest.TestCase): monitor_two = WatchdogMonitor(TEST_MONITOR_BASE, {}, {}) monitors = [ monitor_one, monitor_two ] - handler_one = PapermillHandler() - handler_two = PapermillHandler() + handler_one = PapermillHandler(pause_time=0) + handler_two = PapermillHandler(pause_time=0) handlers = [ handler_one, handler_two ] - conductor_one = LocalPythonConductor() - conductor_two = LocalPythonConductor() + conductor_one = LocalPythonConductor(pause_time=0) + conductor_two = LocalPythonConductor(pause_time=0) conductors = [ conductor_one, conductor_two ] runner = MeowRunner(monitor_one, handler_one, conductor_one) @@ -150,31 +151,59 @@ 
class MeowTests(unittest.TestCase): self.assertEqual(len(runner.monitors), 1) self.assertEqual(runner.monitors[0], monitor_one) - self.assertIsInstance(runner.from_monitors, list) - self.assertEqual(len(runner.from_monitors), 1) - runner.monitors[0].send_to_runner("monitor test message") - message = None - if runner.from_monitors[0].poll(3): - message = runner.from_monitors[0].recv() - self.assertIsNotNone(message) - self.assertEqual(message, "monitor test message") - self.assertIsInstance(runner.handlers, list) for handler in runner.handlers: self.assertIsInstance(handler, BaseHandler) - - self.assertIsInstance(runner.from_handlers, list) - self.assertEqual(len(runner.from_handlers), 1) - runner.handlers[0].send_to_runner("handler test message") - message = None - if runner.from_handlers[0].poll(3): - message = runner.from_handlers[0].recv() - self.assertIsNotNone(message) - self.assertEqual(message, "handler test message") + self.assertEqual(len(runner.handlers), 1) + self.assertEqual(runner.handlers[0], handler_one) self.assertIsInstance(runner.conductors, list) for conductor in runner.conductors: self.assertIsInstance(conductor, BaseConductor) + self.assertEqual(len(runner.conductors), 1) + self.assertEqual(runner.conductors[0], conductor_one) + + self.assertIsInstance(runner.event_connections, list) + self.assertEqual(len(runner.event_connections), 2) + + runner.monitors[0].send_event_to_runner("monitor test message") + message = None + from_monitor = [i[0] for i in runner.event_connections \ + if isinstance(i[1], BaseMonitor)][0] + if from_monitor.poll(3): + message = from_monitor.recv() + self.assertIsNotNone(message) + self.assertEqual(message, "monitor test message") + + runner.handlers[0].prompt_runner_for_event() + message = None + from_handler = [i[0] for i in runner.event_connections \ + if isinstance(i[1], BaseHandler)][0] + if from_handler.poll(3): + message = from_handler.recv() + self.assertIsNotNone(message) + self.assertEqual(message, 1) + + 
self.assertIsInstance(runner.job_connections, list) + self.assertEqual(len(runner.job_connections), 2) + + runner.handlers[0].send_job_to_runner("handler test message") + message = None + from_handler = [i[0] for i in runner.job_connections \ + if isinstance(i[1], BaseHandler)][0] + if from_handler.poll(3): + message = from_handler.recv() + self.assertIsNotNone(message) + self.assertEqual(message, "handler test message") + + runner.conductors[0].prompt_runner_for_job() + message = None + from_conductor = [i[0] for i in runner.job_connections \ + if isinstance(i[1], BaseConductor)][0] + if from_conductor.poll(3): + message = from_conductor.recv() + self.assertIsNotNone(message) + self.assertEqual(message, 1) runner = MeowRunner(monitors, handlers, conductors) @@ -182,38 +211,69 @@ class MeowTests(unittest.TestCase): for m in runner.monitors: self.assertIsInstance(m, BaseMonitor) self.assertEqual(len(runner.monitors), len(monitors)) - self.assertIn(monitor_one, runner.monitors) - self.assertIn(monitor_two, runner.monitors) - - self.assertIsInstance(runner.from_monitors, list) - self.assertEqual(len(runner.from_monitors), len(monitors)) - for rm in runner.monitors: - rm.send_to_runner("monitor test message") - messages = [None] * len(monitors) - for i, rfm in enumerate(runner.from_monitors): - if rfm.poll(3): - messages[i] = rfm.recv() - for m in messages: - self.assertIsNotNone(m) - self.assertEqual(m, "monitor test message") + for monitor in monitors: + self.assertIn(monitor, monitors) self.assertIsInstance(runner.handlers, list) for handler in runner.handlers: self.assertIsInstance(handler, BaseHandler) + self.assertEqual(len(runner.handlers), 2) + for handler in handlers: + self.assertIn(handler, handlers) - self.assertIsInstance(runner.from_handlers, list) - self.assertEqual(len(runner.from_handlers), len(handlers)) - for rh in runner.handlers: - rh.send_to_runner("handler test message") - message = None - if runner.from_handlers[0].poll(3): - message = 
runner.from_handlers[0].recv() - self.assertIsNotNone(message) - self.assertEqual(message, "handler test message") - - self.assertIsInstance(runner.conductors, list) + self.assertIsInstance(runner.conductors, list) for conductor in runner.conductors: self.assertIsInstance(conductor, BaseConductor) + self.assertEqual(len(runner.conductors), len(conductors)) + for conductor in conductors: + self.assertIn(conductor, conductors) + + self.assertIsInstance(runner.event_connections, list) + self.assertEqual(len(runner.event_connections), 4) + + for monitor in monitors: + monitor.send_event_to_runner("monitor test message") + + message = None + from_monitor = [i[0] for i in runner.event_connections \ + if i[1] is monitor][0] + if from_monitor.poll(3): + message = from_monitor.recv() + self.assertIsNotNone(message) + self.assertEqual(message, "monitor test message") + + for handler in handlers: + handler.prompt_runner_for_event() + message = None + from_handler = [i[0] for i in runner.event_connections \ + if i[1] is handler][0] + if from_handler.poll(3): + message = from_handler.recv() + self.assertIsNotNone(message) + self.assertEqual(message, 1) + + self.assertIsInstance(runner.job_connections, list) + self.assertEqual(len(runner.job_connections), 4) + + for handler in handlers: + handler.send_job_to_runner("handler test message") + message = None + from_handler = [i[0] for i in runner.job_connections \ + if i[1] is handler][0] + if from_handler.poll(3): + message = from_handler.recv() + self.assertIsNotNone(message) + self.assertEqual(message, "handler test message") + + for conductor in conductors: + conductor.prompt_runner_for_job() + message = None + from_conductor = [i[0] for i in runner.job_connections \ + if i[1] is conductor][0] + if from_conductor.poll(3): + message = from_conductor.recv() + self.assertIsNotNone(message) + self.assertEqual(message, 1) # Test meow runner directory overrides def testMeowRunnerDirOverridesSetup(self)->None: @@ -278,8 +338,6 @@ 
class MeowTests(unittest.TestCase): recipe.name: recipe, } - runner_debug_stream = io.StringIO("") - runner = MeowRunner( WatchdogMonitor( TEST_MONITOR_BASE, @@ -288,13 +346,23 @@ class MeowTests(unittest.TestCase): settletime=1 ), PapermillHandler(), - LocalPythonConductor(), + LocalPythonConductor(pause_time=2), job_queue_dir=TEST_JOB_QUEUE, - job_output_dir=TEST_JOB_OUTPUT, - print=runner_debug_stream, - logging=3 - ) - + job_output_dir=TEST_JOB_OUTPUT + ) + + # Intercept messages between the conductor and runner for testing + conductor_to_test_conductor, conductor_to_test_test = Pipe(duplex=True) + test_to_runner_runner, test_to_runner_test = Pipe(duplex=True) + + runner.conductors[0].to_runner_job = conductor_to_test_conductor + + for i in range(len(runner.job_connections)): + _, obj = runner.job_connections[i] + + if obj == runner.conductors[0]: + runner.job_connections[i] = (test_to_runner_runner, runner.job_connections[i][1]) + runner.start() start_dir = os.path.join(TEST_MONITOR_BASE, "start") @@ -303,32 +371,43 @@ class MeowTests(unittest.TestCase): with open(os.path.join(start_dir, "A.txt"), "w") as f: f.write("Initial Data") - self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) - loops = 0 - job_id = None while loops < 5: - sleep(1) - runner_debug_stream.seek(0) - messages = runner_debug_stream.readlines() + # Initial prompt + if conductor_to_test_test.poll(5): + msg = conductor_to_test_test.recv() + else: + raise Exception("Timed out") + self.assertEqual(msg, 1) + test_to_runner_test.send(msg) + + # Reply + if test_to_runner_test.poll(5): + msg = test_to_runner_test.recv() + else: + raise Exception("Timed out") + job_dir = msg + conductor_to_test_test.send(msg) + + if isinstance(job_dir, str): + # Prompt again once complete + if conductor_to_test_test.poll(5): + msg = conductor_to_test_test.recv() + else: + raise Exception("Timed out") + self.assertEqual(msg, 1) + loops = 5 - for msg in messages: - self.assertNotIn("ERROR", msg) - - if 
"INFO: Completed execution for job: '" in msg: - job_id = msg.replace( - "INFO: Completed execution for job: '", "") - job_id = job_id[:-2] - loops = 5 loops += 1 - self.assertIsNotNone(job_id) + job_dir = job_dir.replace(TEST_JOB_QUEUE, TEST_JOB_OUTPUT) + + self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1) - self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT)) + self.assertTrue(os.path.exists(job_dir)) runner.stop() - job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) print(os.listdir(job_dir)) self.assertEqual(count_non_locks(job_dir), 5) @@ -372,8 +451,6 @@ class MeowTests(unittest.TestCase): recipe.name: recipe, } - runner_debug_stream = io.StringIO("") - runner = MeowRunner( WatchdogMonitor( TEST_MONITOR_BASE, @@ -384,12 +461,22 @@ class MeowTests(unittest.TestCase): PapermillHandler( job_queue_dir=TEST_JOB_QUEUE, ), - LocalPythonConductor(), + LocalPythonConductor(pause_time=2), job_queue_dir=TEST_JOB_QUEUE, - job_output_dir=TEST_JOB_OUTPUT, - print=runner_debug_stream, - logging=3 - ) + job_output_dir=TEST_JOB_OUTPUT + ) + + # Intercept messages between the conductor and runner for testing + conductor_to_test_conductor, conductor_to_test_test = Pipe(duplex=True) + test_to_runner_runner, test_to_runner_test = Pipe(duplex=True) + + runner.conductors[0].to_runner_job = conductor_to_test_conductor + + for i in range(len(runner.job_connections)): + _, obj = runner.job_connections[i] + + if obj == runner.conductors[0]: + runner.job_connections[i] = (test_to_runner_runner, runner.job_connections[i][1]) runner.start() @@ -403,30 +490,38 @@ class MeowTests(unittest.TestCase): loops = 0 job_ids = [] - while len(job_ids) < 2 and loops < 5: - sleep(1) - runner_debug_stream.seek(0) - messages = runner_debug_stream.readlines() + while loops < 15: + # Initial prompt + if conductor_to_test_test.poll(5): + msg = conductor_to_test_test.recv() + else: + raise Exception("Timed out") + self.assertEqual(msg, 1) + 
test_to_runner_test.send(msg) + + # Reply + if test_to_runner_test.poll(5): + msg = test_to_runner_test.recv() + else: + raise Exception("Timed out") + conductor_to_test_test.send(msg) + + if len(job_ids) == 2: + break + + if isinstance(msg, str): + job_ids.append(msg.replace(TEST_JOB_QUEUE+os.path.sep, '')) - for msg in messages: - self.assertNotIn("ERROR", msg) - - if "INFO: Completed execution for job: '" in msg: - job_id = msg.replace( - "INFO: Completed execution for job: '", "") - job_id = job_id[:-2] - if job_id not in job_ids: - job_ids.append(job_id) loops += 1 + runner.stop() + self.assertEqual(len(job_ids), 2) self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 2) self.assertIn(job_ids[0], os.listdir(TEST_JOB_OUTPUT)) self.assertIn(job_ids[1], os.listdir(TEST_JOB_OUTPUT)) - runner.stop() - - mid_job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) + mid_job_dir = os.path.join(TEST_JOB_OUTPUT, job_ids[0]) self.assertEqual(count_non_locks(mid_job_dir), 5) result = read_notebook( @@ -441,7 +536,7 @@ class MeowTests(unittest.TestCase): self.assertEqual(data, "Initial Data\nA line from Pattern 1") - final_job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) + final_job_dir = os.path.join(TEST_JOB_OUTPUT, job_ids[1]) self.assertEqual(count_non_locks(final_job_dir), 5) result = read_notebook(os.path.join(final_job_dir, @@ -476,8 +571,6 @@ class MeowTests(unittest.TestCase): recipe.name: recipe, } - runner_debug_stream = io.StringIO("") - runner = MeowRunner( WatchdogMonitor( TEST_MONITOR_BASE, @@ -488,12 +581,23 @@ class MeowTests(unittest.TestCase): PythonHandler( job_queue_dir=TEST_JOB_QUEUE ), - LocalPythonConductor(), + LocalPythonConductor(pause_time=2), job_queue_dir=TEST_JOB_QUEUE, - job_output_dir=TEST_JOB_OUTPUT, - print=runner_debug_stream, - logging=3 - ) + job_output_dir=TEST_JOB_OUTPUT + ) + + # Intercept messages between the conductor and runner for testing + conductor_to_test_conductor, conductor_to_test_test = Pipe(duplex=True) + test_to_runner_runner, 
test_to_runner_test = Pipe(duplex=True) + + runner.conductors[0].to_runner_job = conductor_to_test_conductor + + for i in range(len(runner.job_connections)): + _, obj = runner.job_connections[i] + + if obj == runner.conductors[0]: + runner.job_connections[i] = (test_to_runner_runner, runner.job_connections[i][1]) + runner.start() @@ -506,30 +610,42 @@ class MeowTests(unittest.TestCase): self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) loops = 0 - job_id = None while loops < 5: - sleep(1) - runner_debug_stream.seek(0) - messages = runner_debug_stream.readlines() + # Initial prompt + if conductor_to_test_test.poll(5): + msg = conductor_to_test_test.recv() + else: + raise Exception("Timed out") + self.assertEqual(msg, 1) + test_to_runner_test.send(msg) + + # Reply + if test_to_runner_test.poll(5): + msg = test_to_runner_test.recv() + else: + raise Exception("Timed out") + job_dir = msg + conductor_to_test_test.send(msg) + + if isinstance(job_dir, str): + # Prompt again once complete + if conductor_to_test_test.poll(5): + msg = conductor_to_test_test.recv() + else: + raise Exception("Timed out") + self.assertEqual(msg, 1) + loops = 5 - for msg in messages: - self.assertNotIn("ERROR", msg) - - if "INFO: Completed execution for job: '" in msg: - job_id = msg.replace( - "INFO: Completed execution for job: '", "") - job_id = job_id[:-2] - loops = 5 loops += 1 - self.assertIsNotNone(job_id) + job_dir = job_dir.replace(TEST_JOB_QUEUE, TEST_JOB_OUTPUT) + + self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1) - self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT)) + self.assertTrue(os.path.exists(job_dir)) runner.stop() - job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) - metafile = os.path.join(job_dir, META_FILE) status = read_yaml(metafile) @@ -578,8 +694,6 @@ class MeowTests(unittest.TestCase): recipe.name: recipe, } - runner_debug_stream = io.StringIO("") - runner = MeowRunner( 
WatchdogMonitor( TEST_MONITOR_BASE, @@ -590,12 +704,22 @@ class MeowTests(unittest.TestCase): PythonHandler( job_queue_dir=TEST_JOB_QUEUE ), - LocalPythonConductor(), + LocalPythonConductor(pause_time=2), job_queue_dir=TEST_JOB_QUEUE, - job_output_dir=TEST_JOB_OUTPUT, - print=runner_debug_stream, - logging=3 + job_output_dir=TEST_JOB_OUTPUT ) + + # Intercept messages between the conductor and runner for testing + conductor_to_test_conductor, conductor_to_test_test = Pipe(duplex=True) + test_to_runner_runner, test_to_runner_test = Pipe(duplex=True) + + runner.conductors[0].to_runner_job = conductor_to_test_conductor + + for i in range(len(runner.job_connections)): + _, obj = runner.job_connections[i] + + if obj == runner.conductors[0]: + runner.job_connections[i] = (test_to_runner_runner, runner.job_connections[i][1]) runner.start() @@ -607,22 +731,31 @@ class MeowTests(unittest.TestCase): self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) + loops = 0 job_ids = [] - while len(job_ids) < 2 and loops < 5: - sleep(1) - runner_debug_stream.seek(0) - messages = runner_debug_stream.readlines() + while loops < 15: + # Initial prompt + if conductor_to_test_test.poll(5): + msg = conductor_to_test_test.recv() + else: + raise Exception("Timed out") + self.assertEqual(msg, 1) + test_to_runner_test.send(msg) + + # Reply + if test_to_runner_test.poll(5): + msg = test_to_runner_test.recv() + else: + raise Exception("Timed out") + conductor_to_test_test.send(msg) + + if len(job_ids) == 2: + break + + if isinstance(msg, str): + job_ids.append(msg.replace(TEST_JOB_QUEUE+os.path.sep, '')) - for msg in messages: - self.assertNotIn("ERROR", msg) - - if "INFO: Completed execution for job: '" in msg: - job_id = msg.replace( - "INFO: Completed execution for job: '", "") - job_id = job_id[:-2] - if job_id not in job_ids: - job_ids.append(job_id) loops += 1 runner.stop() @@ -704,8 +837,6 @@ class MeowTests(unittest.TestCase): recipe.name: recipe, } - runner_debug_stream = 
io.StringIO("") - runner = MeowRunner( WatchdogMonitor( TEST_MONITOR_BASE, @@ -716,12 +847,22 @@ class MeowTests(unittest.TestCase): PythonHandler( job_queue_dir=TEST_JOB_QUEUE ), - LocalPythonConductor(), + LocalPythonConductor(pause_time=2), job_queue_dir=TEST_JOB_QUEUE, - job_output_dir=TEST_JOB_OUTPUT, - print=runner_debug_stream, - logging=3 - ) + job_output_dir=TEST_JOB_OUTPUT + ) + + # Intercept messages between the conductor and runner for testing + conductor_to_test_conductor, conductor_to_test_test = Pipe(duplex=True) + test_to_runner_runner, test_to_runner_test = Pipe(duplex=True) + + runner.conductors[0].to_runner_job = conductor_to_test_conductor + + for i in range(len(runner.job_connections)): + _, obj = runner.job_connections[i] + + if obj == runner.conductors[0]: + runner.job_connections[i] = (test_to_runner_runner, runner.job_connections[i][1]) runner.start() @@ -735,21 +876,28 @@ class MeowTests(unittest.TestCase): loops = 0 job_ids = [] - while loops < 5: - sleep(1) - runner_debug_stream.seek(0) - messages = runner_debug_stream.readlines() + while loops < 60: + # Initial prompt + if conductor_to_test_test.poll(5): + msg = conductor_to_test_test.recv() + else: + raise Exception("Timed out") + self.assertEqual(msg, 1) + test_to_runner_test.send(msg) + + # Reply + if test_to_runner_test.poll(5): + msg = test_to_runner_test.recv() + else: + raise Exception("Timed out") + conductor_to_test_test.send(msg) + + if len(job_ids) == 46: + break + + if isinstance(msg, str): + job_ids.append(msg.replace(TEST_JOB_QUEUE+os.path.sep, '')) - for msg in messages: - self.assertNotIn("ERROR", msg) - - if "INFO: Completed execution for job: '" in msg: - job_id = msg.replace( - "INFO: Completed execution for job: '", "") - job_id = job_id[:-2] - if job_id not in job_ids: - job_ids.append(job_id) - loops = 0 loops += 1 runner.stop() @@ -802,8 +950,6 @@ class MeowTests(unittest.TestCase): recipe_one.name: recipe_one, } - runner_debug_stream = io.StringIO("") - runner = 
MeowRunner( WatchdogMonitor( TEST_MONITOR_BASE, @@ -814,13 +960,23 @@ class MeowTests(unittest.TestCase): PythonHandler( job_queue_dir=TEST_JOB_QUEUE ), - LocalPythonConductor(), + LocalPythonConductor(pause_time=2), job_queue_dir=TEST_JOB_QUEUE, - job_output_dir=TEST_JOB_OUTPUT, - print=runner_debug_stream, - logging=3 - ) - + job_output_dir=TEST_JOB_OUTPUT + ) + + # Intercept messages between the conductor and runner for testing + conductor_to_test_conductor, conductor_to_test_test = Pipe(duplex=True) + test_to_runner_runner, test_to_runner_test = Pipe(duplex=True) + + runner.conductors[0].to_runner_job = conductor_to_test_conductor + + for i in range(len(runner.job_connections)): + _, obj = runner.job_connections[i] + + if obj == runner.conductors[0]: + runner.job_connections[i] = (test_to_runner_runner, runner.job_connections[i][1]) + runner.start() start_dir = os.path.join(TEST_MONITOR_BASE, "start") @@ -833,19 +989,28 @@ class MeowTests(unittest.TestCase): loops = 0 job_ids = set() - while loops < 5: - sleep(1) - runner_debug_stream.seek(0) - messages = runner_debug_stream.readlines() + while loops < 15: + # Initial prompt + if conductor_to_test_test.poll(5): + msg = conductor_to_test_test.recv() + else: + raise Exception("Timed out") + self.assertEqual(msg, 1) + test_to_runner_test.send(msg) + + # Reply + if test_to_runner_test.poll(5): + msg = test_to_runner_test.recv() + else: + raise Exception("Timed out") + conductor_to_test_test.send(msg) + + if len(job_ids) == 2: + break + + if isinstance(msg, str): + job_ids.add(msg.replace(TEST_JOB_QUEUE+os.path.sep, '')) - for msg in messages: - self.assertNotIn("ERROR", msg) - - if "INFO: Completed execution for job: '" in msg: - job_id = msg.replace( - "INFO: Completed execution for job: '", "") - job_ids.add(job_id[:-2]) - loops = 5 loops += 1 self.assertEqual(len(job_ids), 1) @@ -856,19 +1021,28 @@ class MeowTests(unittest.TestCase): runner.monitors[0].add_pattern(pattern_two) loops = 0 - while loops < 5: - 
sleep(1) - runner_debug_stream.seek(0) - messages = runner_debug_stream.readlines() + while loops < 15: + # Initial prompt + if conductor_to_test_test.poll(5): + msg = conductor_to_test_test.recv() + else: + raise Exception("Timed out") + self.assertEqual(msg, 1) + test_to_runner_test.send(msg) + + # Reply + if test_to_runner_test.poll(5): + msg = test_to_runner_test.recv() + else: + raise Exception("Timed out") + conductor_to_test_test.send(msg) + + if len(job_ids) == 2: + break + + if isinstance(msg, str): + job_ids.add(msg.replace(TEST_JOB_QUEUE+os.path.sep, '')) - for msg in messages: - self.assertNotIn("ERROR", msg) - - if "INFO: Completed execution for job: '" in msg: - job_id = msg.replace( - "INFO: Completed execution for job: '", "") - job_ids.add(job_id[:-2]) - loops = 5 loops += 1 self.assertEqual(len(job_ids), 1) @@ -879,19 +1053,28 @@ class MeowTests(unittest.TestCase): runner.monitors[0].add_recipe(recipe_two) loops = 0 - while loops < 5: - sleep(1) - runner_debug_stream.seek(0) - messages = runner_debug_stream.readlines() + while loops < 15: + # Initial prompt + if conductor_to_test_test.poll(5): + msg = conductor_to_test_test.recv() + else: + raise Exception("Timed out") + self.assertEqual(msg, 1) + test_to_runner_test.send(msg) + + # Reply + if test_to_runner_test.poll(5): + msg = test_to_runner_test.recv() + else: + raise Exception("Timed out") + conductor_to_test_test.send(msg) + + if len(job_ids) == 2: + break + + if isinstance(msg, str): + job_ids.add(msg.replace(TEST_JOB_QUEUE+os.path.sep, '')) - for msg in messages: - self.assertNotIn("ERROR", msg) - - if "INFO: Completed execution for job: '" in msg: - job_id = msg.replace( - "INFO: Completed execution for job: '", "") - job_ids.add(job_id[:-2]) - loops = 5 loops += 1 self.assertEqual(len(job_ids), 2) @@ -949,8 +1132,6 @@ class MeowTests(unittest.TestCase): maker_recipe.name: maker_recipe } - runner_debug_stream = io.StringIO("") - runner = MeowRunner( WatchdogMonitor( TEST_MONITOR_BASE, @@ 
-961,21 +1142,34 @@ class MeowTests(unittest.TestCase): PythonHandler( job_queue_dir=TEST_JOB_QUEUE ), - LocalPythonConductor(), + LocalPythonConductor(pause_time=2), job_queue_dir=TEST_JOB_QUEUE, - job_output_dir=TEST_JOB_OUTPUT, - print=runner_debug_stream, - logging=3 - ) + job_output_dir=TEST_JOB_OUTPUT + ) + + # Intercept messages between the conductor and runner for testing + conductor_to_test_conductor, conductor_to_test_test = Pipe(duplex=True) + test_to_runner_runner, test_to_runner_test = Pipe(duplex=True) + + runner.conductors[0].to_runner_job = conductor_to_test_conductor + + for i in range(len(runner.job_connections)): + _, obj = runner.job_connections[i] + + if obj == runner.conductors[0]: + runner.job_connections[i] = (test_to_runner_runner, runner.job_connections[i][1]) # TODO finish me # runner.start() # Test some actual scientific analysis, but in a simple progression def testScientificAnalysisAllGood(self)->None: - if os.environ["SKIP_LONG"] and os.environ["SKIP_LONG"] == '1': - warn("Skipping testScientificAnalysisAllGood") - return + try: + if os.environ["SKIP_LONG"] and os.environ["SKIP_LONG"] == '1': + warn("Skipping testScientificAnalysisAllGood") + return + except KeyError: + pass patterns = { 'pattern_check': pattern_check, @@ -991,8 +1185,6 @@ class MeowTests(unittest.TestCase): 'recipe_generator': recipe_generator } - runner_debug_stream = io.StringIO("") - runner = MeowRunner( WatchdogMonitor( TEST_MONITOR_BASE, @@ -1003,13 +1195,23 @@ class MeowTests(unittest.TestCase): PapermillHandler( job_queue_dir=TEST_JOB_QUEUE ), - LocalPythonConductor(), + LocalPythonConductor(pause_time=2), job_queue_dir=TEST_JOB_QUEUE, - job_output_dir=TEST_JOB_OUTPUT, - print=runner_debug_stream, - logging=3 + job_output_dir=TEST_JOB_OUTPUT ) + # Intercept messages between the conductor and runner for testing + conductor_to_test_conductor, conductor_to_test_test = Pipe(duplex=True) + test_to_runner_runner, test_to_runner_test = Pipe(duplex=True) + + 
runner.conductors[0].to_runner_job = conductor_to_test_conductor + + for i in range(len(runner.job_connections)): + _, obj = runner.job_connections[i] + + if obj == runner.conductors[0]: + runner.job_connections[i] = (test_to_runner_runner, runner.job_connections[i][1]) + good = 3 big = 0 small = 0 @@ -1047,48 +1249,35 @@ class MeowTests(unittest.TestCase): runner.start() - idle_loops = 0 - total_loops = 0 - messages = None - while idle_loops < 15 and total_loops < 150: - sleep(1) - runner_debug_stream.seek(0) - new_messages = runner_debug_stream.readlines() - - if messages == new_messages: - idle_loops += 1 + loops = 0 + idles = 0 + while loops < 150 and idles < 15: + # Initial prompt + if conductor_to_test_test.poll(45): + msg = conductor_to_test_test.recv() else: - idle_loops = 0 - messages = new_messages - total_loops += 1 + break + self.assertEqual(msg, 1) + test_to_runner_test.send(msg) - for message in messages: - print(message.replace('\n', '')) + # Reply + if test_to_runner_test.poll(5): + msg = test_to_runner_test.recv() + if msg == 1: + idles += 1 + else: + break + conductor_to_test_test.send(msg) + + loops += 1 runner.stop() - print(f"total_loops:{total_loops}, idle_loops:{idle_loops}") - - if len(os.listdir(TEST_JOB_OUTPUT)) != good * 3: - backup_before_teardown(TEST_JOB_OUTPUT, - f"Backup-all_good-{TEST_JOB_OUTPUT}") - backup_before_teardown(TEST_JOB_QUEUE, - f"Backup-all_good-{TEST_JOB_QUEUE}") - backup_before_teardown(TEST_MONITOR_BASE, - f"Backup-all_good-{TEST_MONITOR_BASE}") self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), good * 3) for job_dir in os.listdir(TEST_JOB_OUTPUT): metafile = os.path.join(TEST_JOB_OUTPUT, job_dir, META_FILE) status = read_yaml(metafile) - if JOB_ERROR in status: - backup_before_teardown(TEST_JOB_OUTPUT, - f"Backup-all_good-{TEST_JOB_OUTPUT}") - backup_before_teardown(TEST_JOB_QUEUE, - f"Backup-all_good-{TEST_JOB_QUEUE}") - backup_before_teardown(TEST_MONITOR_BASE, - f"Backup-all_good-{TEST_MONITOR_BASE}") - 
self.assertNotIn(JOB_ERROR, status) result_path = os.path.join( @@ -1097,9 +1286,13 @@ class MeowTests(unittest.TestCase): # Test some actual scientific analysis, in a predicatable loop def testScientificAnalysisPredictableLoop(self)->None: - if os.environ["SKIP_LONG"] and os.environ["SKIP_LONG"] == '1': - warn("Skipping testScientificAnalysisPredictableLoop") - return + try: + if os.environ["SKIP_LONG"] and os.environ["SKIP_LONG"] == '1': + warn("Skipping testScientificAnalysisPredictableLoop") + return + except KeyError: + pass + patterns = { 'pattern_check': pattern_check, @@ -1115,8 +1308,6 @@ class MeowTests(unittest.TestCase): 'recipe_generator': recipe_generator } - runner_debug_stream = io.StringIO("") - runner = MeowRunner( WatchdogMonitor( TEST_MONITOR_BASE, @@ -1127,13 +1318,23 @@ class MeowTests(unittest.TestCase): PapermillHandler( job_queue_dir=TEST_JOB_QUEUE ), - LocalPythonConductor(), + LocalPythonConductor(pause_time=2), job_queue_dir=TEST_JOB_QUEUE, - job_output_dir=TEST_JOB_OUTPUT, - print=runner_debug_stream, - logging=3 + job_output_dir=TEST_JOB_OUTPUT ) + # Intercept messages between the conductor and runner for testing + conductor_to_test_conductor, conductor_to_test_test = Pipe(duplex=True) + test_to_runner_runner, test_to_runner_test = Pipe(duplex=True) + + runner.conductors[0].to_runner_job = conductor_to_test_conductor + + for i in range(len(runner.job_connections)): + _, obj = runner.job_connections[i] + + if obj == runner.conductors[0]: + runner.job_connections[i] = (test_to_runner_runner, runner.job_connections[i][1]) + good = 10 big = 5 small = 0 @@ -1172,26 +1373,32 @@ class MeowTests(unittest.TestCase): runner.start() - idle_loops = 0 - total_loops = 0 - messages = None - while idle_loops < 45 and total_loops < 600: - sleep(1) - runner_debug_stream.seek(0) - new_messages = runner_debug_stream.readlines() - - if messages == new_messages: - idle_loops += 1 + loops = 0 + idles = 0 + while loops < 600 and idles < 15: + # Initial prompt + if 
conductor_to_test_test.poll(45): + msg = conductor_to_test_test.recv() else: - idle_loops = 0 - messages = new_messages - total_loops += 1 + break + self.assertEqual(msg, 1) + test_to_runner_test.send(msg) - for message in messages: - print(message.replace('\n', '')) + # Reply + if test_to_runner_test.poll(15): + msg = test_to_runner_test.recv() + if msg == 1: + idles += 1 + else: + idles = 0 + else: + break + conductor_to_test_test.send(msg) + + loops += 1 runner.stop() - print(f"total_loops:{total_loops}, idle_loops:{idle_loops}") + print(f"total_loops:{loops}, idle_loops:{idles}") jobs = len(os.listdir(TEST_JOB_OUTPUT)) if jobs != (good*3 + big*5 + small*5): @@ -1232,13 +1439,16 @@ class MeowTests(unittest.TestCase): backup_before_teardown(TEST_MONITOR_BASE, f"Backup-predictable-{TEST_MONITOR_BASE}") - self.assertEqual(results, good+big+small) + self.assertEqual(results, good+big+small) # Test some actual scientific analysis, in an unpredicatable loop def testScientificAnalysisRandomLoop(self)->None: - if os.environ["SKIP_LONG"] and os.environ["SKIP_LONG"] == '1': - warn("Skipping testScientificAnalysisRandomLoop") - return + try: + if os.environ["SKIP_LONG"] and os.environ["SKIP_LONG"] == '1': + warn("Skipping testScientificAnalysisRandomLoop") + return + except KeyError: + pass pattern_regenerate_random = FileEventPattern( "pattern_regenerate_random", @@ -1273,8 +1483,6 @@ class MeowTests(unittest.TestCase): 'recipe_generator': recipe_generator } - runner_debug_stream = io.StringIO("") - runner = MeowRunner( WatchdogMonitor( TEST_MONITOR_BASE, @@ -1285,13 +1493,23 @@ class MeowTests(unittest.TestCase): PapermillHandler( job_queue_dir=TEST_JOB_QUEUE ), - LocalPythonConductor(), + LocalPythonConductor(pause_time=2), job_queue_dir=TEST_JOB_QUEUE, - job_output_dir=TEST_JOB_OUTPUT, - print=runner_debug_stream, - logging=3 + job_output_dir=TEST_JOB_OUTPUT ) + # Intercept messages between the conductor and runner for testing + conductor_to_test_conductor, 
conductor_to_test_test = Pipe(duplex=True) + test_to_runner_runner, test_to_runner_test = Pipe(duplex=True) + + runner.conductors[0].to_runner_job = conductor_to_test_conductor + + for i in range(len(runner.job_connections)): + _, obj = runner.job_connections[i] + + if obj == runner.conductors[0]: + runner.job_connections[i] = (test_to_runner_runner, runner.job_connections[i][1]) + good = 10 big = 5 small = 0 @@ -1330,26 +1548,32 @@ class MeowTests(unittest.TestCase): runner.start() - idle_loops = 0 - total_loops = 0 - messages = None - while idle_loops < 60 and total_loops < 600: - sleep(1) - runner_debug_stream.seek(0) - new_messages = runner_debug_stream.readlines() - - if messages == new_messages: - idle_loops += 1 + loops = 0 + idles = 0 + while loops < 600 and idles < 15: + # Initial prompt + if conductor_to_test_test.poll(60): + msg = conductor_to_test_test.recv() else: - idle_loops = 0 - messages = new_messages - total_loops += 1 + break + self.assertEqual(msg, 1) + test_to_runner_test.send(msg) - for message in messages: - print(message.replace('\n', '')) + # Reply + if test_to_runner_test.poll(15): + msg = test_to_runner_test.recv() + if msg == 1: + idles += 1 + else: + idles = 0 + else: + break + conductor_to_test_test.send(msg) + + loops += 1 runner.stop() - print(f"total_loops:{total_loops}, idle_loops:{idle_loops}") + print(f"total_loops:{loops}, idle_loops:{idles}") for job_dir in os.listdir(TEST_JOB_OUTPUT): metafile = os.path.join(TEST_JOB_OUTPUT, job_dir, META_FILE) @@ -1388,9 +1612,13 @@ class MeowTests(unittest.TestCase): # Test some actual scientific analysis, in an unpredicatable loop def testScientificAnalysisMassiveRandomLoop(self)->None: - if os.environ["SKIP_LONG"] and os.environ["SKIP_LONG"] == '1': - warn("Skipping testScientificAnalysisMassiveRandomLoop") - return + try: + if os.environ["SKIP_LONG"] and os.environ["SKIP_LONG"] == '1': + warn("Skipping testScientificAnalysisMassiveRandomLoop") + return + except KeyError: + pass + 
pattern_regenerate_random = FileEventPattern( "pattern_regenerate_random", @@ -1425,8 +1653,6 @@ class MeowTests(unittest.TestCase): 'recipe_generator': recipe_generator } - runner_debug_stream = io.StringIO("") - runner = MeowRunner( WatchdogMonitor( TEST_MONITOR_BASE, @@ -1437,13 +1663,23 @@ class MeowTests(unittest.TestCase): PapermillHandler( job_queue_dir=TEST_JOB_QUEUE ), - LocalPythonConductor(), + LocalPythonConductor(pause_time=2), job_queue_dir=TEST_JOB_QUEUE, - job_output_dir=TEST_JOB_OUTPUT, - print=runner_debug_stream, - logging=3 + job_output_dir=TEST_JOB_OUTPUT ) + # Intercept messages between the conductor and runner for testing + conductor_to_test_conductor, conductor_to_test_test = Pipe(duplex=True) + test_to_runner_runner, test_to_runner_test = Pipe(duplex=True) + + runner.conductors[0].to_runner_job = conductor_to_test_conductor + + for i in range(len(runner.job_connections)): + _, obj = runner.job_connections[i] + + if obj == runner.conductors[0]: + runner.job_connections[i] = (test_to_runner_runner, runner.job_connections[i][1]) + good = 5 big = 15 small = 0 @@ -1482,26 +1718,32 @@ class MeowTests(unittest.TestCase): runner.start() - idle_loops = 0 - total_loops = 0 - messages = None - while idle_loops < 60 and total_loops < 1200: - sleep(1) - runner_debug_stream.seek(0) - new_messages = runner_debug_stream.readlines() - - if messages == new_messages: - idle_loops += 1 + loops = 0 + idles = 0 + while loops < 1200 and idles < 15: + # Initial prompt + if conductor_to_test_test.poll(60): + msg = conductor_to_test_test.recv() else: - idle_loops = 0 - messages = new_messages - total_loops += 1 + break + self.assertEqual(msg, 1) + test_to_runner_test.send(msg) - for message in messages: - print(message.replace('\n', '')) + # Reply + if test_to_runner_test.poll(15): + msg = test_to_runner_test.recv() + if msg == 1: + idles += 1 + else: + idles = 0 + else: + break + conductor_to_test_test.send(msg) + + loops += 1 runner.stop() - 
print(f"total_loops:{total_loops}, idle_loops:{idle_loops}") + print(f"total_loops:{loops}, idle_loops:{idles}") for job_dir in os.listdir(TEST_JOB_OUTPUT): metafile = os.path.join(TEST_JOB_OUTPUT, job_dir, META_FILE) diff --git a/tests/test_validation.py b/tests/test_validation.py index eba5e38..66a3528 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -10,7 +10,7 @@ from meow_base.core.meow import valid_event, valid_job, \ from meow_base.functionality.validation import check_type, \ check_implementation, valid_string, valid_dict, valid_list, \ valid_existing_file_path, valid_dir_path, valid_non_existing_path, \ - check_callable + check_callable, valid_natural, valid_dict_multiple_types from meow_base.core.vars import VALID_NAME_CHARS, SHA256, \ EVENT_TYPE, EVENT_PATH, JOB_TYPE, JOB_EVENT, JOB_ID, JOB_PATTERN, \ JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME, EVENT_RULE, \ @@ -127,6 +127,36 @@ class ValidationTests(unittest.TestCase): with self.assertRaises(ValueError): valid_dict({"a": 0, "b": 1}, str, int, strict=True) + def testValidDictMultipleTypes(self)->None: + valid_dict_multiple_types( + {"a": 0, "b": 1}, + str, + [int], + strict=False + ) + + valid_dict_multiple_types( + {"a": 0, "b": 1}, + str, + [int, str], + strict=False + ) + + valid_dict_multiple_types( + {"a": 0, "b": 'a'}, + str, + [int, str], + strict=False + ) + + with self.assertRaises(TypeError): + valid_dict_multiple_types( + {"a": 0, "b": 'a'}, + str, + [int], + strict=False + ) + # Test valid_list with sufficent lengths def testValidListMinimum(self)->None: valid_list([1, 2, 3], int) @@ -255,6 +285,18 @@ class ValidationTests(unittest.TestCase): with self.assertRaises(TypeError): check_callable("a") + # Test natural number check + def testValidNatural(self)->None: + valid_natural(0) + valid_natural(1) + + with self.assertRaises(ValueError): + valid_natural(-1) + + with self.assertRaises(TypeError): + valid_natural(1.0) + + class MeowTests(unittest.TestCase): def 
setUp(self)->None: super().setUp()