diff --git a/core/base_conductor.py b/core/base_conductor.py index d6a4006..5df2e42 100644 --- a/core/base_conductor.py +++ b/core/base_conductor.py @@ -8,7 +8,7 @@ Author(s): David Marchant from typing import Any, Tuple, Dict -from meow_base.core.vars import VALID_CONDUCTOR_NAME_CHARS, \ +from meow_base.core.vars import VALID_CONDUCTOR_NAME_CHARS, VALID_CHANNELS, \ get_drt_imp_msg from meow_base.functionality.validation import check_implementation, \ valid_string @@ -19,6 +19,10 @@ class BaseConductor: # An identifier for a conductor within the runner. Can be manually set in # the constructor, or autogenerated if no name provided. name:str + # A channel for sending messages to the runner. Note that this will be + # overridden by a MeowRunner, if a conductor instance is passed to it, and + # so does not need to be initialised within the conductor itself. + to_runner: VALID_CHANNELS # Directory where queued jobs are initially written to. Note that this # will be overridden by a MeowRunner, if a handler instance is passed to # it, and so does not need to be initialised within the handler itself. @@ -32,6 +36,7 @@ class BaseConductor: from it implements its validation functions.""" check_implementation(type(self).execute, BaseConductor) check_implementation(type(self).valid_execute_criteria, BaseConductor) + check_implementation(type(self).prompt_runner_for_job, BaseConductor) if not name: name = generate_conductor_id() self._is_valid_name(name) @@ -51,6 +56,21 @@ class BaseConductor: overridden by child classes.""" valid_string(name, VALID_CONDUCTOR_NAME_CHARS) + def prompt_runner_for_job(self): + pass + + def start(self)->None: + """Function to start the conductor as an ongoing process/thread. May be + overidden by any child process. Note that by default this will raise an + execption that is automatically handled within a runner instance.""" + raise NotImplementedError + + def stop(self)->None: + """Function to stop the conductor as an ongoing process/thread. May be + overidden by any child process. Note that by default this will raise an + execption that is automatically handled within a runner instance.""" + raise NotImplementedError + def valid_execute_criteria(self, job:Dict[str,Any])->Tuple[bool,str]: """Function to determine given an job defintion, if this conductor can process it or not. Must be implemented by any child process.""" diff --git a/core/base_handler.py b/core/base_handler.py index 2c628bd..3b0873a 100644 --- a/core/base_handler.py +++ b/core/base_handler.py @@ -31,6 +31,8 @@ class BaseHandler: from it implements its validation functions.""" check_implementation(type(self).handle, BaseHandler) check_implementation(type(self).valid_handle_criteria, BaseHandler) + check_implementation(type(self).prompt_runner_for_event, BaseHandler) + check_implementation(type(self).send_job_to_runner, BaseHandler) if not name: name = generate_handler_id() self._is_valid_name(name) @@ -50,6 +52,25 @@ class BaseHandler: overridden by child classes.""" valid_string(name, VALID_HANDLER_NAME_CHARS) + def prompt_runner_for_event(self): + pass + + def send_job_to_runner(self, msg): + #self.to_runner.send(msg) + pass + + def start(self)->None: + """Function to start the handler as an ongoing process/thread. May be + overidden by any child process. Note that by default this will raise an + execption that is automatically handled within a runner instance.""" + raise NotImplementedError + + def stop(self)->None: + """Function to stop the handler as an ongoing process/thread. May be + overidden by any child process. Note that by default this will raise an + execption that is automatically handled within a runner instance.""" + raise NotImplementedError + def valid_handle_criteria(self, event:Dict[str,Any])->Tuple[bool,str]: """Function to determine given an event defintion, if this handler can process it or not. Must be implemented by any child process.""" diff --git a/core/base_monitor.py b/core/base_monitor.py index 169e28a..1d70ecb 100644 --- a/core/base_monitor.py +++ b/core/base_monitor.py @@ -54,6 +54,7 @@ class BaseMonitor: check_implementation(type(self).remove_recipe, BaseMonitor) check_implementation(type(self).get_recipes, BaseMonitor) check_implementation(type(self).get_rules, BaseMonitor) + check_implementation(type(self).send_event_to_runner, BaseMonitor) # Ensure that patterns and recipes cannot be trivially modified from # outside the monitor, as this will cause internal consistency issues self._patterns = deepcopy(patterns) @@ -88,6 +89,9 @@ class BaseMonitor: be implemented by any child class.""" pass + def send_event_to_runner(self, msg): + self.to_runner.send(msg) + def start(self)->None: """Function to start the monitor as an ongoing process/thread. Must be implemented by any child process""" diff --git a/core/runner.py b/core/runner.py index bdec074..04b120e 100644 --- a/core/runner.py +++ b/core/runner.py @@ -21,7 +21,7 @@ from meow_base.core.vars import DEBUG_WARNING, DEBUG_INFO, \ EVENT_TYPE, VALID_CHANNELS, META_FILE, DEFAULT_JOB_OUTPUT_DIR, \ DEFAULT_JOB_QUEUE_DIR, EVENT_PATH from meow_base.functionality.validation import check_type, valid_list, \ - valid_dir_path + valid_dir_path, check_implementation from meow_base.functionality.debug import setup_debugging, print_debug from meow_base.functionality.file_io import make_dir, threadsafe_read_status from meow_base.functionality.process_io import wait @@ -276,12 +276,20 @@ class MeowRunner: startable = [] # Start all handlers, if they need it for handler in self.handlers: - if hasattr(handler, "start") and handler not in startable: - startable.append() + try: + check_implementation(handler.start, BaseHandler) + if handler not in startable: + startable.append(handler) + except NotImplementedError: + pass # Start all conductors, if they need it for conductor in self.conductors: - if hasattr(conductor, "start") and conductor not in startable: - startable.append() + try: + check_implementation(conductor.start, BaseConductor) + if conductor not in startable: + startable.append(conductor) + except NotImplementedError: + pass for starting in startable: starting.start() @@ -330,12 +338,20 @@ class MeowRunner: stopable = [] # Stop all handlers, if they need it for handler in self.handlers: - if hasattr(handler, "stop") and handler not in stopable: - stopable.append() + try: + check_implementation(handler.stop, BaseHandler) + if handler not in stopable: + stopable.append(handler) + except NotImplementedError: + pass # Stop all conductors, if they need it for conductor in self.conductors: - if hasattr(conductor, "stop") and conductor not in stopable: - stopable.append() + try: + check_implementation(conductor.stop, BaseConductor) + if conductor not in stopable: + stopable.append(conductor) + except NotImplementedError: + pass for stopping in stopable: stopping.stop() diff --git a/patterns/file_event_pattern.py b/patterns/file_event_pattern.py index 2244980..b555920 100644 --- a/patterns/file_event_pattern.py +++ b/patterns/file_event_pattern.py @@ -256,7 +256,7 @@ class WatchdogMonitor(BaseMonitor): f"Event at {src_path} hit rule {rule.name}", DEBUG_INFO) # Send the event to the runner - self.to_runner.send(meow_event) + self.send_to_runner(meow_event) except Exception as e: self._rules_lock.release() @@ -559,7 +559,7 @@ class WatchdogMonitor(BaseMonitor): f"Retroactive event for file at at {globble} hit rule " f"{rule.name}", DEBUG_INFO) # Send it to the runner - self.to_runner.send(meow_event) + self.send_to_runner(meow_event) except Exception as e: self._rules_lock.release() diff --git a/recipes/bash_recipe.py b/recipes/bash_recipe.py index 27c7ea1..9086e73 100644 --- a/recipes/bash_recipe.py +++ b/recipes/bash_recipe.py @@ -180,7 +180,7 @@ class BashHandler(BaseHandler): threadsafe_write_status(meow_job, meta_file) # Send job directory, as actual definitons will be read from within it - self.to_runner.send(job_dir) + self.send_to_runner(job_dir) def assemble_bash_job_script()->List[str]: diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index 3e98a56..cf4e368 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -185,7 +185,7 @@ class PapermillHandler(BaseHandler): ) # Send job directory, as actual definitons will be read from within it - self.to_runner.send(job_dir) + self.send_to_runner(job_dir) def get_recipe_from_notebook(name:str, notebook_filename:str, parameters:Dict[str,Any]={}, requirements:Dict[str,Any]={} diff --git a/recipes/python_recipe.py b/recipes/python_recipe.py index 03932a5..e4c3269 100644 --- a/recipes/python_recipe.py +++ b/recipes/python_recipe.py @@ -173,7 +173,7 @@ class PythonHandler(BaseHandler): ) # Send job directory, as actual definitons will be read from within it - self.to_runner.send(job_dir) + self.send_to_runner(job_dir) # Papermill job execution code, to be run within the conductor diff --git a/tests/test_runner.py b/tests/test_runner.py index 2148a4c..2fe2857 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -152,7 +152,7 @@ class MeowTests(unittest.TestCase): self.assertIsInstance(runner.from_monitors, list) self.assertEqual(len(runner.from_monitors), 1) - runner.monitors[0].to_runner.send("monitor test message") + runner.monitors[0].send_to_runner("monitor test message") message = None if runner.from_monitors[0].poll(3): message = runner.from_monitors[0].recv() @@ -165,8 +165,7 @@ class MeowTests(unittest.TestCase): self.assertIsInstance(runner.from_handlers, list) self.assertEqual(len(runner.from_handlers), 1) - runner.handlers[0].to_runner.send( - "handler test message") + runner.handlers[0].send_to_runner("handler test message") message = None if runner.from_handlers[0].poll(3): message = runner.from_handlers[0].recv() @@ -189,7 +188,7 @@ class MeowTests(unittest.TestCase): self.assertIsInstance(runner.from_monitors, list) self.assertEqual(len(runner.from_monitors), len(monitors)) for rm in runner.monitors: - rm.to_runner.send("monitor test message") + rm.send_to_runner("monitor test message") messages = [None] * len(monitors) for i, rfm in enumerate(runner.from_monitors): if rfm.poll(3): @@ -205,7 +204,7 @@ class MeowTests(unittest.TestCase): self.assertIsInstance(runner.from_handlers, list) self.assertEqual(len(runner.from_handlers), len(handlers)) for rh in runner.handlers: - rh.to_runner.send("handler test message") + rh.send_to_runner("handler test message") message = None if runner.from_handlers[0].poll(3): message = runner.from_handlers[0].recv()