added some more tests for job execution and ensured an error file is always produced but failed jobs
This commit is contained in:
@ -12,11 +12,11 @@ from datetime import datetime
|
|||||||
from typing import Any, Tuple, Dict
|
from typing import Any, Tuple, Dict
|
||||||
|
|
||||||
from core.correctness.vars import JOB_TYPE_PYTHON, PYTHON_FUNC, JOB_STATUS, \
|
from core.correctness.vars import JOB_TYPE_PYTHON, PYTHON_FUNC, JOB_STATUS, \
|
||||||
STATUS_RUNNING, JOB_START_TIME, JOB_ID, META_FILE, \
|
STATUS_RUNNING, JOB_START_TIME, META_FILE, BACKUP_JOB_ERROR_FILE, \
|
||||||
STATUS_DONE, JOB_END_TIME, STATUS_FAILED, JOB_ERROR, \
|
STATUS_DONE, JOB_END_TIME, STATUS_FAILED, JOB_ERROR, \
|
||||||
JOB_TYPE, JOB_TYPE_PAPERMILL, DEFAULT_JOB_QUEUE_DIR, DEFAULT_JOB_OUTPUT_DIR
|
JOB_TYPE, JOB_TYPE_PAPERMILL, DEFAULT_JOB_QUEUE_DIR, DEFAULT_JOB_OUTPUT_DIR
|
||||||
from core.correctness.validation import valid_job, valid_dir_path
|
from core.correctness.validation import valid_job, valid_dir_path
|
||||||
from core.functionality import read_yaml, write_yaml, make_dir
|
from core.functionality import read_yaml, write_yaml, make_dir, write_file
|
||||||
from core.meow import BaseConductor
|
from core.meow import BaseConductor
|
||||||
|
|
||||||
|
|
||||||
@ -52,41 +52,53 @@ class LocalPythonConductor(BaseConductor):
|
|||||||
more detailed feedback."""
|
more detailed feedback."""
|
||||||
valid_dir_path(job_dir, must_exist=True)
|
valid_dir_path(job_dir, must_exist=True)
|
||||||
|
|
||||||
meta_file = os.path.join(job_dir, META_FILE)
|
# Test our job parameters. Even if its gibberish, we still move to
|
||||||
job = read_yaml(meta_file)
|
# output
|
||||||
valid_job(job)
|
abort = False
|
||||||
|
|
||||||
# update the status file with running status
|
|
||||||
job[JOB_STATUS] = STATUS_RUNNING
|
|
||||||
job[JOB_START_TIME] = datetime.now()
|
|
||||||
write_yaml(job, meta_file)
|
|
||||||
|
|
||||||
# execute the job
|
|
||||||
try:
|
try:
|
||||||
job_function = job[PYTHON_FUNC]
|
meta_file = os.path.join(job_dir, META_FILE)
|
||||||
job_function(job_dir)
|
|
||||||
|
|
||||||
# get up to date job data
|
|
||||||
job = read_yaml(meta_file)
|
job = read_yaml(meta_file)
|
||||||
|
valid_job(job)
|
||||||
|
|
||||||
# Update the status file with the finalised status
|
# update the status file with running status
|
||||||
job[JOB_STATUS] = STATUS_DONE
|
job[JOB_STATUS] = STATUS_RUNNING
|
||||||
job[JOB_END_TIME] = datetime.now()
|
job[JOB_START_TIME] = datetime.now()
|
||||||
write_yaml(job, meta_file)
|
write_yaml(job, meta_file)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# get up to date job data
|
# If something has gone wrong at this stage then its bad, so we
|
||||||
job = read_yaml(meta_file)
|
# need to make our own error file
|
||||||
|
error_file = os.path.join(job_dir, BACKUP_JOB_ERROR_FILE)
|
||||||
|
write_file(f"Recieved incorrectly setup job.\n\n{e}", error_file)
|
||||||
|
abort = True
|
||||||
|
|
||||||
# Update the status file with the error status. Don't overwrite any
|
# execute the job
|
||||||
# more specific error messages already created
|
if not abort:
|
||||||
if JOB_STATUS not in job:
|
try:
|
||||||
job[JOB_STATUS] = STATUS_FAILED
|
job_function = job[PYTHON_FUNC]
|
||||||
if JOB_END_TIME not in job:
|
job_function(job_dir)
|
||||||
|
|
||||||
|
# get up to date job data
|
||||||
|
job = read_yaml(meta_file)
|
||||||
|
|
||||||
|
# Update the status file with the finalised status
|
||||||
|
job[JOB_STATUS] = STATUS_DONE
|
||||||
job[JOB_END_TIME] = datetime.now()
|
job[JOB_END_TIME] = datetime.now()
|
||||||
if JOB_ERROR not in job:
|
write_yaml(job, meta_file)
|
||||||
job[JOB_ERROR] = f"Job execution failed. {e}"
|
|
||||||
write_yaml(job, meta_file)
|
except Exception as e:
|
||||||
|
# get up to date job data
|
||||||
|
job = read_yaml(meta_file)
|
||||||
|
|
||||||
|
# Update the status file with the error status. Don't overwrite
|
||||||
|
# any more specific error messages already created
|
||||||
|
if JOB_STATUS not in job:
|
||||||
|
job[JOB_STATUS] = STATUS_FAILED
|
||||||
|
if JOB_END_TIME not in job:
|
||||||
|
job[JOB_END_TIME] = datetime.now()
|
||||||
|
if JOB_ERROR not in job:
|
||||||
|
job[JOB_ERROR] = f"Job execution failed. {e}"
|
||||||
|
write_yaml(job, meta_file)
|
||||||
|
|
||||||
# Move the contents of the execution directory to the final output
|
# Move the contents of the execution directory to the final output
|
||||||
# directory.
|
# directory.
|
||||||
|
@ -86,7 +86,7 @@ JOB_TYPE = "job_type"
|
|||||||
JOB_TYPE_PYTHON = "python"
|
JOB_TYPE_PYTHON = "python"
|
||||||
JOB_TYPE_PAPERMILL = "papermill"
|
JOB_TYPE_PAPERMILL = "papermill"
|
||||||
PYTHON_FUNC = "func"
|
PYTHON_FUNC = "func"
|
||||||
|
BACKUP_JOB_ERROR_FILE = "ERROR.log"
|
||||||
JOB_TYPES = {
|
JOB_TYPES = {
|
||||||
JOB_TYPE_PAPERMILL: [
|
JOB_TYPE_PAPERMILL: [
|
||||||
"base.ipynb",
|
"base.ipynb",
|
||||||
|
@ -25,7 +25,7 @@ from core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \
|
|||||||
EVENT_TYPE, EVENT_PATH, JOB_EVENT, JOB_TYPE, JOB_ID, JOB_PATTERN, \
|
EVENT_TYPE, EVENT_PATH, JOB_EVENT, JOB_TYPE, JOB_ID, JOB_PATTERN, \
|
||||||
JOB_RECIPE, JOB_RULE, EVENT_RULE, JOB_STATUS, STATUS_QUEUED, \
|
JOB_RECIPE, JOB_RULE, EVENT_RULE, JOB_STATUS, STATUS_QUEUED, \
|
||||||
JOB_CREATE_TIME, JOB_REQUIREMENTS, WATCHDOG_BASE, WATCHDOG_HASH, \
|
JOB_CREATE_TIME, JOB_REQUIREMENTS, WATCHDOG_BASE, WATCHDOG_HASH, \
|
||||||
EVENT_TYPE_WATCHDOG, JOB_TYPE_PYTHON
|
EVENT_TYPE_WATCHDOG
|
||||||
|
|
||||||
# mig trigger keyword replacements
|
# mig trigger keyword replacements
|
||||||
KEYWORD_PATH = "{PATH}"
|
KEYWORD_PATH = "{PATH}"
|
||||||
|
@ -191,7 +191,6 @@ def papermill_job_func(job_dir):
|
|||||||
|
|
||||||
# Identify job files
|
# Identify job files
|
||||||
meta_file = os.path.join(job_dir, META_FILE)
|
meta_file = os.path.join(job_dir, META_FILE)
|
||||||
# TODO fix these paths so they are dynamic
|
|
||||||
base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PAPERMILL))
|
base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PAPERMILL))
|
||||||
job_file = os.path.join(job_dir, get_job_file(JOB_TYPE_PAPERMILL))
|
job_file = os.path.join(job_dir, get_job_file(JOB_TYPE_PAPERMILL))
|
||||||
result_file = os.path.join(job_dir, get_result_file(JOB_TYPE_PAPERMILL))
|
result_file = os.path.join(job_dir, get_result_file(JOB_TYPE_PAPERMILL))
|
||||||
|
@ -5,13 +5,13 @@ import unittest
|
|||||||
from typing import Dict
|
from typing import Dict
|
||||||
|
|
||||||
from core.correctness.vars import JOB_TYPE_PYTHON, SHA256, JOB_PARAMETERS, \
|
from core.correctness.vars import JOB_TYPE_PYTHON, SHA256, JOB_PARAMETERS, \
|
||||||
JOB_HASH, PYTHON_FUNC, JOB_ID, \
|
JOB_HASH, PYTHON_FUNC, JOB_ID, BACKUP_JOB_ERROR_FILE, \
|
||||||
META_FILE, PARAMS_FILE, JOB_STATUS, JOB_ERROR, \
|
META_FILE, PARAMS_FILE, JOB_STATUS, JOB_ERROR, \
|
||||||
STATUS_DONE, JOB_TYPE_PAPERMILL, get_base_file, get_result_file, \
|
STATUS_DONE, JOB_TYPE_PAPERMILL, get_base_file, get_result_file, \
|
||||||
get_job_file
|
get_job_file
|
||||||
from core.functionality import get_file_hash, create_watchdog_event, \
|
from core.functionality import get_file_hash, create_watchdog_event, \
|
||||||
create_job, make_dir, write_yaml, write_notebook, read_yaml, write_file, \
|
create_job, make_dir, write_yaml, write_notebook, read_yaml, write_file, \
|
||||||
lines_to_string
|
lines_to_string, read_file
|
||||||
from core.meow import create_rule
|
from core.meow import create_rule
|
||||||
from conductors import LocalPythonConductor
|
from conductors import LocalPythonConductor
|
||||||
from patterns import FileEventPattern
|
from patterns import FileEventPattern
|
||||||
@ -391,8 +391,20 @@ class MeowTests(unittest.TestCase):
|
|||||||
job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID])
|
job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID])
|
||||||
make_dir(job_dir)
|
make_dir(job_dir)
|
||||||
|
|
||||||
with self.assertRaises(FileNotFoundError):
|
lpc.execute(job_dir)
|
||||||
lpc.execute(job_dir)
|
|
||||||
|
output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID])
|
||||||
|
|
||||||
|
self.assertFalse(os.path.exists(job_dir))
|
||||||
|
self.assertTrue(os.path.exists(output_dir))
|
||||||
|
|
||||||
|
error_file = os.path.join(output_dir, BACKUP_JOB_ERROR_FILE)
|
||||||
|
self.assertTrue(os.path.exists(error_file))
|
||||||
|
|
||||||
|
error = read_file(error_file)
|
||||||
|
self.assertEqual(error,
|
||||||
|
"Recieved incorrectly setup job.\n\n[Errno 2] No such file or "
|
||||||
|
f"directory: 'test_job_queue_dir/{job_dict[JOB_ID]}/job.yml'")
|
||||||
|
|
||||||
# Test LocalPythonConductor does not execute jobs with bad functions
|
# Test LocalPythonConductor does not execute jobs with bad functions
|
||||||
def testLocalPythonConductorBadFunc(self)->None:
|
def testLocalPythonConductorBadFunc(self)->None:
|
||||||
@ -466,6 +478,150 @@ class MeowTests(unittest.TestCase):
|
|||||||
self.assertIsInstance(job, dict)
|
self.assertIsInstance(job, dict)
|
||||||
self.assertIn(JOB_ERROR, job)
|
self.assertIn(JOB_ERROR, job)
|
||||||
|
|
||||||
|
# Test LocalPythonConductor does not execute jobs with invalid metafile
|
||||||
|
def testLocalPythonConductorInvalidMetafile(self)->None:
|
||||||
|
lpc = LocalPythonConductor(
|
||||||
|
job_queue_dir=TEST_JOB_QUEUE,
|
||||||
|
job_output_dir=TEST_JOB_OUTPUT
|
||||||
|
)
|
||||||
|
|
||||||
|
file_path = os.path.join(TEST_MONITOR_BASE, "test")
|
||||||
|
result_path = os.path.join(TEST_MONITOR_BASE, "output", "test")
|
||||||
|
|
||||||
|
with open(file_path, "w") as f:
|
||||||
|
f.write("Data")
|
||||||
|
|
||||||
|
file_hash = get_file_hash(file_path, SHA256)
|
||||||
|
|
||||||
|
pattern = FileEventPattern(
|
||||||
|
"pattern",
|
||||||
|
file_path,
|
||||||
|
"recipe_one",
|
||||||
|
"infile",
|
||||||
|
parameters={
|
||||||
|
"extra":"A line from a test Pattern",
|
||||||
|
"outfile":result_path
|
||||||
|
})
|
||||||
|
recipe = JupyterNotebookRecipe(
|
||||||
|
"recipe_one", APPENDING_NOTEBOOK)
|
||||||
|
|
||||||
|
rule = create_rule(pattern, recipe)
|
||||||
|
|
||||||
|
job_dict = create_job(
|
||||||
|
JOB_TYPE_PAPERMILL,
|
||||||
|
create_watchdog_event(
|
||||||
|
file_path,
|
||||||
|
rule,
|
||||||
|
TEST_MONITOR_BASE,
|
||||||
|
file_hash
|
||||||
|
),
|
||||||
|
extras={
|
||||||
|
JOB_PARAMETERS:{
|
||||||
|
"extra":"extra",
|
||||||
|
"infile":file_path,
|
||||||
|
"outfile":result_path
|
||||||
|
},
|
||||||
|
JOB_HASH: file_hash,
|
||||||
|
PYTHON_FUNC:failing_func,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID])
|
||||||
|
make_dir(job_dir)
|
||||||
|
|
||||||
|
meta_path = os.path.join(job_dir, META_FILE)
|
||||||
|
write_file("This is not a metafile dict", meta_path)
|
||||||
|
|
||||||
|
lpc.execute(job_dir)
|
||||||
|
|
||||||
|
output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID])
|
||||||
|
|
||||||
|
self.assertFalse(os.path.exists(job_dir))
|
||||||
|
self.assertTrue(os.path.exists(output_dir))
|
||||||
|
|
||||||
|
error_file = os.path.join(output_dir, BACKUP_JOB_ERROR_FILE)
|
||||||
|
self.assertTrue(os.path.exists(error_file))
|
||||||
|
|
||||||
|
error = read_file(error_file)
|
||||||
|
self.assertEqual(error,
|
||||||
|
"Recieved incorrectly setup job.\n\nExpected type(s) are "
|
||||||
|
"'[typing.Dict]', got <class 'str'>")
|
||||||
|
|
||||||
|
# Test LocalPythonConductor does not execute jobs with mangled metafile
|
||||||
|
def testLocalPythonConductorMangledMetafile(self)->None:
|
||||||
|
lpc = LocalPythonConductor(
|
||||||
|
job_queue_dir=TEST_JOB_QUEUE,
|
||||||
|
job_output_dir=TEST_JOB_OUTPUT
|
||||||
|
)
|
||||||
|
|
||||||
|
file_path = os.path.join(TEST_MONITOR_BASE, "test")
|
||||||
|
result_path = os.path.join(TEST_MONITOR_BASE, "output", "test")
|
||||||
|
|
||||||
|
with open(file_path, "w") as f:
|
||||||
|
f.write("Data")
|
||||||
|
|
||||||
|
file_hash = get_file_hash(file_path, SHA256)
|
||||||
|
|
||||||
|
pattern = FileEventPattern(
|
||||||
|
"pattern",
|
||||||
|
file_path,
|
||||||
|
"recipe_one",
|
||||||
|
"infile",
|
||||||
|
parameters={
|
||||||
|
"extra":"A line from a test Pattern",
|
||||||
|
"outfile":result_path
|
||||||
|
})
|
||||||
|
recipe = JupyterNotebookRecipe(
|
||||||
|
"recipe_one", APPENDING_NOTEBOOK)
|
||||||
|
|
||||||
|
rule = create_rule(pattern, recipe)
|
||||||
|
|
||||||
|
job_dict = create_job(
|
||||||
|
JOB_TYPE_PAPERMILL,
|
||||||
|
create_watchdog_event(
|
||||||
|
file_path,
|
||||||
|
rule,
|
||||||
|
TEST_MONITOR_BASE,
|
||||||
|
file_hash
|
||||||
|
),
|
||||||
|
extras={
|
||||||
|
JOB_PARAMETERS:{
|
||||||
|
"extra":"extra",
|
||||||
|
"infile":file_path,
|
||||||
|
"outfile":result_path
|
||||||
|
},
|
||||||
|
JOB_HASH: file_hash,
|
||||||
|
PYTHON_FUNC:failing_func,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID])
|
||||||
|
make_dir(job_dir)
|
||||||
|
|
||||||
|
meta_path = os.path.join(job_dir, META_FILE)
|
||||||
|
write_yaml({
|
||||||
|
"This": "is",
|
||||||
|
"a": "dictionary",
|
||||||
|
"but": "not",
|
||||||
|
"valid": "job",
|
||||||
|
"definitons": "file"
|
||||||
|
}, meta_path)
|
||||||
|
|
||||||
|
lpc.execute(job_dir)
|
||||||
|
|
||||||
|
output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID])
|
||||||
|
|
||||||
|
self.assertFalse(os.path.exists(job_dir))
|
||||||
|
self.assertTrue(os.path.exists(output_dir))
|
||||||
|
|
||||||
|
error_file = os.path.join(output_dir, BACKUP_JOB_ERROR_FILE)
|
||||||
|
self.assertTrue(os.path.exists(error_file))
|
||||||
|
|
||||||
|
error = read_file(error_file)
|
||||||
|
self.assertEqual(error,
|
||||||
|
"Recieved incorrectly setup job.\n\n\"Job require key "
|
||||||
|
"'job_type'\"")
|
||||||
|
|
||||||
# TODO test job status funcs
|
# TODO test job status funcs
|
||||||
# TODO test mangled status file reads
|
# TODO test mangled status file reads
|
||||||
# TODO test missing input files
|
# TODO test missing input files
|
||||||
|
Reference in New Issue
Block a user