""" This file contains the defintion for the MeowRunner, the main construct used for actually orchestration MEOW analysis. It is intended as a modular system, with monitors, handlers, and conductors being swappable at initialisation. Author(s): David Marchant """ import os import sys import threading from multiprocessing import Pipe from random import randrange from typing import Any, Union from core.correctness.vars import DEBUG_WARNING, DEBUG_INFO, EVENT_TYPE, \ VALID_CHANNELS, JOB_ID, META_FILE from core.correctness.validation import setup_debugging, check_type, \ valid_list from core.functionality import print_debug, wait, read_yaml from core.meow import BaseHandler, BaseMonitor, BaseConductor class MeowRunner: # A collection of all monitors in the runner monitors:list[BaseMonitor] # A collection of all handlers in the runner handlers:list[BaseHandler] # A collection of all conductors in the runner conductors:list[BaseConductor] # A collection of all channels from each monitor from_monitors: list[VALID_CHANNELS] # A collection of all channels from each handler from_handlers: list[VALID_CHANNELS] def __init__(self, monitors:Union[BaseMonitor,list[BaseMonitor]], handlers:Union[BaseHandler,list[BaseHandler]], conductors:Union[BaseConductor,list[BaseConductor]], print:Any=sys.stdout, logging:int=0)->None: """MeowRunner constructor. This connects all provided monitors, handlers and conductors according to what events and jobs they produce or consume.""" self._is_valid_conductors(conductors) # If conductors isn't a list, make it one if not type(conductors) == list: conductors = [conductors] self.conductors = conductors self._is_valid_handlers(handlers) # If handlers isn't a list, make it one if not type(handlers) == list: handlers = [handlers] self.from_handlers = [] for handler in handlers: # Create a channel from the handler back to this runner handler_to_runner_reader, handler_to_runner_writer = Pipe() handler.to_runner = handler_to_runner_writer self.from_handlers.append(handler_to_runner_reader) self.handlers = handlers self._is_valid_monitors(monitors) # If monitors isn't a list, make it one if not type(monitors) == list: monitors = [monitors] self.monitors = monitors self.from_monitors = [] for monitor in self.monitors: # Create a channel from the monitor back to this runner monitor_to_runner_reader, monitor_to_runner_writer = Pipe() monitor.to_runner = monitor_to_runner_writer self.from_monitors.append(monitor_to_runner_reader) # Create channel to send stop messages to monitor/handler thread self._stop_mon_han_pipe = Pipe() self._mon_han_worker = None # Create channel to send stop messages to handler/conductor thread self._stop_han_con_pipe = Pipe() self._han_con_worker = None # Setup debugging self._print_target, self.debug_level = setup_debugging(print, logging) def run_monitor_handler_interaction(self)->None: """Function to be run in its own thread, to handle any inbound messages from monitors. These will be events, which should be matched to an appropriate handler and handled.""" all_inputs = self.from_monitors + [self._stop_mon_han_pipe[0]] while True: ready = wait(all_inputs) # If we get a message from the stop channel, then finish if self._stop_mon_han_pipe[0] in ready: return else: for from_monitor in self.from_monitors: if from_monitor in ready: # Read event from the monitor channel message = from_monitor.recv() event = message valid_handlers = [] for handler in self.handlers: try: valid, _ = handler.valid_handle_criteria(event) if valid: valid_handlers.append(handler) except Exception as e: print_debug( self._print_target, self.debug_level, "Could not determine validity of event " f"for handler. {e}", DEBUG_INFO ) # If we've only one handler, use that if len(valid_handlers) == 1: handler = valid_handlers[0] self.handle_event(handler, event) # If multiple handlers then randomly pick one else: handler = valid_handlers[ randrange(len(valid_handlers)) ] self.handle_event(handler, event) def run_handler_conductor_interaction(self)->None: """Function to be run in its own thread, to handle any inbound messages from handlers. These will be jobs, which should be matched to an appropriate conductor and executed.""" all_inputs = self.from_handlers + [self._stop_han_con_pipe[0]] while True: ready = wait(all_inputs) # If we get a message from the stop channel, then finish if self._stop_han_con_pipe[0] in ready: return else: for from_handler in self.from_handlers: if from_handler in ready: # Read job directory from the handler channel job_dir = from_handler.recv() try: metafile = os.path.join(job_dir, META_FILE) job = read_yaml(metafile) except Exception as e: print_debug(self._print_target, self.debug_level, "Could not load necessary job definitions for " f"job at '{job_dir}'. {e}", DEBUG_INFO) continue valid_conductors = [] for conductor in self.conductors: try: valid, _ = \ conductor.valid_execute_criteria(job) if valid: valid_conductors.append(conductor) except Exception as e: print_debug( self._print_target, self.debug_level, "Could not determine validity of job " f"for conductor. {e}", DEBUG_INFO ) # If we've only one conductor, use that if len(valid_conductors) == 1: conductor = valid_conductors[0] self.execute_job(conductor, job) # If multiple handlers then randomly pick one else: conductor = valid_conductors[ randrange(len(valid_conductors)) ] self.execute_job(conductor, job) def handle_event(self, handler:BaseHandler, event:dict[str:Any])->None: """Function for a given handler to handle a given event, without crashing the runner in the event of a problem.""" print_debug(self._print_target, self.debug_level, f"Starting handling for event: '{event[EVENT_TYPE]}'", DEBUG_INFO) try: handler.handle(event) print_debug(self._print_target, self.debug_level, f"Completed handling for event: '{event[EVENT_TYPE]}'", DEBUG_INFO) except Exception as e: print_debug(self._print_target, self.debug_level, "Something went wrong during handling for event " f"'{event[EVENT_TYPE]}'. {e}", DEBUG_INFO) def execute_job(self, conductor:BaseConductor, job:dict[str:Any])->None: """Function for a given conductor to execute a given job, without crashing the runner in the event of a problem.""" print_debug(self._print_target, self.debug_level, f"Starting execution for job: '{job[JOB_ID]}'", DEBUG_INFO) try: conductor.execute(job) print_debug(self._print_target, self.debug_level, f"Completed execution for job: '{job[JOB_ID]}'", DEBUG_INFO) except Exception as e: print_debug(self._print_target, self.debug_level, "Something went wrong during execution for job " f"'{job[JOB_ID]}'. {e}", DEBUG_INFO) def start(self)->None: """Function to start the runner by starting all of the constituent monitors, handlers and conductors, along with managing interaction threads.""" # Start all monitors for monitor in self.monitors: monitor.start() startable = [] # Start all handlers, if they need it for handler in self.handlers: if hasattr(handler, "start") and handler not in startable: startable.append() # Start all conductors, if they need it for conductor in self.conductors: if hasattr(conductor, "start") and conductor not in startable: startable.append() for starting in startable: starting.start() # If we've not started the monitor/handler interaction thread yet, then # do so if self._mon_han_worker is None: self._mon_han_worker = threading.Thread( target=self.run_monitor_handler_interaction, args=[]) self._mon_han_worker.daemon = True self._mon_han_worker.start() print_debug(self._print_target, self.debug_level, "Starting MeowRunner event handling...", DEBUG_INFO) else: msg = "Repeated calls to start MeowRunner event handling have " \ "no effect." print_debug(self._print_target, self.debug_level, msg, DEBUG_WARNING) raise RuntimeWarning(msg) # If we've not started the handler/conductor interaction thread yet, # then do so if self._han_con_worker is None: self._han_con_worker = threading.Thread( target=self.run_handler_conductor_interaction, args=[]) self._han_con_worker.daemon = True self._han_con_worker.start() print_debug(self._print_target, self.debug_level, "Starting MeowRunner job conducting...", DEBUG_INFO) else: msg = "Repeated calls to start MeowRunner job conducting have " \ "no effect." print_debug(self._print_target, self.debug_level, msg, DEBUG_WARNING) raise RuntimeWarning(msg) def stop(self)->None: """Function to stop the runner by stopping all of the constituent monitors, handlers and conductors, along with managing interaction threads.""" # Stop all the monitors for monitor in self.monitors: monitor.stop() stopable = [] # Stop all handlers, if they need it for handler in self.handlers: if hasattr(handler, "stop") and handler not in stopable: stopable.append() # Stop all conductors, if they need it for conductor in self.conductors: if hasattr(conductor, "stop") and conductor not in stopable: stopable.append() for stopping in stopable: stopping.stop() # If we've started the monitor/handler interaction thread, then stop it if self._mon_han_worker is None: msg = "Cannot stop event handling thread that is not started." print_debug(self._print_target, self.debug_level, msg, DEBUG_WARNING) raise RuntimeWarning(msg) else: self._stop_mon_han_pipe[1].send(1) self._mon_han_worker.join() print_debug(self._print_target, self.debug_level, "Event handler thread stopped", DEBUG_INFO) # If we've started the handler/conductor interaction thread, then stop # it if self._han_con_worker is None: msg = "Cannot stop job conducting thread that is not started." print_debug(self._print_target, self.debug_level, msg, DEBUG_WARNING) raise RuntimeWarning(msg) else: self._stop_han_con_pipe[1].send(1) self._han_con_worker.join() print_debug(self._print_target, self.debug_level, "Job conductor thread stopped", DEBUG_INFO) def _is_valid_monitors(self, monitors:Union[BaseMonitor,list[BaseMonitor]])->None: """Validation check for 'monitors' variable from main constructor.""" check_type(monitors, BaseMonitor, alt_types=[list]) if type(monitors) == list: valid_list(monitors, BaseMonitor, min_length=1) def _is_valid_handlers(self, handlers:Union[BaseHandler,list[BaseHandler]])->None: """Validation check for 'handlers' variable from main constructor.""" check_type(handlers, BaseHandler, alt_types=[list]) if type(handlers) == list: valid_list(handlers, BaseHandler, min_length=1) def _is_valid_conductors(self, conductors:Union[BaseConductor,list[BaseConductor]])->None: """Validation check for 'conductors' variable from main constructor.""" check_type(conductors, BaseConductor, alt_types=[list]) if type(conductors) == list: valid_list(conductors, BaseConductor, min_length=1)