updated runner structure so that handlers and conductors actually pull from queues in the runner. changes to logic in both are extensive, but most individual functinos are unaffected. I've also moved several functions that were part of individual monitor, handler and conductors to the base classes.
This commit is contained in:
313
core/runner.py
313
core/runner.py
@ -11,15 +11,13 @@ import sys
|
||||
import threading
|
||||
|
||||
from multiprocessing import Pipe
|
||||
from random import randrange
|
||||
from typing import Any, Union, Dict, List, Type
|
||||
from typing import Any, Union, Dict, List, Type, Tuple
|
||||
|
||||
from meow_base.core.base_conductor import BaseConductor
|
||||
from meow_base.core.base_handler import BaseHandler
|
||||
from meow_base.core.base_monitor import BaseMonitor
|
||||
from meow_base.core.vars import DEBUG_WARNING, DEBUG_INFO, \
|
||||
EVENT_TYPE, VALID_CHANNELS, META_FILE, DEFAULT_JOB_OUTPUT_DIR, \
|
||||
DEFAULT_JOB_QUEUE_DIR, EVENT_PATH
|
||||
VALID_CHANNELS, META_FILE, DEFAULT_JOB_OUTPUT_DIR, DEFAULT_JOB_QUEUE_DIR
|
||||
from meow_base.functionality.validation import check_type, valid_list, \
|
||||
valid_dir_path, check_implementation
|
||||
from meow_base.functionality.debug import setup_debugging, print_debug
|
||||
@ -34,14 +32,18 @@ class MeowRunner:
|
||||
handlers:List[BaseHandler]
|
||||
# A collection of all conductors in the runner
|
||||
conductors:List[BaseConductor]
|
||||
# A collection of all channels from each monitor
|
||||
from_monitors: List[VALID_CHANNELS]
|
||||
# A collection of all channels from each handler
|
||||
from_handlers: List[VALID_CHANNELS]
|
||||
# A collection of all inputs for the event queue
|
||||
event_connections: List[Tuple[VALID_CHANNELS,Union[BaseMonitor,BaseHandler]]]
|
||||
# A collection of all inputs for the job queue
|
||||
job_connections: List[Tuple[VALID_CHANNELS,Union[BaseHandler,BaseConductor]]]
|
||||
# Directory where queued jobs are initially written to
|
||||
job_queue_dir:str
|
||||
# Directory where completed jobs are finally written to
|
||||
job_output_dir:str
|
||||
# A queue of all events found by monitors, awaiting handling by handlers
|
||||
event_queue:List[Dict[str,Any]]
|
||||
# A queue of all jobs setup by handlers, awaiting execution by conductors
|
||||
job_queue:List[str]
|
||||
def __init__(self, monitors:Union[BaseMonitor,List[BaseMonitor]],
|
||||
handlers:Union[BaseHandler,List[BaseHandler]],
|
||||
conductors:Union[BaseConductor,List[BaseConductor]],
|
||||
@ -55,6 +57,37 @@ class MeowRunner:
|
||||
self._is_valid_job_queue_dir(job_queue_dir)
|
||||
self._is_valid_job_output_dir(job_output_dir)
|
||||
|
||||
self.job_connections = []
|
||||
self.event_connections = []
|
||||
|
||||
self._is_valid_monitors(monitors)
|
||||
# If monitors isn't a list, make it one
|
||||
if not type(monitors) == list:
|
||||
monitors = [monitors]
|
||||
self.monitors = monitors
|
||||
for monitor in self.monitors:
|
||||
# Create a channel from the monitor back to this runner
|
||||
monitor_to_runner_reader, monitor_to_runner_writer = Pipe()
|
||||
monitor.to_runner_event = monitor_to_runner_writer
|
||||
self.event_connections.append((monitor_to_runner_reader, monitor))
|
||||
|
||||
self._is_valid_handlers(handlers)
|
||||
# If handlers isn't a list, make it one
|
||||
if not type(handlers) == list:
|
||||
handlers = [handlers]
|
||||
for handler in handlers:
|
||||
handler.job_queue_dir = job_queue_dir
|
||||
|
||||
# Create channels from the handler back to this runner
|
||||
h_to_r_event_runner, h_to_r_event_handler = Pipe(duplex=True)
|
||||
h_to_r_job_reader, h_to_r_job_writer = Pipe()
|
||||
|
||||
handler.to_runner_event = h_to_r_event_handler
|
||||
handler.to_runner_job = h_to_r_job_writer
|
||||
self.event_connections.append((h_to_r_event_runner, handler))
|
||||
self.job_connections.append((h_to_r_job_reader, handler))
|
||||
self.handlers = handlers
|
||||
|
||||
self._is_valid_conductors(conductors)
|
||||
# If conductors isn't a list, make it one
|
||||
if not type(conductors) == list:
|
||||
@ -63,33 +96,13 @@ class MeowRunner:
|
||||
conductor.job_output_dir = job_output_dir
|
||||
conductor.job_queue_dir = job_queue_dir
|
||||
|
||||
# Create a channel from the conductor back to this runner
|
||||
c_to_r_job_runner, c_to_r_job_conductor = Pipe(duplex=True)
|
||||
|
||||
conductor.to_runner_job = c_to_r_job_conductor
|
||||
self.job_connections.append((c_to_r_job_runner, conductor))
|
||||
self.conductors = conductors
|
||||
|
||||
self._is_valid_handlers(handlers)
|
||||
# If handlers isn't a list, make it one
|
||||
if not type(handlers) == list:
|
||||
handlers = [handlers]
|
||||
self.from_handlers = []
|
||||
for handler in handlers:
|
||||
# Create a channel from the handler back to this runner
|
||||
handler_to_runner_reader, handler_to_runner_writer = Pipe()
|
||||
handler.to_runner = handler_to_runner_writer
|
||||
handler.job_queue_dir = job_queue_dir
|
||||
self.from_handlers.append(handler_to_runner_reader)
|
||||
self.handlers = handlers
|
||||
|
||||
self._is_valid_monitors(monitors)
|
||||
# If monitors isn't a list, make it one
|
||||
if not type(monitors) == list:
|
||||
monitors = [monitors]
|
||||
self.monitors = monitors
|
||||
self.from_monitors = []
|
||||
for monitor in self.monitors:
|
||||
# Create a channel from the monitor back to this runner
|
||||
monitor_to_runner_reader, monitor_to_runner_writer = Pipe()
|
||||
monitor.to_runner = monitor_to_runner_writer
|
||||
self.from_monitors.append(monitor_to_runner_reader)
|
||||
|
||||
# Create channel to send stop messages to monitor/handler thread
|
||||
self._stop_mon_han_pipe = Pipe()
|
||||
self._mon_han_worker = None
|
||||
@ -101,11 +114,16 @@ class MeowRunner:
|
||||
# Setup debugging
|
||||
self._print_target, self.debug_level = setup_debugging(print, logging)
|
||||
|
||||
# Setup queues
|
||||
self.event_queue = []
|
||||
self.job_queue = []
|
||||
|
||||
def run_monitor_handler_interaction(self)->None:
|
||||
"""Function to be run in its own thread, to handle any inbound messages
|
||||
from monitors. These will be events, which should be matched to an
|
||||
appropriate handler and handled."""
|
||||
all_inputs = self.from_monitors + [self._stop_mon_han_pipe[0]]
|
||||
all_inputs = [i[0] for i in self.event_connections] \
|
||||
+ [self._stop_mon_han_pipe[0]]
|
||||
while True:
|
||||
ready = wait(all_inputs)
|
||||
|
||||
@ -113,56 +131,45 @@ class MeowRunner:
|
||||
if self._stop_mon_han_pipe[0] in ready:
|
||||
return
|
||||
else:
|
||||
handled = False
|
||||
for from_monitor in self.from_monitors:
|
||||
if from_monitor in ready:
|
||||
# Read event from the monitor channel
|
||||
message = from_monitor.recv()
|
||||
event = message
|
||||
for connection, component in self.event_connections:
|
||||
if connection not in ready:
|
||||
continue
|
||||
message = connection.recv()
|
||||
|
||||
valid_handlers = []
|
||||
for handler in self.handlers:
|
||||
# Recieved an event
|
||||
if isinstance(component, BaseMonitor):
|
||||
self.event_queue.append(message)
|
||||
continue
|
||||
# Recieved a request for an event
|
||||
if isinstance(component, BaseHandler):
|
||||
valid = False
|
||||
for event in self.event_queue:
|
||||
try:
|
||||
valid, _ = handler.valid_handle_criteria(event)
|
||||
if valid:
|
||||
valid_handlers.append(handler)
|
||||
valid, _ = component.valid_handle_criteria(event)
|
||||
except Exception as e:
|
||||
print_debug(
|
||||
self._print_target,
|
||||
self.debug_level,
|
||||
"Could not determine validity of event "
|
||||
f"for handler. {e}",
|
||||
"Could not determine validity of "
|
||||
f"event for handler {component.name}. {e}",
|
||||
DEBUG_INFO
|
||||
)
|
||||
|
||||
# If we've only one handler, use that
|
||||
if len(valid_handlers) == 1:
|
||||
handler = valid_handlers[0]
|
||||
handled = True
|
||||
self.handle_event(handler, event)
|
||||
break
|
||||
# If multiple handlers then randomly pick one
|
||||
elif len(valid_handlers) > 1:
|
||||
handler = valid_handlers[
|
||||
randrange(len(valid_handlers))
|
||||
]
|
||||
handled = True
|
||||
self.handle_event(handler, event)
|
||||
break
|
||||
|
||||
if not handled:
|
||||
print_debug(
|
||||
self._print_target,
|
||||
self.debug_level,
|
||||
"Could not determine handler for event.",
|
||||
DEBUG_INFO
|
||||
)
|
||||
|
||||
if valid:
|
||||
self.event_queue.remove(event)
|
||||
connection.send(event)
|
||||
break
|
||||
|
||||
# If nothing valid then send a message
|
||||
if not valid:
|
||||
connection.send(1)
|
||||
|
||||
def run_handler_conductor_interaction(self)->None:
|
||||
"""Function to be run in its own thread, to handle any inbound messages
|
||||
from handlers. These will be jobs, which should be matched to an
|
||||
appropriate conductor and executed."""
|
||||
all_inputs = self.from_handlers + [self._stop_han_con_pipe[0]]
|
||||
all_inputs = [i[0] for i in self.job_connections] \
|
||||
+ [self._stop_han_con_pipe[0]]
|
||||
while True:
|
||||
ready = wait(all_inputs)
|
||||
|
||||
@ -170,102 +177,52 @@ class MeowRunner:
|
||||
if self._stop_han_con_pipe[0] in ready:
|
||||
return
|
||||
else:
|
||||
executed = False
|
||||
for from_handler in self.from_handlers:
|
||||
if from_handler in ready:
|
||||
# Read job directory from the handler channel
|
||||
job_dir = from_handler.recv()
|
||||
try:
|
||||
metafile = os.path.join(job_dir, META_FILE)
|
||||
job = threadsafe_read_status(metafile)
|
||||
except Exception as e:
|
||||
print_debug(self._print_target, self.debug_level,
|
||||
"Could not load necessary job definitions for "
|
||||
f"job at '{job_dir}'. {e}", DEBUG_INFO)
|
||||
continue
|
||||
for connection, component in self.job_connections:
|
||||
if connection not in ready:
|
||||
continue
|
||||
|
||||
valid_conductors = []
|
||||
for conductor in self.conductors:
|
||||
message = connection.recv()
|
||||
|
||||
# Recieved an event
|
||||
if isinstance(component, BaseHandler):
|
||||
self.job_queue.append(message)
|
||||
continue
|
||||
# Recieved a request for an event
|
||||
if isinstance(component, BaseConductor):
|
||||
valid = False
|
||||
for job_dir in self.job_queue:
|
||||
try:
|
||||
valid, _ = \
|
||||
conductor.valid_execute_criteria(job)
|
||||
if valid:
|
||||
valid_conductors.append(conductor)
|
||||
metafile = os.path.join(job_dir, META_FILE)
|
||||
job = threadsafe_read_status(metafile)
|
||||
except Exception as e:
|
||||
print_debug(
|
||||
self._print_target,
|
||||
self.debug_level,
|
||||
"Could not determine validity of job "
|
||||
f"for conductor. {e}",
|
||||
"Could not load necessary job definitions "
|
||||
f"for job at '{job_dir}'. {e}",
|
||||
DEBUG_INFO
|
||||
)
|
||||
|
||||
# If we've only one conductor, use that
|
||||
if len(valid_conductors) == 1:
|
||||
conductor = valid_conductors[0]
|
||||
executed = True
|
||||
self.execute_job(conductor, job_dir)
|
||||
break
|
||||
# If multiple handlers then randomly pick one
|
||||
elif len(valid_conductors) > 1:
|
||||
conductor = valid_conductors[
|
||||
randrange(len(valid_conductors))
|
||||
]
|
||||
executed = True
|
||||
self.execute_job(conductor, job_dir)
|
||||
break
|
||||
|
||||
# TODO determine something more useful to do here
|
||||
if not executed:
|
||||
print_debug(
|
||||
self._print_target,
|
||||
self.debug_level,
|
||||
f"No conductor could be found for job {job_dir}",
|
||||
DEBUG_INFO
|
||||
)
|
||||
|
||||
def handle_event(self, handler:BaseHandler, event:Dict[str,Any])->None:
|
||||
"""Function for a given handler to handle a given event, without
|
||||
crashing the runner in the event of a problem."""
|
||||
print_debug(self._print_target, self.debug_level,
|
||||
f"Starting handling for {event[EVENT_TYPE]} event: "
|
||||
f"'{event[EVENT_PATH]}'", DEBUG_INFO)
|
||||
try:
|
||||
handler.handle(event)
|
||||
print_debug(self._print_target, self.debug_level,
|
||||
f"Completed handling for {event[EVENT_TYPE]} event: "
|
||||
f"'{event[EVENT_PATH]}'", DEBUG_INFO)
|
||||
except Exception as e:
|
||||
print_debug(self._print_target, self.debug_level,
|
||||
f"Something went wrong during handling for {event[EVENT_TYPE]}"
|
||||
f" event '{event[EVENT_PATH]}'. {e}", DEBUG_INFO)
|
||||
|
||||
def execute_job(self, conductor:BaseConductor, job_dir:str)->None:
|
||||
"""Function for a given conductor to execute a given job, without
|
||||
crashing the runner in the event of a problem."""
|
||||
job_id = os.path.basename(job_dir)
|
||||
print_debug(
|
||||
self._print_target,
|
||||
self.debug_level,
|
||||
f"Starting execution for job: '{job_id}'",
|
||||
DEBUG_INFO
|
||||
)
|
||||
try:
|
||||
conductor.execute(job_dir)
|
||||
print_debug(
|
||||
self._print_target,
|
||||
self.debug_level,
|
||||
f"Completed execution for job: '{job_id}'",
|
||||
DEBUG_INFO
|
||||
)
|
||||
except Exception as e:
|
||||
print_debug(
|
||||
self._print_target,
|
||||
self.debug_level,
|
||||
f"Something went wrong in execution of job '{job_id}'. {e}",
|
||||
DEBUG_INFO
|
||||
)
|
||||
try:
|
||||
valid, _ = component.valid_execute_criteria(job)
|
||||
except Exception as e:
|
||||
print_debug(
|
||||
self._print_target,
|
||||
self.debug_level,
|
||||
"Could not determine validity of "
|
||||
f"job for conductor {component.name}. {e}",
|
||||
DEBUG_INFO
|
||||
)
|
||||
|
||||
if valid:
|
||||
self.job_queue.remove(job_dir)
|
||||
connection.send(job_dir)
|
||||
break
|
||||
|
||||
# If nothing valid then send a message
|
||||
if not valid:
|
||||
connection.send(1)
|
||||
|
||||
def start(self)->None:
|
||||
"""Function to start the runner by starting all of the constituent
|
||||
monitors, handlers and conductors, along with managing interaction
|
||||
@ -273,25 +230,14 @@ class MeowRunner:
|
||||
# Start all monitors
|
||||
for monitor in self.monitors:
|
||||
monitor.start()
|
||||
startable = []
|
||||
# Start all handlers, if they need it
|
||||
|
||||
# Start all handlers
|
||||
for handler in self.handlers:
|
||||
try:
|
||||
check_implementation(handler.start, BaseHandler)
|
||||
if handler not in startable:
|
||||
startable.append(handler)
|
||||
except NotImplementedError:
|
||||
pass
|
||||
# Start all conductors, if they need it
|
||||
handler.start()
|
||||
|
||||
# Start all conductors
|
||||
for conductor in self.conductors:
|
||||
try:
|
||||
check_implementation(conductor.start, BaseConductor)
|
||||
if conductor not in startable:
|
||||
startable.append(conductor)
|
||||
except NotImplementedError:
|
||||
pass
|
||||
for starting in startable:
|
||||
starting.start()
|
||||
conductor.start()
|
||||
|
||||
# If we've not started the monitor/handler interaction thread yet, then
|
||||
# do so
|
||||
@ -331,29 +277,18 @@ class MeowRunner:
|
||||
"""Function to stop the runner by stopping all of the constituent
|
||||
monitors, handlers and conductors, along with managing interaction
|
||||
threads."""
|
||||
|
||||
# Stop all the monitors
|
||||
for monitor in self.monitors:
|
||||
monitor.stop()
|
||||
|
||||
stopable = []
|
||||
# Stop all handlers, if they need it
|
||||
for handler in self.handlers:
|
||||
try:
|
||||
check_implementation(handler.stop, BaseHandler)
|
||||
if handler not in stopable:
|
||||
stopable.append(handler)
|
||||
except NotImplementedError:
|
||||
pass
|
||||
handler.stop()
|
||||
|
||||
# Stop all conductors, if they need it
|
||||
for conductor in self.conductors:
|
||||
try:
|
||||
check_implementation(conductor.stop, BaseConductor)
|
||||
if conductor not in stopable:
|
||||
stopable.append(conductor)
|
||||
except NotImplementedError:
|
||||
pass
|
||||
for stopping in stopable:
|
||||
stopping.stop()
|
||||
conductor.stop()
|
||||
|
||||
# If we've started the monitor/handler interaction thread, then stop it
|
||||
if self._mon_han_worker is None:
|
||||
|
Reference in New Issue
Block a user