diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index 6c872b5..06eafc2 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -5,6 +5,7 @@ along with an appropriate handler for said events. Author(s): David Marchant """ +import os import itertools import nbformat import sys @@ -16,8 +17,10 @@ from core.correctness.validation import check_type, valid_string, \ from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, PYTHON_FUNC, \ DEBUG_INFO, WATCHDOG_TYPE, JOB_HASH, PYTHON_EXECUTION_BASE, \ WATCHDOG_RULE, EVENT_PATH, PYTHON_TYPE, WATCHDOG_HASH, JOB_PARAMETERS, \ - PYTHON_OUTPUT_DIR -from core.functionality import print_debug, create_job, replace_keywords + PYTHON_OUTPUT_DIR, JOB_ID, WATCHDOG_BASE, META_FILE, BASE_FILE, \ + PARAMS_FILE, JOB_FILE, RESULT_FILE, JOB_STATUS, STATUS_QUEUED +from core.functionality import print_debug, create_job, replace_keywords, \ + make_dir, write_yaml, write_notebook from core.meow import BaseRecipe, BaseHandler from patterns.file_event_pattern import SWEEP_START, SWEEP_STOP, SWEEP_JUMP @@ -59,10 +62,9 @@ class JupyterNotebookRecipe(BaseRecipe): valid_string(k, VALID_VARIABLE_NAME_CHARS) class PapermillHandler(BaseHandler): - # TODO move me to conductor - # Execution directory + # handler directory to setup jobs in handler_base:str - # TODO possibly move me also to conductor? + # TODO move me to conductor? 
# Final location for job output to be placed output_dir:str # Config option, above which debug messages are ignored @@ -138,8 +140,7 @@ class PapermillHandler(BaseHandler): def setup_job(self, event:dict[str,Any], yaml_dict:dict[str,Any])->None: """Function to set up new job dict and send it to the runner to be executed.""" - # TODO edit me to write job files to a local store in handler, then - # read those files within conductor + # TODO finish me so execution completed in conductor meow_job = create_job(PYTHON_TYPE, event, { JOB_PARAMETERS:yaml_dict, JOB_HASH: event[WATCHDOG_HASH], @@ -149,6 +150,37 @@ class PapermillHandler(BaseHandler): print_debug(self._print_target, self.debug_level, f"Creating job from event at {event[EVENT_PATH]} of type " f"{PYTHON_TYPE}.", DEBUG_INFO) + + # replace MEOW keywords within variables dict + yaml_dict = replace_keywords( + meow_job[JOB_PARAMETERS], + meow_job[JOB_ID], + event[EVENT_PATH], + event[WATCHDOG_BASE] + ) + + # Create a base job directory + job_dir = os.path.join( + meow_job[PYTHON_EXECUTION_BASE], meow_job[JOB_ID]) + make_dir(job_dir) + + # write a status file to the job directory + meta_file = os.path.join(job_dir, META_FILE) + write_yaml(meow_job, meta_file) + + # write an executable notebook to the job directory + base_file = os.path.join(job_dir, BASE_FILE) + write_notebook(event[WATCHDOG_RULE].recipe.recipe, base_file) + + # write a parameter file to the job directory + param_file = os.path.join(job_dir, PARAMS_FILE) + write_yaml(yaml_dict, param_file) + + meow_job[JOB_STATUS] = STATUS_QUEUED + + # update the status file with queued status + write_yaml(meow_job, meta_file) + self.to_runner.send(meow_job) # Papermill job execution code, to be run within the conductor @@ -158,43 +190,23 @@ def job_func(job): import shutil import papermill from datetime import datetime - from core.functionality import make_dir, write_yaml, \ + from core.functionality import make_dir, write_yaml, read_yaml, \ write_notebook, 
get_file_hash, parameterize_jupyter_notebook from core.correctness.vars import JOB_EVENT, WATCHDOG_RULE, \ JOB_ID, EVENT_PATH, WATCHDOG_BASE, META_FILE, \ BASE_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE, JOB_STATUS, \ JOB_START_TIME, STATUS_RUNNING, JOB_HASH, SHA256, \ - STATUS_SKIPPED, STATUS_DONE, JOB_END_TIME, \ + STATUS_SKIPPED, STATUS_DONE, JOB_END_TIME, STATUS_QUEUED, \ JOB_ERROR, STATUS_FAILED, PYTHON_EXECUTION_BASE, PYTHON_OUTPUT_DIR event = job[JOB_EVENT] - # replace MEOW keyworks within variables dict - yaml_dict = replace_keywords( - job[JOB_PARAMETERS], - job[JOB_ID], - event[EVENT_PATH], - event[WATCHDOG_BASE] - ) - - # Create a base job directory + # Identify job files job_dir = os.path.join(job[PYTHON_EXECUTION_BASE], job[JOB_ID]) - make_dir(job_dir) - - # write a status file to the job directory meta_file = os.path.join(job_dir, META_FILE) - write_yaml(job, meta_file) - - # write an executable notebook to the job directory - base_file = os.path.join(job_dir, BASE_FILE) - write_notebook(event[WATCHDOG_RULE].recipe.recipe, base_file) - - # write a parameter file to the job directory - param_file = os.path.join(job_dir, PARAMS_FILE) - write_yaml(yaml_dict, param_file) - job_file = os.path.join(job_dir, JOB_FILE) result_file = os.path.join(job_dir, RESULT_FILE) + param_file = os.path.join(job_dir, PARAMS_FILE) job[JOB_STATUS] = STATUS_RUNNING job[JOB_START_TIME] = datetime.now() @@ -202,6 +214,8 @@ def job_func(job): # update the status file with running status write_yaml(job, meta_file) + yaml_dict = read_yaml(param_file) + # Check the hash of the triggering file, if present. 
This addresses # potential race condition as file could have been modified since # triggering event diff --git a/tests/test_conductors.py b/tests/test_conductors.py index ddbd498..c0f1132 100644 --- a/tests/test_conductors.py +++ b/tests/test_conductors.py @@ -6,7 +6,8 @@ from core.correctness.vars import PYTHON_TYPE, SHA256, WATCHDOG_TYPE, \ WATCHDOG_BASE, WATCHDOG_RULE, WATCHDOG_HASH, JOB_PARAMETERS, JOB_HASH, \ PYTHON_FUNC, PYTHON_OUTPUT_DIR, PYTHON_EXECUTION_BASE, JOB_ID, META_FILE, \ BASE_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE -from core.functionality import get_file_hash, create_event, create_job +from core.functionality import get_file_hash, create_event, create_job, \ + make_dir, write_yaml, write_notebook from core.meow import create_rule from conductors import LocalPythonConductor from patterns import FileEventPattern @@ -62,6 +63,12 @@ class MeowTests(unittest.TestCase): rule = create_rule(pattern, recipe) + params_dict = { + "extra":"extra", + "infile":file_path, + "outfile":result_path + } + job_dict = create_job( PYTHON_TYPE, create_event( @@ -74,11 +81,7 @@ class MeowTests(unittest.TestCase): } ), { - JOB_PARAMETERS:{ - "extra":"extra", - "infile":file_path, - "outfile":result_path - }, + JOB_PARAMETERS:params_dict, JOB_HASH: file_hash, PYTHON_FUNC:job_func, PYTHON_OUTPUT_DIR:TEST_JOB_OUTPUT, @@ -86,6 +89,15 @@ class MeowTests(unittest.TestCase): } ) + job_dir = os.path.join(TEST_HANDLER_BASE, job_dict[JOB_ID]) + make_dir(job_dir) + + param_file = os.path.join(job_dir, PARAMS_FILE) + write_yaml(params_dict, param_file) + + base_file = os.path.join(job_dir, BASE_FILE) + write_notebook(APPENDING_NOTEBOOK, base_file) + lpc.execute(job_dict) job_dir = os.path.join(TEST_HANDLER_BASE, job_dict[JOB_ID]) @@ -127,6 +139,12 @@ class MeowTests(unittest.TestCase): rule = create_rule(pattern, recipe) + params_dict = { + "extra":"extra", + "infile":file_path, + "outfile":result_path + } + bad_job_dict = create_job( PYTHON_TYPE, create_event( @@ -139,16 +157,21 
@@ class MeowTests(unittest.TestCase): } ), { - JOB_PARAMETERS:{ - "extra":"extra", - "infile":file_path, - "outfile":result_path - }, + JOB_PARAMETERS:params_dict, JOB_HASH: file_hash, PYTHON_FUNC:job_func, } ) + job_dir = os.path.join(TEST_HANDLER_BASE, bad_job_dict[JOB_ID]) + make_dir(job_dir) + + param_file = os.path.join(job_dir, PARAMS_FILE) + write_yaml(params_dict, param_file) + + base_file = os.path.join(job_dir, BASE_FILE) + write_notebook(APPENDING_NOTEBOOK, base_file) + with self.assertRaises(KeyError): lpc.execute(bad_job_dict) @@ -165,11 +188,7 @@ class MeowTests(unittest.TestCase): } ), { - JOB_PARAMETERS:{ - "extra":"extra", - "infile":file_path, - "outfile":result_path - }, + JOB_PARAMETERS:params_dict, JOB_HASH: file_hash, PYTHON_FUNC:job_func, PYTHON_OUTPUT_DIR:TEST_JOB_OUTPUT, @@ -177,6 +196,15 @@ class MeowTests(unittest.TestCase): } ) + job_dir = os.path.join(TEST_HANDLER_BASE, good_job_dict[JOB_ID]) + make_dir(job_dir) + + param_file = os.path.join(job_dir, PARAMS_FILE) + write_yaml(params_dict, param_file) + + base_file = os.path.join(job_dir, BASE_FILE) + write_notebook(APPENDING_NOTEBOOK, base_file) + lpc.execute(good_job_dict) job_dir = os.path.join(TEST_HANDLER_BASE, good_job_dict[JOB_ID]) diff --git a/tests/test_recipes.py b/tests/test_recipes.py index bcc8ac6..13cc354 100644 --- a/tests/test_recipes.py +++ b/tests/test_recipes.py @@ -11,7 +11,8 @@ from core.correctness.vars import EVENT_TYPE, WATCHDOG_BASE, WATCHDOG_RULE, \ PYTHON_EXECUTION_BASE, META_FILE, BASE_FILE, PARAMS_FILE, JOB_FILE, \ RESULT_FILE from core.correctness.validation import valid_job -from core.functionality import get_file_hash, create_job, create_event +from core.functionality import get_file_hash, create_job, create_event, \ + make_dir, write_yaml, write_notebook from core.meow import create_rules, create_rule from patterns.file_event_pattern import FileEventPattern, SWEEP_START, \ SWEEP_STOP, SWEEP_JUMP @@ -332,6 +333,12 @@ class 
CorrectnessTests(unittest.TestCase): rule = create_rule(pattern, recipe) + params_dict = { + "extra":"extra", + "infile":file_path, + "outfile":result_path + } + job_dict = create_job( PYTHON_TYPE, create_event( @@ -344,11 +351,7 @@ class CorrectnessTests(unittest.TestCase): } ), { - JOB_PARAMETERS:{ - "extra":"extra", - "infile":file_path, - "outfile":result_path - }, + JOB_PARAMETERS:params_dict, JOB_HASH: file_hash, PYTHON_FUNC:job_func, PYTHON_OUTPUT_DIR:TEST_JOB_OUTPUT, @@ -356,6 +359,16 @@ class CorrectnessTests(unittest.TestCase): } ) + job_dir = os.path.join( + job_dict[PYTHON_EXECUTION_BASE], job_dict[JOB_ID]) + make_dir(job_dir) + + param_file = os.path.join(job_dir, PARAMS_FILE) + write_yaml(params_dict, param_file) + + base_file = os.path.join(job_dir, BASE_FILE) + write_notebook(APPENDING_NOTEBOOK, base_file) + job_func(job_dict) job_dir = os.path.join(TEST_HANDLER_BASE, job_dict[JOB_ID]) diff --git a/tests/test_runner.py b/tests/test_runner.py index 055dcc6..d3f0025 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -10,8 +10,7 @@ from core.correctness.vars import RESULT_FILE from core.functionality import make_dir, read_notebook from core.meow import BaseMonitor, BaseHandler, BaseConductor from core.runner import MeowRunner -from patterns.file_event_pattern import WatchdogMonitor, FileEventPattern, \ - SWEEP_JUMP, SWEEP_START, SWEEP_STOP +from patterns.file_event_pattern import WatchdogMonitor, FileEventPattern from recipes.jupyter_notebook_recipe import PapermillHandler, \ JupyterNotebookRecipe from shared import setup, teardown, TEST_HANDLER_BASE, TEST_JOB_OUTPUT, \