added comments throughout
This commit is contained in:
@ -1,16 +1,19 @@
|
||||
|
||||
"""
|
||||
This file contains definitions for a MEOW recipe based off of jupyter notebooks,
|
||||
along with an appropriate handler for said events.
|
||||
|
||||
Author(s): David Marchant
|
||||
"""
|
||||
import nbformat
|
||||
import sys
|
||||
import threading
|
||||
|
||||
from multiprocessing import Pipe
|
||||
from typing import Any
|
||||
|
||||
from core.correctness.validation import check_type, valid_string, \
|
||||
valid_dict, valid_path, valid_list, valid_existing_dir_path, \
|
||||
setup_debugging
|
||||
from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, VALID_CHANNELS, \
|
||||
PYTHON_FUNC, DEBUG_INFO, WATCHDOG_TYPE, JOB_HASH, PYTHON_EXECUTION_BASE, \
|
||||
valid_dict, valid_path, valid_existing_dir_path, setup_debugging
|
||||
from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, PYTHON_FUNC, \
|
||||
DEBUG_INFO, WATCHDOG_TYPE, JOB_HASH, PYTHON_EXECUTION_BASE, \
|
||||
WATCHDOG_RULE, EVENT_PATH, PYTHON_TYPE, WATCHDOG_HASH, JOB_PARAMETERS, \
|
||||
PYTHON_OUTPUT_DIR
|
||||
from core.functionality import print_debug, create_job, replace_keywords
|
||||
@ -19,57 +22,75 @@ from patterns.file_event_pattern import SWEEP_START, SWEEP_STOP, SWEEP_JUMP
|
||||
|
||||
|
||||
class JupyterNotebookRecipe(BaseRecipe):
|
||||
# A path to the jupyter notebook used to create this recipe
|
||||
source:str
|
||||
def __init__(self, name:str, recipe:Any, parameters:dict[str,Any]={},
|
||||
requirements:dict[str,Any]={}, source:str=""):
|
||||
"""JupyterNotebookRecipe Constructor. This is used to execute analysis
|
||||
code using the papermill module."""
|
||||
super().__init__(name, recipe, parameters, requirements)
|
||||
self._is_valid_source(source)
|
||||
self.source = source
|
||||
|
||||
def _is_valid_source(self, source:str)->None:
|
||||
"""Validation check for 'source' variable from main constructor."""
|
||||
if source:
|
||||
valid_path(source, extension=".ipynb", min_length=0)
|
||||
|
||||
def _is_valid_recipe(self, recipe:dict[str,Any])->None:
|
||||
"""Validation check for 'recipe' variable from main constructor.
|
||||
Called within parent BaseRecipe constructor."""
|
||||
check_type(recipe, dict)
|
||||
nbformat.validate(recipe)
|
||||
|
||||
def _is_valid_parameters(self, parameters:dict[str,Any])->None:
|
||||
"""Validation check for 'parameters' variable from main constructor.
|
||||
Called within parent BaseRecipe constructor."""
|
||||
valid_dict(parameters, str, Any, strict=False, min_length=0)
|
||||
for k in parameters.keys():
|
||||
valid_string(k, VALID_VARIABLE_NAME_CHARS)
|
||||
|
||||
def _is_valid_requirements(self, requirements:dict[str,Any])->None:
|
||||
"""Validation check for 'requirements' variable from main constructor.
|
||||
Called within parent BaseRecipe constructor."""
|
||||
valid_dict(requirements, str, Any, strict=False, min_length=0)
|
||||
for k in requirements.keys():
|
||||
valid_string(k, VALID_VARIABLE_NAME_CHARS)
|
||||
|
||||
class PapermillHandler(BaseHandler):
|
||||
# TODO move me to conductor
|
||||
# Execution directory
|
||||
handler_base:str
|
||||
# TODO possibly move me also to conductor?
|
||||
# Final location for job output to be placed
|
||||
output_dir:str
|
||||
# Config option, above which debug messages are ignored
|
||||
debug_level:int
|
||||
_worker:threading.Thread
|
||||
_stop_pipe:Pipe
|
||||
# Where print messages are sent
|
||||
_print_target:Any
|
||||
def __init__(self, handler_base:str, output_dir:str, print:Any=sys.stdout,
|
||||
logging:int=0)->None:
|
||||
"""PapermillHandler Constructor. This creats jobs to be executed using
|
||||
the papermill module. This does not run as a continuous thread to
|
||||
handle execution, but is invoked according to a factory pattern using
|
||||
the handle function."""
|
||||
super().__init__()
|
||||
self._is_valid_handler_base(handler_base)
|
||||
self.handler_base = handler_base
|
||||
self._is_valid_output_dir(output_dir)
|
||||
self.output_dir = output_dir
|
||||
self._print_target, self.debug_level = setup_debugging(print, logging)
|
||||
self._worker = None
|
||||
self._stop_pipe = Pipe()
|
||||
print_debug(self._print_target, self.debug_level,
|
||||
"Created new PapermillHandler instance", DEBUG_INFO)
|
||||
|
||||
def handle(self, event:dict[str,Any])->None:
|
||||
"""Function called to handle a given event."""
|
||||
print_debug(self._print_target, self.debug_level,
|
||||
f"Handling event {event[EVENT_PATH]}", DEBUG_INFO)
|
||||
|
||||
rule = event[WATCHDOG_RULE]
|
||||
|
||||
# Assemble job parameters dict from pattern variables
|
||||
yaml_dict = {}
|
||||
for var, val in rule.pattern.parameters.items():
|
||||
yaml_dict[var] = val
|
||||
@ -77,9 +98,11 @@ class PapermillHandler(BaseHandler):
|
||||
yaml_dict[var] = val
|
||||
yaml_dict[rule.pattern.triggering_file] = event[EVENT_PATH]
|
||||
|
||||
# If no parameter sweeps, then one job will suffice
|
||||
if not rule.pattern.sweep:
|
||||
self.setup_job(event, yaml_dict)
|
||||
else:
|
||||
# If parameter sweeps, then many jobs created
|
||||
for var, val in rule.pattern.sweep.items():
|
||||
values = []
|
||||
par_val = rule.pattern.sweep[SWEEP_START]
|
||||
@ -92,18 +115,25 @@ class PapermillHandler(BaseHandler):
|
||||
self.setup_job(event, yaml_dict)
|
||||
|
||||
def valid_event_types(self)->list[str]:
|
||||
"""Function to provide a list of the types of events this handler can
|
||||
process."""
|
||||
return [WATCHDOG_TYPE]
|
||||
|
||||
def _is_valid_inputs(self, inputs:list[VALID_CHANNELS])->None:
|
||||
valid_list(inputs, VALID_CHANNELS)
|
||||
|
||||
def _is_valid_handler_base(self, handler_base)->None:
|
||||
"""Validation check for 'handler_base' variable from main
|
||||
constructor."""
|
||||
valid_existing_dir_path(handler_base)
|
||||
|
||||
def _is_valid_output_dir(self, output_dir)->None:
|
||||
"""Validation check for 'output_dir' variable from main
|
||||
constructor."""
|
||||
valid_existing_dir_path(output_dir, allow_base=True)
|
||||
|
||||
def setup_job(self, event:dict[str,Any], yaml_dict:dict[str,Any])->None:
|
||||
"""Function to set up new job dict and send it to the runner to be
|
||||
executed."""
|
||||
# TODO edit me to write job files to a local store in handler, then
|
||||
# read those files within conductor
|
||||
meow_job = create_job(PYTHON_TYPE, event, {
|
||||
JOB_PARAMETERS:yaml_dict,
|
||||
JOB_HASH: event[WATCHDOG_HASH],
|
||||
@ -115,7 +145,9 @@ class PapermillHandler(BaseHandler):
|
||||
f"{PYTHON_TYPE}.", DEBUG_INFO)
|
||||
self.to_runner.send(meow_job)
|
||||
|
||||
# Papermill job execution code, to be run within the conductor
|
||||
def job_func(job):
|
||||
# Requires own imports as will be run in its own execution environment
|
||||
import os
|
||||
import shutil
|
||||
import papermill
|
||||
@ -131,6 +163,7 @@ def job_func(job):
|
||||
|
||||
event = job[JOB_EVENT]
|
||||
|
||||
# replace MEOW keyworks within variables dict
|
||||
yaml_dict = replace_keywords(
|
||||
job[JOB_PARAMETERS],
|
||||
job[JOB_ID],
|
||||
@ -138,15 +171,19 @@ def job_func(job):
|
||||
event[WATCHDOG_BASE]
|
||||
)
|
||||
|
||||
# Create a base job directory
|
||||
job_dir = os.path.join(job[PYTHON_EXECUTION_BASE], job[JOB_ID])
|
||||
make_dir(job_dir)
|
||||
|
||||
# write a status file to the job directory
|
||||
meta_file = os.path.join(job_dir, META_FILE)
|
||||
write_yaml(job, meta_file)
|
||||
|
||||
# write an executable notebook to the job directory
|
||||
base_file = os.path.join(job_dir, BASE_FILE)
|
||||
write_notebook(event[WATCHDOG_RULE].recipe.recipe, base_file)
|
||||
|
||||
# write a parameter file to the job directory
|
||||
param_file = os.path.join(job_dir, PARAMS_FILE)
|
||||
write_yaml(yaml_dict, param_file)
|
||||
|
||||
@ -156,10 +193,17 @@ def job_func(job):
|
||||
job[JOB_STATUS] = STATUS_RUNNING
|
||||
job[JOB_START_TIME] = datetime.now()
|
||||
|
||||
# update the status file with running status
|
||||
write_yaml(job, meta_file)
|
||||
|
||||
# Check the hash of the triggering file, if present. This addresses
|
||||
# potential race condition as file could have been modified since
|
||||
# triggering event
|
||||
if JOB_HASH in job:
|
||||
# get current hash
|
||||
triggerfile_hash = get_file_hash(job[JOB_EVENT][EVENT_PATH], SHA256)
|
||||
# If hash doesn't match, then abort the job. If its been modified, then
|
||||
# another job will have been scheduled anyway.
|
||||
if not triggerfile_hash \
|
||||
or triggerfile_hash != job[JOB_HASH]:
|
||||
job[JOB_STATUS] = STATUS_SKIPPED
|
||||
@ -172,6 +216,7 @@ def job_func(job):
|
||||
write_yaml(job, meta_file)
|
||||
return
|
||||
|
||||
# Create a parameterised version of the executable notebook
|
||||
try:
|
||||
job_notebook = parameterize_jupyter_notebook(
|
||||
event[WATCHDOG_RULE].recipe.recipe, yaml_dict
|
||||
@ -185,6 +230,7 @@ def job_func(job):
|
||||
write_yaml(job, meta_file)
|
||||
return
|
||||
|
||||
# Execute the parameterised notebook
|
||||
try:
|
||||
papermill.execute_notebook(job_file, result_file, {})
|
||||
except Exception as e:
|
||||
@ -195,10 +241,12 @@ def job_func(job):
|
||||
write_yaml(job, meta_file)
|
||||
return
|
||||
|
||||
# Update the status file with the finalised status
|
||||
job[JOB_STATUS] = STATUS_DONE
|
||||
job[JOB_END_TIME] = datetime.now()
|
||||
write_yaml(job, meta_file)
|
||||
|
||||
# Move the contents of the execution directory to the final output
|
||||
# directory.
|
||||
job_output_dir = os.path.join(job[PYTHON_OUTPUT_DIR], job[JOB_ID])
|
||||
|
||||
shutil.move(job_dir, job_output_dir)
|
||||
|
Reference in New Issue
Block a user