reformted jobs being passed to conductors so they only get a job directory and have to read the definitions from the appropriate files

This commit is contained in:
PatchOfScotland
2023-02-09 15:22:26 +01:00
parent 1eb022f79e
commit a2df62c693
19 changed files with 528 additions and 288 deletions

View File

@ -44,7 +44,7 @@ JOB_KEYS = {
}
def check_type(variable:Any, expected_type:Type, alt_types:List[Type]=[],
or_none:bool=False)->None:
or_none:bool=False, hint:str="")->None:
"""Checks if a given variable is of the expected type. Raises TypeError or
ValueError as appropriate if any issues are encountered."""
@ -57,9 +57,11 @@ def check_type(variable:Any, expected_type:Type, alt_types:List[Type]=[],
# Only accept None if explicitly allowed
if variable is None:
if or_none == False:
raise TypeError(
f'Not allowed None for variable. Expected {expected_type}.'
)
if hint:
msg = f"Not allowed None for {hint}. Expected {expected_type}."
else:
msg = f"Not allowed None. Expected {expected_type}."
raise TypeError(msg)
else:
return
@ -69,10 +71,12 @@ def check_type(variable:Any, expected_type:Type, alt_types:List[Type]=[],
# Check that variable type is within the accepted type list
if not isinstance(variable, tuple(type_list)):
raise TypeError(
'Expected type(s) are %s, got %s'
% (get_args(expected_type), type(variable))
)
if hint:
msg = f"Expected type(s) for {hint} are '{type_list}', " \
f"got {type(variable)}"
else:
msg = f"Expected type(s) are '{type_list}', got {type(variable)}"
raise TypeError(msg)
def check_callable(call:Any)->None:
"""Checks if a given variable is a callable function. Raises TypeError if
@ -216,16 +220,19 @@ def valid_existing_file_path(variable:str, allow_base:bool=False,
raise ValueError(
f"Requested file '{variable}' is not a file.")
def valid_existing_dir_path(variable:str, allow_base:bool=False):
"""Check the given string is a path to an existing directory."""
def valid_dir_path(variable:str, must_exist:bool=False, allow_base:bool=False
)->None:
"""Check the given string is a valid directory path, either to an existing
one or a location that could contain one."""
# Check that the string is a path
valid_path(variable, allow_base=allow_base, extension="")
# Check the path exists
if not exists(variable):
does_exist = exists(variable)
if must_exist and not does_exist:
raise FileNotFoundError(
f"Requested dir path '{variable}' does not exist.")
# Check it is a directory
if not isdir(variable):
if does_exist and not isdir(variable):
raise ValueError(
f"Requested dir '{variable}' is not a directory.")

View File

@ -77,13 +77,15 @@ DIR_EVENTS = [
DIR_RETROACTIVE_EVENT
]
# runner defaults
DEFAULT_JOB_QUEUE_DIR = "job_queue"
DEFAULT_JOB_OUTPUT_DIR = "job_output"
# meow jobs
JOB_TYPE = "job_type"
JOB_TYPE_PYTHON = "python"
JOB_TYPE_PAPERMILL = "papermill"
PYTHON_FUNC = "func"
PYTHON_EXECUTION_BASE = "exection_base"
PYTHON_OUTPUT_DIR = "output_dir"
JOB_TYPES = {
JOB_TYPE_PAPERMILL: [

View File

@ -88,8 +88,8 @@ def _get_file_sha256(file_path):
return sha256_hash.hexdigest()
def get_file_hash(file_path:str, hash:str):
check_type(hash, str)
def get_file_hash(file_path:str, hash:str, hint:str=""):
check_type(hash, str, hint=hint)
valid_existing_file_path(file_path)
@ -204,7 +204,8 @@ def write_notebook(source:Dict[str,Any], filename:str):
def parameterize_jupyter_notebook(jupyter_notebook:Dict[str,Any],
parameters:Dict[str,Any], expand_env_values:bool=False)->Dict[str,Any]:
nbformat.validate(jupyter_notebook)
check_type(parameters, Dict)
check_type(parameters, Dict,
hint="parameterize_jupyter_notebook.parameters")
if jupyter_notebook["nbformat"] != 4:
raise Warning(
@ -277,7 +278,8 @@ def parameterize_jupyter_notebook(jupyter_notebook:Dict[str,Any],
def parameterize_python_script(script:List[str], parameters:Dict[str,Any],
expand_env_values:bool=False)->Dict[str,Any]:
check_script(script)
check_type(parameters, Dict)
check_type(parameters, Dict
,hint="parameterize_python_script.parameters")
output_script = copy.deepcopy(script)

View File

@ -142,7 +142,7 @@ class BasePattern:
"""Validation check for 'sweep' variable from main constructor. This
function is implemented to check for the types given in the signature,
and must be overridden if these differ."""
check_type(sweep, Dict)
check_type(sweep, Dict, hint="BasePattern.sweep")
if not sweep:
return
for _, v in sweep.items():
@ -152,11 +152,23 @@ class BasePattern:
], strict=True)
check_type(
v[SWEEP_START], expected_type=int, alt_types=[float, complex])
v[SWEEP_START],
expected_type=int,
alt_types=[float, complex],
hint=f"BasePattern.sweep[{SWEEP_START}]"
)
check_type(
v[SWEEP_STOP], expected_type=int, alt_types=[float, complex])
v[SWEEP_STOP],
expected_type=int,
alt_types=[float, complex],
hint=f"BasePattern.sweep[{SWEEP_STOP}]"
)
check_type(
v[SWEEP_JUMP], expected_type=int, alt_types=[float, complex])
v[SWEEP_JUMP],
expected_type=int,
alt_types=[float, complex],
hint=f"BasePattern.sweep[{SWEEP_JUMP}]"
)
# Try to check that this loop is not infinite
if v[SWEEP_JUMP] == 0:
raise ValueError(
@ -215,8 +227,8 @@ class BaseRule:
self.pattern = pattern
self._is_valid_recipe(recipe)
self.recipe = recipe
check_type(pattern, BasePattern)
check_type(recipe, BaseRecipe)
check_type(pattern, BasePattern, hint="BaseRule.pattern")
check_type(recipe, BaseRecipe, hint="BaseRule.recipe")
if pattern.recipe != recipe.name:
raise ValueError(f"Cannot create Rule {name}. Pattern "
f"{pattern.name} does not identify Recipe {recipe.name}. It "
@ -369,10 +381,14 @@ class BaseMonitor:
class BaseHandler:
# A channel for sending messages to the runner. Note that this is not
# initialised within the constructor, but within the runner when passed the
# handler is passed to it.
# A channel for sending messages to the runner. Note that this will be
# overridden by a MeowRunner, if a handler instance is passed to it, and so
# does not need to be initialised within the handler itself.
to_runner: VALID_CHANNELS
# Directory where queued jobs are initially written to. Note that this
# will be overridden by a MeowRunner, if a handler instance is passed to
# it, and so does not need to be initialised within the handler itself.
job_queue_dir:str
def __init__(self)->None:
"""BaseHandler Constructor. This will check that any class inheriting
from it implements its validation functions."""
@ -399,6 +415,14 @@ class BaseHandler:
class BaseConductor:
# Directory where queued jobs are initially written to. Note that this
# will be overridden by a MeowRunner, if a handler instance is passed to
# it, and so does not need to be initialised within the handler itself.
job_queue_dir:str
# Directory where completed jobs are finally written to. Note that this
# will be overridden by a MeowRunner, if a handler instance is passed to
# it, and so does not need to be initialised within the handler itself.
job_output_dir:str
def __init__(self)->None:
"""BaseConductor Constructor. This will check that any class inheriting
from it implements its validation functions."""
@ -418,9 +442,9 @@ class BaseConductor:
process it or not. Must be implemented by any child process."""
pass
def execute(self, job:Dict[str,Any])->None:
"""Function to execute a given job. Must be implemented by any child
process."""
def execute(self, job_dir:str)->None:
"""Function to execute a given job directory. Must be implemented by
any child process."""
pass
@ -433,8 +457,8 @@ def create_rules(patterns:Union[Dict[str,BasePattern],List[BasePattern]],
provided pattern and recipe dictionaries must be keyed with the
corresponding pattern and recipe names."""
# Validation of inputs
check_type(patterns, Dict, alt_types=[List])
check_type(recipes, Dict, alt_types=[List])
check_type(patterns, Dict, alt_types=[List], hint="create_rules.patterns")
check_type(recipes, Dict, alt_types=[List], hint="create_rules.recipes")
valid_list(new_rules, BaseRule, min_length=0)
# Convert a pattern list to a dictionary
@ -481,8 +505,8 @@ def create_rule(pattern:BasePattern, recipe:BaseRecipe,
"""Function to create a valid rule from a given pattern and recipe. All
inbuilt rule types are considered, with additional definitions provided
through the 'new_rules' variable."""
check_type(pattern, BasePattern)
check_type(recipe, BaseRecipe)
check_type(pattern, BasePattern, hint="create_rule.pattern")
check_type(recipe, BaseRecipe, hint="create_rule.recipe")
valid_list(new_rules, BaseRule, min_length=0)
# Imported here to avoid circular imports at top of file

View File

@ -15,10 +15,11 @@ from random import randrange
from typing import Any, Union, Dict, List
from core.correctness.vars import DEBUG_WARNING, DEBUG_INFO, EVENT_TYPE, \
VALID_CHANNELS, JOB_ID, META_FILE
VALID_CHANNELS, JOB_ID, META_FILE, DEFAULT_JOB_OUTPUT_DIR, \
DEFAULT_JOB_QUEUE_DIR
from core.correctness.validation import setup_debugging, check_type, \
valid_list
from core.functionality import print_debug, wait, read_yaml
valid_list, valid_dir_path
from core.functionality import print_debug, wait, read_yaml, make_dir
from core.meow import BaseHandler, BaseMonitor, BaseConductor
@ -33,18 +34,32 @@ class MeowRunner:
from_monitors: List[VALID_CHANNELS]
# A collection of all channels from each handler
from_handlers: List[VALID_CHANNELS]
# Directory where queued jobs are initially written to
job_queue_dir:str
# Directory where completed jobs are finally written to
job_output_dir:str
def __init__(self, monitors:Union[BaseMonitor,List[BaseMonitor]],
handlers:Union[BaseHandler,List[BaseHandler]],
conductors:Union[BaseConductor,List[BaseConductor]],
job_queue_dir:str=DEFAULT_JOB_QUEUE_DIR,
job_output_dir:str=DEFAULT_JOB_OUTPUT_DIR,
print:Any=sys.stdout, logging:int=0)->None:
"""MeowRunner constructor. This connects all provided monitors,
handlers and conductors according to what events and jobs they produce
or consume."""
self._is_valid_job_queue_dir(job_queue_dir)
self._is_valid_job_output_dir(job_output_dir)
self._is_valid_conductors(conductors)
# If conductors isn't a list, make it one
if not type(conductors) == list:
conductors = [conductors]
for conductor in conductors:
conductor.job_output_dir = job_output_dir
conductor.job_queue_dir = job_queue_dir
self.conductors = conductors
self._is_valid_handlers(handlers)
@ -56,6 +71,7 @@ class MeowRunner:
# Create a channel from the handler back to this runner
handler_to_runner_reader, handler_to_runner_writer = Pipe()
handler.to_runner = handler_to_runner_writer
handler.job_queue_dir = job_queue_dir
self.from_handlers.append(handler_to_runner_reader)
self.handlers = handlers
@ -170,13 +186,13 @@ class MeowRunner:
# If we've only one conductor, use that
if len(valid_conductors) == 1:
conductor = valid_conductors[0]
self.execute_job(conductor, job)
self.execute_job(conductor, job_dir)
# If multiple handlers then randomly pick one
else:
conductor = valid_conductors[
randrange(len(valid_conductors))
]
self.execute_job(conductor, job)
self.execute_job(conductor, job_dir)
def handle_event(self, handler:BaseHandler, event:Dict[str,Any])->None:
"""Function for a given handler to handle a given event, without
@ -193,19 +209,31 @@ class MeowRunner:
"Something went wrong during handling for event "
f"'{event[EVENT_TYPE]}'. {e}", DEBUG_INFO)
def execute_job(self, conductor:BaseConductor, job:Dict[str,Any])->None:
def execute_job(self, conductor:BaseConductor, job_dir:str)->None:
"""Function for a given conductor to execute a given job, without
crashing the runner in the event of a problem."""
print_debug(self._print_target, self.debug_level,
f"Starting execution for job: '{job[JOB_ID]}'", DEBUG_INFO)
job_id = os.path.basename(job_dir)
print_debug(
self._print_target,
self.debug_level,
f"Starting execution for job: '{job_id}'",
DEBUG_INFO
)
try:
conductor.execute(job)
print_debug(self._print_target, self.debug_level,
f"Completed execution for job: '{job[JOB_ID]}'", DEBUG_INFO)
conductor.execute(job_dir)
print_debug(
self._print_target,
self.debug_level,
f"Completed execution for job: '{job_id}'",
DEBUG_INFO
)
except Exception as e:
print_debug(self._print_target, self.debug_level,
"Something went wrong during execution for job "
f"'{job[JOB_ID]}'. {e}", DEBUG_INFO)
print_debug(
self._print_target,
self.debug_level,
f"Something went wrong in execution of job '{job_id}'. {e}",
DEBUG_INFO
)
def start(self)->None:
"""Function to start the runner by starting all of the constituent
@ -308,20 +336,49 @@ class MeowRunner:
def _is_valid_monitors(self,
monitors:Union[BaseMonitor,List[BaseMonitor]])->None:
"""Validation check for 'monitors' variable from main constructor."""
check_type(monitors, BaseMonitor, alt_types=[List])
check_type(
monitors,
BaseMonitor,
alt_types=[List],
hint="MeowRunner.monitors"
)
if type(monitors) == list:
valid_list(monitors, BaseMonitor, min_length=1)
def _is_valid_handlers(self,
handlers:Union[BaseHandler,List[BaseHandler]])->None:
"""Validation check for 'handlers' variable from main constructor."""
check_type(handlers, BaseHandler, alt_types=[List])
check_type(
handlers,
BaseHandler,
alt_types=[List],
hint="MeowRunner.handlers"
)
if type(handlers) == list:
valid_list(handlers, BaseHandler, min_length=1)
def _is_valid_conductors(self,
conductors:Union[BaseConductor,List[BaseConductor]])->None:
"""Validation check for 'conductors' variable from main constructor."""
check_type(conductors, BaseConductor, alt_types=[List])
check_type(
conductors,
BaseConductor,
alt_types=[List],
hint="MeowRunner.conductors"
)
if type(conductors) == list:
valid_list(conductors, BaseConductor, min_length=1)
def _is_valid_job_queue_dir(self, job_queue_dir)->None:
"""Validation check for 'job_queue_dir' variable from main
constructor."""
valid_dir_path(job_queue_dir, must_exist=False)
if not os.path.exists(job_queue_dir):
make_dir(job_queue_dir)
def _is_valid_job_output_dir(self, job_output_dir)->None:
"""Validation check for 'job_output_dir' variable from main
constructor."""
valid_dir_path(job_output_dir, must_exist=False)
if not os.path.exists(job_output_dir):
make_dir(job_output_dir)