Updated job creation so that jobs are written to files, which are then read and executed

This commit is contained in:
PatchOfScotland
2023-01-31 18:03:11 +01:00
parent bb7c4af1e5
commit a2abf91e7e
4 changed files with 109 additions and 55 deletions

View File

@ -5,6 +5,7 @@ along with an appropriate handler for said events.
Author(s): David Marchant Author(s): David Marchant
""" """
import os
import itertools import itertools
import nbformat import nbformat
import sys import sys
@ -16,8 +17,10 @@ from core.correctness.validation import check_type, valid_string, \
from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, PYTHON_FUNC, \ from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, PYTHON_FUNC, \
DEBUG_INFO, WATCHDOG_TYPE, JOB_HASH, PYTHON_EXECUTION_BASE, \ DEBUG_INFO, WATCHDOG_TYPE, JOB_HASH, PYTHON_EXECUTION_BASE, \
WATCHDOG_RULE, EVENT_PATH, PYTHON_TYPE, WATCHDOG_HASH, JOB_PARAMETERS, \ WATCHDOG_RULE, EVENT_PATH, PYTHON_TYPE, WATCHDOG_HASH, JOB_PARAMETERS, \
PYTHON_OUTPUT_DIR PYTHON_OUTPUT_DIR, JOB_ID, WATCHDOG_BASE, META_FILE, BASE_FILE, \
from core.functionality import print_debug, create_job, replace_keywords PARAMS_FILE, JOB_FILE, RESULT_FILE, JOB_STATUS, STATUS_QUEUED
from core.functionality import print_debug, create_job, replace_keywords, \
make_dir, write_yaml, write_notebook
from core.meow import BaseRecipe, BaseHandler from core.meow import BaseRecipe, BaseHandler
from patterns.file_event_pattern import SWEEP_START, SWEEP_STOP, SWEEP_JUMP from patterns.file_event_pattern import SWEEP_START, SWEEP_STOP, SWEEP_JUMP
@ -59,10 +62,9 @@ class JupyterNotebookRecipe(BaseRecipe):
valid_string(k, VALID_VARIABLE_NAME_CHARS) valid_string(k, VALID_VARIABLE_NAME_CHARS)
class PapermillHandler(BaseHandler): class PapermillHandler(BaseHandler):
# TODO move me to conductor # handler directory to set up jobs in
# Execution directory
handler_base:str handler_base:str
# TODO possibly move me also to conductor? # TODO move me to conductor?
# Final location for job output to be placed # Final location for job output to be placed
output_dir:str output_dir:str
# Config option, above which debug messages are ignored # Config option, above which debug messages are ignored
@ -138,8 +140,7 @@ class PapermillHandler(BaseHandler):
def setup_job(self, event:dict[str,Any], yaml_dict:dict[str,Any])->None: def setup_job(self, event:dict[str,Any], yaml_dict:dict[str,Any])->None:
"""Function to set up new job dict and send it to the runner to be """Function to set up new job dict and send it to the runner to be
executed.""" executed."""
# TODO edit me to write job files to a local store in handler, then # TODO finish me so execution is completed in conductor
# read those files within conductor
meow_job = create_job(PYTHON_TYPE, event, { meow_job = create_job(PYTHON_TYPE, event, {
JOB_PARAMETERS:yaml_dict, JOB_PARAMETERS:yaml_dict,
JOB_HASH: event[WATCHDOG_HASH], JOB_HASH: event[WATCHDOG_HASH],
@ -149,6 +150,37 @@ class PapermillHandler(BaseHandler):
print_debug(self._print_target, self.debug_level, print_debug(self._print_target, self.debug_level,
f"Creating job from event at {event[EVENT_PATH]} of type " f"Creating job from event at {event[EVENT_PATH]} of type "
f"{PYTHON_TYPE}.", DEBUG_INFO) f"{PYTHON_TYPE}.", DEBUG_INFO)
# replace MEOW keywords within variables dict
yaml_dict = replace_keywords(
meow_job[JOB_PARAMETERS],
meow_job[JOB_ID],
event[EVENT_PATH],
event[WATCHDOG_BASE]
)
# Create a base job directory
job_dir = os.path.join(
meow_job[PYTHON_EXECUTION_BASE], meow_job[JOB_ID])
make_dir(job_dir)
# write a status file to the job directory
meta_file = os.path.join(job_dir, META_FILE)
write_yaml(meow_job, meta_file)
# write an executable notebook to the job directory
base_file = os.path.join(job_dir, BASE_FILE)
write_notebook(event[WATCHDOG_RULE].recipe.recipe, base_file)
# write a parameter file to the job directory
param_file = os.path.join(job_dir, PARAMS_FILE)
write_yaml(yaml_dict, param_file)
meow_job[JOB_STATUS] = STATUS_QUEUED
# update the status file with queued status
write_yaml(meow_job, meta_file)
self.to_runner.send(meow_job) self.to_runner.send(meow_job)
# Papermill job execution code, to be run within the conductor # Papermill job execution code, to be run within the conductor
@ -158,43 +190,23 @@ def job_func(job):
import shutil import shutil
import papermill import papermill
from datetime import datetime from datetime import datetime
from core.functionality import make_dir, write_yaml, \ from core.functionality import make_dir, write_yaml, read_yaml, \
write_notebook, get_file_hash, parameterize_jupyter_notebook write_notebook, get_file_hash, parameterize_jupyter_notebook
from core.correctness.vars import JOB_EVENT, WATCHDOG_RULE, \ from core.correctness.vars import JOB_EVENT, WATCHDOG_RULE, \
JOB_ID, EVENT_PATH, WATCHDOG_BASE, META_FILE, \ JOB_ID, EVENT_PATH, WATCHDOG_BASE, META_FILE, \
BASE_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE, JOB_STATUS, \ BASE_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE, JOB_STATUS, \
JOB_START_TIME, STATUS_RUNNING, JOB_HASH, SHA256, \ JOB_START_TIME, STATUS_RUNNING, JOB_HASH, SHA256, \
STATUS_SKIPPED, STATUS_DONE, JOB_END_TIME, \ STATUS_SKIPPED, STATUS_DONE, JOB_END_TIME, STATUS_QUEUED, \
JOB_ERROR, STATUS_FAILED, PYTHON_EXECUTION_BASE, PYTHON_OUTPUT_DIR JOB_ERROR, STATUS_FAILED, PYTHON_EXECUTION_BASE, PYTHON_OUTPUT_DIR
event = job[JOB_EVENT] event = job[JOB_EVENT]
# replace MEOW keywords within variables dict # Identify job files
yaml_dict = replace_keywords(
job[JOB_PARAMETERS],
job[JOB_ID],
event[EVENT_PATH],
event[WATCHDOG_BASE]
)
# Create a base job directory
job_dir = os.path.join(job[PYTHON_EXECUTION_BASE], job[JOB_ID]) job_dir = os.path.join(job[PYTHON_EXECUTION_BASE], job[JOB_ID])
make_dir(job_dir)
# write a status file to the job directory
meta_file = os.path.join(job_dir, META_FILE) meta_file = os.path.join(job_dir, META_FILE)
write_yaml(job, meta_file)
# write an executable notebook to the job directory
base_file = os.path.join(job_dir, BASE_FILE)
write_notebook(event[WATCHDOG_RULE].recipe.recipe, base_file)
# write a parameter file to the job directory
param_file = os.path.join(job_dir, PARAMS_FILE)
write_yaml(yaml_dict, param_file)
job_file = os.path.join(job_dir, JOB_FILE) job_file = os.path.join(job_dir, JOB_FILE)
result_file = os.path.join(job_dir, RESULT_FILE) result_file = os.path.join(job_dir, RESULT_FILE)
param_file = os.path.join(job_dir, PARAMS_FILE)
job[JOB_STATUS] = STATUS_RUNNING job[JOB_STATUS] = STATUS_RUNNING
job[JOB_START_TIME] = datetime.now() job[JOB_START_TIME] = datetime.now()
@ -202,6 +214,8 @@ def job_func(job):
# update the status file with running status # update the status file with running status
write_yaml(job, meta_file) write_yaml(job, meta_file)
yaml_dict = read_yaml(param_file)
# Check the hash of the triggering file, if present. This addresses # Check the hash of the triggering file, if present. This addresses
# potential race condition as file could have been modified since # potential race condition as file could have been modified since
# triggering event # triggering event

View File

@ -6,7 +6,8 @@ from core.correctness.vars import PYTHON_TYPE, SHA256, WATCHDOG_TYPE, \
WATCHDOG_BASE, WATCHDOG_RULE, WATCHDOG_HASH, JOB_PARAMETERS, JOB_HASH, \ WATCHDOG_BASE, WATCHDOG_RULE, WATCHDOG_HASH, JOB_PARAMETERS, JOB_HASH, \
PYTHON_FUNC, PYTHON_OUTPUT_DIR, PYTHON_EXECUTION_BASE, JOB_ID, META_FILE, \ PYTHON_FUNC, PYTHON_OUTPUT_DIR, PYTHON_EXECUTION_BASE, JOB_ID, META_FILE, \
BASE_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE BASE_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE
from core.functionality import get_file_hash, create_event, create_job from core.functionality import get_file_hash, create_event, create_job, \
make_dir, write_yaml, write_notebook
from core.meow import create_rule from core.meow import create_rule
from conductors import LocalPythonConductor from conductors import LocalPythonConductor
from patterns import FileEventPattern from patterns import FileEventPattern
@ -62,6 +63,12 @@ class MeowTests(unittest.TestCase):
rule = create_rule(pattern, recipe) rule = create_rule(pattern, recipe)
params_dict = {
"extra":"extra",
"infile":file_path,
"outfile":result_path
}
job_dict = create_job( job_dict = create_job(
PYTHON_TYPE, PYTHON_TYPE,
create_event( create_event(
@ -74,11 +81,7 @@ class MeowTests(unittest.TestCase):
} }
), ),
{ {
JOB_PARAMETERS:{ JOB_PARAMETERS:params_dict,
"extra":"extra",
"infile":file_path,
"outfile":result_path
},
JOB_HASH: file_hash, JOB_HASH: file_hash,
PYTHON_FUNC:job_func, PYTHON_FUNC:job_func,
PYTHON_OUTPUT_DIR:TEST_JOB_OUTPUT, PYTHON_OUTPUT_DIR:TEST_JOB_OUTPUT,
@ -86,6 +89,15 @@ class MeowTests(unittest.TestCase):
} }
) )
job_dir = os.path.join(TEST_HANDLER_BASE, job_dict[JOB_ID])
make_dir(job_dir)
param_file = os.path.join(job_dir, PARAMS_FILE)
write_yaml(params_dict, param_file)
base_file = os.path.join(job_dir, BASE_FILE)
write_notebook(APPENDING_NOTEBOOK, base_file)
lpc.execute(job_dict) lpc.execute(job_dict)
job_dir = os.path.join(TEST_HANDLER_BASE, job_dict[JOB_ID]) job_dir = os.path.join(TEST_HANDLER_BASE, job_dict[JOB_ID])
@ -127,6 +139,12 @@ class MeowTests(unittest.TestCase):
rule = create_rule(pattern, recipe) rule = create_rule(pattern, recipe)
params_dict = {
"extra":"extra",
"infile":file_path,
"outfile":result_path
}
bad_job_dict = create_job( bad_job_dict = create_job(
PYTHON_TYPE, PYTHON_TYPE,
create_event( create_event(
@ -139,16 +157,21 @@ class MeowTests(unittest.TestCase):
} }
), ),
{ {
JOB_PARAMETERS:{ JOB_PARAMETERS:params_dict,
"extra":"extra",
"infile":file_path,
"outfile":result_path
},
JOB_HASH: file_hash, JOB_HASH: file_hash,
PYTHON_FUNC:job_func, PYTHON_FUNC:job_func,
} }
) )
job_dir = os.path.join(TEST_HANDLER_BASE, bad_job_dict[JOB_ID])
make_dir(job_dir)
param_file = os.path.join(job_dir, PARAMS_FILE)
write_yaml(params_dict, param_file)
base_file = os.path.join(job_dir, BASE_FILE)
write_notebook(APPENDING_NOTEBOOK, base_file)
with self.assertRaises(KeyError): with self.assertRaises(KeyError):
lpc.execute(bad_job_dict) lpc.execute(bad_job_dict)
@ -165,11 +188,7 @@ class MeowTests(unittest.TestCase):
} }
), ),
{ {
JOB_PARAMETERS:{ JOB_PARAMETERS:params_dict,
"extra":"extra",
"infile":file_path,
"outfile":result_path
},
JOB_HASH: file_hash, JOB_HASH: file_hash,
PYTHON_FUNC:job_func, PYTHON_FUNC:job_func,
PYTHON_OUTPUT_DIR:TEST_JOB_OUTPUT, PYTHON_OUTPUT_DIR:TEST_JOB_OUTPUT,
@ -177,6 +196,15 @@ class MeowTests(unittest.TestCase):
} }
) )
job_dir = os.path.join(TEST_HANDLER_BASE, good_job_dict[JOB_ID])
make_dir(job_dir)
param_file = os.path.join(job_dir, PARAMS_FILE)
write_yaml(params_dict, param_file)
base_file = os.path.join(job_dir, BASE_FILE)
write_notebook(APPENDING_NOTEBOOK, base_file)
lpc.execute(good_job_dict) lpc.execute(good_job_dict)
job_dir = os.path.join(TEST_HANDLER_BASE, good_job_dict[JOB_ID]) job_dir = os.path.join(TEST_HANDLER_BASE, good_job_dict[JOB_ID])

View File

@ -11,7 +11,8 @@ from core.correctness.vars import EVENT_TYPE, WATCHDOG_BASE, WATCHDOG_RULE, \
PYTHON_EXECUTION_BASE, META_FILE, BASE_FILE, PARAMS_FILE, JOB_FILE, \ PYTHON_EXECUTION_BASE, META_FILE, BASE_FILE, PARAMS_FILE, JOB_FILE, \
RESULT_FILE RESULT_FILE
from core.correctness.validation import valid_job from core.correctness.validation import valid_job
from core.functionality import get_file_hash, create_job, create_event from core.functionality import get_file_hash, create_job, create_event, \
make_dir, write_yaml, write_notebook
from core.meow import create_rules, create_rule from core.meow import create_rules, create_rule
from patterns.file_event_pattern import FileEventPattern, SWEEP_START, \ from patterns.file_event_pattern import FileEventPattern, SWEEP_START, \
SWEEP_STOP, SWEEP_JUMP SWEEP_STOP, SWEEP_JUMP
@ -332,6 +333,12 @@ class CorrectnessTests(unittest.TestCase):
rule = create_rule(pattern, recipe) rule = create_rule(pattern, recipe)
params_dict = {
"extra":"extra",
"infile":file_path,
"outfile":result_path
}
job_dict = create_job( job_dict = create_job(
PYTHON_TYPE, PYTHON_TYPE,
create_event( create_event(
@ -344,11 +351,7 @@ class CorrectnessTests(unittest.TestCase):
} }
), ),
{ {
JOB_PARAMETERS:{ JOB_PARAMETERS:params_dict,
"extra":"extra",
"infile":file_path,
"outfile":result_path
},
JOB_HASH: file_hash, JOB_HASH: file_hash,
PYTHON_FUNC:job_func, PYTHON_FUNC:job_func,
PYTHON_OUTPUT_DIR:TEST_JOB_OUTPUT, PYTHON_OUTPUT_DIR:TEST_JOB_OUTPUT,
@ -356,6 +359,16 @@ class CorrectnessTests(unittest.TestCase):
} }
) )
job_dir = os.path.join(
job_dict[PYTHON_EXECUTION_BASE], job_dict[JOB_ID])
make_dir(job_dir)
param_file = os.path.join(job_dir, PARAMS_FILE)
write_yaml(params_dict, param_file)
base_file = os.path.join(job_dir, BASE_FILE)
write_notebook(APPENDING_NOTEBOOK, base_file)
job_func(job_dict) job_func(job_dict)
job_dir = os.path.join(TEST_HANDLER_BASE, job_dict[JOB_ID]) job_dir = os.path.join(TEST_HANDLER_BASE, job_dict[JOB_ID])

View File

@ -10,8 +10,7 @@ from core.correctness.vars import RESULT_FILE
from core.functionality import make_dir, read_notebook from core.functionality import make_dir, read_notebook
from core.meow import BaseMonitor, BaseHandler, BaseConductor from core.meow import BaseMonitor, BaseHandler, BaseConductor
from core.runner import MeowRunner from core.runner import MeowRunner
from patterns.file_event_pattern import WatchdogMonitor, FileEventPattern, \ from patterns.file_event_pattern import WatchdogMonitor, FileEventPattern
SWEEP_JUMP, SWEEP_START, SWEEP_STOP
from recipes.jupyter_notebook_recipe import PapermillHandler, \ from recipes.jupyter_notebook_recipe import PapermillHandler, \
JupyterNotebookRecipe JupyterNotebookRecipe
from shared import setup, teardown, TEST_HANDLER_BASE, TEST_JOB_OUTPUT, \ from shared import setup, teardown, TEST_HANDLER_BASE, TEST_JOB_OUTPUT, \