setup for rework of monitor/handler/conductor interactions

This commit is contained in:
PatchOfScotland
2023-04-18 16:24:08 +02:00
parent ddca1f6aa4
commit b87fd43cfd
9 changed files with 80 additions and 20 deletions

View File

@ -8,7 +8,7 @@ Author(s): David Marchant
from typing import Any, Tuple, Dict from typing import Any, Tuple, Dict
from meow_base.core.vars import VALID_CONDUCTOR_NAME_CHARS, \ from meow_base.core.vars import VALID_CONDUCTOR_NAME_CHARS, VALID_CHANNELS, \
get_drt_imp_msg get_drt_imp_msg
from meow_base.functionality.validation import check_implementation, \ from meow_base.functionality.validation import check_implementation, \
valid_string valid_string
@ -19,6 +19,10 @@ class BaseConductor:
# An identifier for a conductor within the runner. Can be manually set in # An identifier for a conductor within the runner. Can be manually set in
# the constructor, or autogenerated if no name provided. # the constructor, or autogenerated if no name provided.
name:str name:str
# A channel for sending messages to the runner. Note that this will be
# overridden by a MeowRunner, if a conductor instance is passed to it, and
# so does not need to be initialised within the conductor itself.
to_runner: VALID_CHANNELS
# Directory where queued jobs are initially written to. Note that this # Directory where queued jobs are initially written to. Note that this
# will be overridden by a MeowRunner, if a handler instance is passed to # will be overridden by a MeowRunner, if a handler instance is passed to
# it, and so does not need to be initialised within the handler itself. # it, and so does not need to be initialised within the handler itself.
@ -32,6 +36,7 @@ class BaseConductor:
from it implements its validation functions.""" from it implements its validation functions."""
check_implementation(type(self).execute, BaseConductor) check_implementation(type(self).execute, BaseConductor)
check_implementation(type(self).valid_execute_criteria, BaseConductor) check_implementation(type(self).valid_execute_criteria, BaseConductor)
check_implementation(type(self).prompt_runner_for_job, BaseConductor)
if not name: if not name:
name = generate_conductor_id() name = generate_conductor_id()
self._is_valid_name(name) self._is_valid_name(name)
@ -51,6 +56,21 @@ class BaseConductor:
overridden by child classes.""" overridden by child classes."""
valid_string(name, VALID_CONDUCTOR_NAME_CHARS) valid_string(name, VALID_CONDUCTOR_NAME_CHARS)
def prompt_runner_for_job(self):
pass
def start(self)->None:
"""Function to start the conductor as an ongoing process/thread. May be
overidden by any child process. Note that by default this will raise an
execption that is automatically handled within a runner instance."""
raise NotImplementedError
def stop(self)->None:
"""Function to stop the conductor as an ongoing process/thread. May be
overidden by any child process. Note that by default this will raise an
execption that is automatically handled within a runner instance."""
raise NotImplementedError
def valid_execute_criteria(self, job:Dict[str,Any])->Tuple[bool,str]: def valid_execute_criteria(self, job:Dict[str,Any])->Tuple[bool,str]:
"""Function to determine given an job defintion, if this conductor can """Function to determine given an job defintion, if this conductor can
process it or not. Must be implemented by any child process.""" process it or not. Must be implemented by any child process."""

View File

@ -31,6 +31,8 @@ class BaseHandler:
from it implements its validation functions.""" from it implements its validation functions."""
check_implementation(type(self).handle, BaseHandler) check_implementation(type(self).handle, BaseHandler)
check_implementation(type(self).valid_handle_criteria, BaseHandler) check_implementation(type(self).valid_handle_criteria, BaseHandler)
check_implementation(type(self).prompt_runner_for_event, BaseHandler)
check_implementation(type(self).send_job_to_runner, BaseHandler)
if not name: if not name:
name = generate_handler_id() name = generate_handler_id()
self._is_valid_name(name) self._is_valid_name(name)
@ -50,6 +52,25 @@ class BaseHandler:
overridden by child classes.""" overridden by child classes."""
valid_string(name, VALID_HANDLER_NAME_CHARS) valid_string(name, VALID_HANDLER_NAME_CHARS)
def prompt_runner_for_event(self):
pass
def send_job_to_runner(self, msg):
#self.to_runner.send(msg)
pass
def start(self)->None:
"""Function to start the handler as an ongoing process/thread. May be
overidden by any child process. Note that by default this will raise an
execption that is automatically handled within a runner instance."""
raise NotImplementedError
def stop(self)->None:
"""Function to stop the handler as an ongoing process/thread. May be
overidden by any child process. Note that by default this will raise an
execption that is automatically handled within a runner instance."""
raise NotImplementedError
def valid_handle_criteria(self, event:Dict[str,Any])->Tuple[bool,str]: def valid_handle_criteria(self, event:Dict[str,Any])->Tuple[bool,str]:
"""Function to determine given an event defintion, if this handler can """Function to determine given an event defintion, if this handler can
process it or not. Must be implemented by any child process.""" process it or not. Must be implemented by any child process."""

View File

@ -54,6 +54,7 @@ class BaseMonitor:
check_implementation(type(self).remove_recipe, BaseMonitor) check_implementation(type(self).remove_recipe, BaseMonitor)
check_implementation(type(self).get_recipes, BaseMonitor) check_implementation(type(self).get_recipes, BaseMonitor)
check_implementation(type(self).get_rules, BaseMonitor) check_implementation(type(self).get_rules, BaseMonitor)
check_implementation(type(self).send_event_to_runner, BaseMonitor)
# Ensure that patterns and recipes cannot be trivially modified from # Ensure that patterns and recipes cannot be trivially modified from
# outside the monitor, as this will cause internal consistency issues # outside the monitor, as this will cause internal consistency issues
self._patterns = deepcopy(patterns) self._patterns = deepcopy(patterns)
@ -88,6 +89,9 @@ class BaseMonitor:
be implemented by any child class.""" be implemented by any child class."""
pass pass
def send_event_to_runner(self, msg):
self.to_runner.send(msg)
def start(self)->None: def start(self)->None:
"""Function to start the monitor as an ongoing process/thread. Must be """Function to start the monitor as an ongoing process/thread. Must be
implemented by any child process""" implemented by any child process"""

View File

@ -21,7 +21,7 @@ from meow_base.core.vars import DEBUG_WARNING, DEBUG_INFO, \
EVENT_TYPE, VALID_CHANNELS, META_FILE, DEFAULT_JOB_OUTPUT_DIR, \ EVENT_TYPE, VALID_CHANNELS, META_FILE, DEFAULT_JOB_OUTPUT_DIR, \
DEFAULT_JOB_QUEUE_DIR, EVENT_PATH DEFAULT_JOB_QUEUE_DIR, EVENT_PATH
from meow_base.functionality.validation import check_type, valid_list, \ from meow_base.functionality.validation import check_type, valid_list, \
valid_dir_path valid_dir_path, check_implementation
from meow_base.functionality.debug import setup_debugging, print_debug from meow_base.functionality.debug import setup_debugging, print_debug
from meow_base.functionality.file_io import make_dir, threadsafe_read_status from meow_base.functionality.file_io import make_dir, threadsafe_read_status
from meow_base.functionality.process_io import wait from meow_base.functionality.process_io import wait
@ -276,12 +276,20 @@ class MeowRunner:
startable = [] startable = []
# Start all handlers, if they need it # Start all handlers, if they need it
for handler in self.handlers: for handler in self.handlers:
if hasattr(handler, "start") and handler not in startable: try:
startable.append() check_implementation(handler.start, BaseHandler)
if handler not in startable:
startable.append(handler)
except NotImplementedError:
pass
# Start all conductors, if they need it # Start all conductors, if they need it
for conductor in self.conductors: for conductor in self.conductors:
if hasattr(conductor, "start") and conductor not in startable: try:
startable.append() check_implementation(conductor.start, BaseConductor)
if conductor not in startable:
startable.append(conductor)
except NotImplementedError:
pass
for starting in startable: for starting in startable:
starting.start() starting.start()
@ -330,12 +338,20 @@ class MeowRunner:
stopable = [] stopable = []
# Stop all handlers, if they need it # Stop all handlers, if they need it
for handler in self.handlers: for handler in self.handlers:
if hasattr(handler, "stop") and handler not in stopable: try:
stopable.append() check_implementation(handler.stop, BaseHandler)
if handler not in stopable:
stopable.append(handler)
except NotImplementedError:
pass
# Stop all conductors, if they need it # Stop all conductors, if they need it
for conductor in self.conductors: for conductor in self.conductors:
if hasattr(conductor, "stop") and conductor not in stopable: try:
stopable.append() check_implementation(conductor.stop, BaseConductor)
if conductor not in stopable:
stopable.append(conductor)
except NotImplementedError:
pass
for stopping in stopable: for stopping in stopable:
stopping.stop() stopping.stop()

View File

@ -256,7 +256,7 @@ class WatchdogMonitor(BaseMonitor):
f"Event at {src_path} hit rule {rule.name}", f"Event at {src_path} hit rule {rule.name}",
DEBUG_INFO) DEBUG_INFO)
# Send the event to the runner # Send the event to the runner
self.to_runner.send(meow_event) self.send_to_runner(meow_event)
except Exception as e: except Exception as e:
self._rules_lock.release() self._rules_lock.release()
@ -559,7 +559,7 @@ class WatchdogMonitor(BaseMonitor):
f"Retroactive event for file at at {globble} hit rule " f"Retroactive event for file at at {globble} hit rule "
f"{rule.name}", DEBUG_INFO) f"{rule.name}", DEBUG_INFO)
# Send it to the runner # Send it to the runner
self.to_runner.send(meow_event) self.send_to_runner(meow_event)
except Exception as e: except Exception as e:
self._rules_lock.release() self._rules_lock.release()

View File

@ -180,7 +180,7 @@ class BashHandler(BaseHandler):
threadsafe_write_status(meow_job, meta_file) threadsafe_write_status(meow_job, meta_file)
# Send job directory, as actual definitons will be read from within it # Send job directory, as actual definitons will be read from within it
self.to_runner.send(job_dir) self.send_to_runner(job_dir)
def assemble_bash_job_script()->List[str]: def assemble_bash_job_script()->List[str]:

View File

@ -185,7 +185,7 @@ class PapermillHandler(BaseHandler):
) )
# Send job directory, as actual definitons will be read from within it # Send job directory, as actual definitons will be read from within it
self.to_runner.send(job_dir) self.send_to_runner(job_dir)
def get_recipe_from_notebook(name:str, notebook_filename:str, def get_recipe_from_notebook(name:str, notebook_filename:str,
parameters:Dict[str,Any]={}, requirements:Dict[str,Any]={} parameters:Dict[str,Any]={}, requirements:Dict[str,Any]={}

View File

@ -173,7 +173,7 @@ class PythonHandler(BaseHandler):
) )
# Send job directory, as actual definitons will be read from within it # Send job directory, as actual definitons will be read from within it
self.to_runner.send(job_dir) self.send_to_runner(job_dir)
# Papermill job execution code, to be run within the conductor # Papermill job execution code, to be run within the conductor

View File

@ -152,7 +152,7 @@ class MeowTests(unittest.TestCase):
self.assertIsInstance(runner.from_monitors, list) self.assertIsInstance(runner.from_monitors, list)
self.assertEqual(len(runner.from_monitors), 1) self.assertEqual(len(runner.from_monitors), 1)
runner.monitors[0].to_runner.send("monitor test message") runner.monitors[0].send_to_runner("monitor test message")
message = None message = None
if runner.from_monitors[0].poll(3): if runner.from_monitors[0].poll(3):
message = runner.from_monitors[0].recv() message = runner.from_monitors[0].recv()
@ -165,8 +165,7 @@ class MeowTests(unittest.TestCase):
self.assertIsInstance(runner.from_handlers, list) self.assertIsInstance(runner.from_handlers, list)
self.assertEqual(len(runner.from_handlers), 1) self.assertEqual(len(runner.from_handlers), 1)
runner.handlers[0].to_runner.send( runner.handlers[0].send_to_runner("handler test message")
"handler test message")
message = None message = None
if runner.from_handlers[0].poll(3): if runner.from_handlers[0].poll(3):
message = runner.from_handlers[0].recv() message = runner.from_handlers[0].recv()
@ -189,7 +188,7 @@ class MeowTests(unittest.TestCase):
self.assertIsInstance(runner.from_monitors, list) self.assertIsInstance(runner.from_monitors, list)
self.assertEqual(len(runner.from_monitors), len(monitors)) self.assertEqual(len(runner.from_monitors), len(monitors))
for rm in runner.monitors: for rm in runner.monitors:
rm.to_runner.send("monitor test message") rm.send_to_runner("monitor test message")
messages = [None] * len(monitors) messages = [None] * len(monitors)
for i, rfm in enumerate(runner.from_monitors): for i, rfm in enumerate(runner.from_monitors):
if rfm.poll(3): if rfm.poll(3):
@ -205,7 +204,7 @@ class MeowTests(unittest.TestCase):
self.assertIsInstance(runner.from_handlers, list) self.assertIsInstance(runner.from_handlers, list)
self.assertEqual(len(runner.from_handlers), len(handlers)) self.assertEqual(len(runner.from_handlers), len(handlers))
for rh in runner.handlers: for rh in runner.handlers:
rh.to_runner.send("handler test message") rh.send_to_runner("handler test message")
message = None message = None
if runner.from_handlers[0].poll(3): if runner.from_handlers[0].poll(3):
message = runner.from_handlers[0].recv() message = runner.from_handlers[0].recv()