From abffeed9db923539d3e96b573e4a64361da04fcc Mon Sep 17 00:00:00 2001 From: PatchOfScotland Date: Thu, 9 Feb 2023 16:10:10 +0100 Subject: [PATCH] added some more tests for job execution and ensured an error file is always produced but failed jobs --- conductors/local_python_conductor.py | 70 +++++++----- core/correctness/vars.py | 2 +- core/functionality.py | 2 +- recipes/jupyter_notebook_recipe.py | 1 - tests/test_conductors.py | 164 ++++++++++++++++++++++++++- 5 files changed, 203 insertions(+), 36 deletions(-) diff --git a/conductors/local_python_conductor.py b/conductors/local_python_conductor.py index d88dc51..d088f7c 100644 --- a/conductors/local_python_conductor.py +++ b/conductors/local_python_conductor.py @@ -12,11 +12,11 @@ from datetime import datetime from typing import Any, Tuple, Dict from core.correctness.vars import JOB_TYPE_PYTHON, PYTHON_FUNC, JOB_STATUS, \ - STATUS_RUNNING, JOB_START_TIME, JOB_ID, META_FILE, \ + STATUS_RUNNING, JOB_START_TIME, META_FILE, BACKUP_JOB_ERROR_FILE, \ STATUS_DONE, JOB_END_TIME, STATUS_FAILED, JOB_ERROR, \ JOB_TYPE, JOB_TYPE_PAPERMILL, DEFAULT_JOB_QUEUE_DIR, DEFAULT_JOB_OUTPUT_DIR from core.correctness.validation import valid_job, valid_dir_path -from core.functionality import read_yaml, write_yaml, make_dir +from core.functionality import read_yaml, write_yaml, make_dir, write_file from core.meow import BaseConductor @@ -52,41 +52,53 @@ class LocalPythonConductor(BaseConductor): more detailed feedback.""" valid_dir_path(job_dir, must_exist=True) - meta_file = os.path.join(job_dir, META_FILE) - job = read_yaml(meta_file) - valid_job(job) - - # update the status file with running status - job[JOB_STATUS] = STATUS_RUNNING - job[JOB_START_TIME] = datetime.now() - write_yaml(job, meta_file) - - # execute the job + # Test our job parameters. Even if its gibberish, we still move to + # output + abort = False try: - job_function = job[PYTHON_FUNC] - job_function(job_dir) - - # get up to date job data + meta_file = os.path.join(job_dir, META_FILE) job = read_yaml(meta_file) + valid_job(job) - # Update the status file with the finalised status - job[JOB_STATUS] = STATUS_DONE - job[JOB_END_TIME] = datetime.now() + # update the status file with running status + job[JOB_STATUS] = STATUS_RUNNING + job[JOB_START_TIME] = datetime.now() write_yaml(job, meta_file) except Exception as e: - # get up to date job data - job = read_yaml(meta_file) + # If something has gone wrong at this stage then its bad, so we + # need to make our own error file + error_file = os.path.join(job_dir, BACKUP_JOB_ERROR_FILE) + write_file(f"Recieved incorrectly setup job.\n\n{e}", error_file) + abort = True - # Update the status file with the error status. Don't overwrite any - # more specific error messages already created - if JOB_STATUS not in job: - job[JOB_STATUS] = STATUS_FAILED - if JOB_END_TIME not in job: + # execute the job + if not abort: + try: + job_function = job[PYTHON_FUNC] + job_function(job_dir) + + # get up to date job data + job = read_yaml(meta_file) + + # Update the status file with the finalised status + job[JOB_STATUS] = STATUS_DONE job[JOB_END_TIME] = datetime.now() - if JOB_ERROR not in job: - job[JOB_ERROR] = f"Job execution failed. {e}" - write_yaml(job, meta_file) + write_yaml(job, meta_file) + + except Exception as e: + # get up to date job data + job = read_yaml(meta_file) + + # Update the status file with the error status. Don't overwrite + # any more specific error messages already created + if JOB_STATUS not in job: + job[JOB_STATUS] = STATUS_FAILED + if JOB_END_TIME not in job: + job[JOB_END_TIME] = datetime.now() + if JOB_ERROR not in job: + job[JOB_ERROR] = f"Job execution failed. {e}" + write_yaml(job, meta_file) # Move the contents of the execution directory to the final output # directory. diff --git a/core/correctness/vars.py b/core/correctness/vars.py index 2867fa2..d267cee 100644 --- a/core/correctness/vars.py +++ b/core/correctness/vars.py @@ -86,7 +86,7 @@ JOB_TYPE = "job_type" JOB_TYPE_PYTHON = "python" JOB_TYPE_PAPERMILL = "papermill" PYTHON_FUNC = "func" - +BACKUP_JOB_ERROR_FILE = "ERROR.log" JOB_TYPES = { JOB_TYPE_PAPERMILL: [ "base.ipynb", diff --git a/core/functionality.py b/core/functionality.py index b543885..79ac46e 100644 --- a/core/functionality.py +++ b/core/functionality.py @@ -25,7 +25,7 @@ from core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \ EVENT_TYPE, EVENT_PATH, JOB_EVENT, JOB_TYPE, JOB_ID, JOB_PATTERN, \ JOB_RECIPE, JOB_RULE, EVENT_RULE, JOB_STATUS, STATUS_QUEUED, \ JOB_CREATE_TIME, JOB_REQUIREMENTS, WATCHDOG_BASE, WATCHDOG_HASH, \ - EVENT_TYPE_WATCHDOG, JOB_TYPE_PYTHON + EVENT_TYPE_WATCHDOG # mig trigger keyword replacements KEYWORD_PATH = "{PATH}" diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index 99145d5..99a5d11 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -191,7 +191,6 @@ def papermill_job_func(job_dir): # Identify job files meta_file = os.path.join(job_dir, META_FILE) - # TODO fix these paths so they are dynamic base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PAPERMILL)) job_file = os.path.join(job_dir, get_job_file(JOB_TYPE_PAPERMILL)) result_file = os.path.join(job_dir, get_result_file(JOB_TYPE_PAPERMILL)) diff --git a/tests/test_conductors.py b/tests/test_conductors.py index 744ec7e..8e87581 100644 --- a/tests/test_conductors.py +++ b/tests/test_conductors.py @@ -5,13 +5,13 @@ import unittest from typing import Dict from core.correctness.vars import JOB_TYPE_PYTHON, SHA256, JOB_PARAMETERS, \ - JOB_HASH, PYTHON_FUNC, JOB_ID, \ + JOB_HASH, PYTHON_FUNC, JOB_ID, BACKUP_JOB_ERROR_FILE, \ META_FILE, PARAMS_FILE, JOB_STATUS, JOB_ERROR, \ STATUS_DONE, JOB_TYPE_PAPERMILL, get_base_file, get_result_file, \ get_job_file from core.functionality import get_file_hash, create_watchdog_event, \ create_job, make_dir, write_yaml, write_notebook, read_yaml, write_file, \ - lines_to_string + lines_to_string, read_file from core.meow import create_rule from conductors import LocalPythonConductor from patterns import FileEventPattern @@ -391,8 +391,20 @@ class MeowTests(unittest.TestCase): job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) make_dir(job_dir) - with self.assertRaises(FileNotFoundError): - lpc.execute(job_dir) + lpc.execute(job_dir) + + output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) + + self.assertFalse(os.path.exists(job_dir)) + self.assertTrue(os.path.exists(output_dir)) + + error_file = os.path.join(output_dir, BACKUP_JOB_ERROR_FILE) + self.assertTrue(os.path.exists(error_file)) + + error = read_file(error_file) + self.assertEqual(error, + "Recieved incorrectly setup job.\n\n[Errno 2] No such file or " + f"directory: 'test_job_queue_dir/{job_dict[JOB_ID]}/job.yml'") # Test LocalPythonConductor does not execute jobs with bad functions def testLocalPythonConductorBadFunc(self)->None: @@ -466,6 +478,150 @@ class MeowTests(unittest.TestCase): self.assertIsInstance(job, dict) self.assertIn(JOB_ERROR, job) + # Test LocalPythonConductor does not execute jobs with invalid metafile + def testLocalPythonConductorInvalidMetafile(self)->None: + lpc = LocalPythonConductor( + job_queue_dir=TEST_JOB_QUEUE, + job_output_dir=TEST_JOB_OUTPUT + ) + + file_path = os.path.join(TEST_MONITOR_BASE, "test") + result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") + + with open(file_path, "w") as f: + f.write("Data") + + file_hash = get_file_hash(file_path, SHA256) + + pattern = FileEventPattern( + "pattern", + file_path, + "recipe_one", + "infile", + parameters={ + "extra":"A line from a test Pattern", + "outfile":result_path + }) + recipe = JupyterNotebookRecipe( + "recipe_one", APPENDING_NOTEBOOK) + + rule = create_rule(pattern, recipe) + + job_dict = create_job( + JOB_TYPE_PAPERMILL, + create_watchdog_event( + file_path, + rule, + TEST_MONITOR_BASE, + file_hash + ), + extras={ + JOB_PARAMETERS:{ + "extra":"extra", + "infile":file_path, + "outfile":result_path + }, + JOB_HASH: file_hash, + PYTHON_FUNC:failing_func, + } + ) + + job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) + make_dir(job_dir) + + meta_path = os.path.join(job_dir, META_FILE) + write_file("This is not a metafile dict", meta_path) + + lpc.execute(job_dir) + + output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) + + self.assertFalse(os.path.exists(job_dir)) + self.assertTrue(os.path.exists(output_dir)) + + error_file = os.path.join(output_dir, BACKUP_JOB_ERROR_FILE) + self.assertTrue(os.path.exists(error_file)) + + error = read_file(error_file) + self.assertEqual(error, + "Recieved incorrectly setup job.\n\nExpected type(s) are " + "'[typing.Dict]', got ") + + # Test LocalPythonConductor does not execute jobs with mangled metafile + def testLocalPythonConductorMangledMetafile(self)->None: + lpc = LocalPythonConductor( + job_queue_dir=TEST_JOB_QUEUE, + job_output_dir=TEST_JOB_OUTPUT + ) + + file_path = os.path.join(TEST_MONITOR_BASE, "test") + result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") + + with open(file_path, "w") as f: + f.write("Data") + + file_hash = get_file_hash(file_path, SHA256) + + pattern = FileEventPattern( + "pattern", + file_path, + "recipe_one", + "infile", + parameters={ + "extra":"A line from a test Pattern", + "outfile":result_path + }) + recipe = JupyterNotebookRecipe( + "recipe_one", APPENDING_NOTEBOOK) + + rule = create_rule(pattern, recipe) + + job_dict = create_job( + JOB_TYPE_PAPERMILL, + create_watchdog_event( + file_path, + rule, + TEST_MONITOR_BASE, + file_hash + ), + extras={ + JOB_PARAMETERS:{ + "extra":"extra", + "infile":file_path, + "outfile":result_path + }, + JOB_HASH: file_hash, + PYTHON_FUNC:failing_func, + } + ) + + job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) + make_dir(job_dir) + + meta_path = os.path.join(job_dir, META_FILE) + write_yaml({ + "This": "is", + "a": "dictionary", + "but": "not", + "valid": "job", + "definitons": "file" + }, meta_path) + + lpc.execute(job_dir) + + output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) + + self.assertFalse(os.path.exists(job_dir)) + self.assertTrue(os.path.exists(output_dir)) + + error_file = os.path.join(output_dir, BACKUP_JOB_ERROR_FILE) + self.assertTrue(os.path.exists(error_file)) + + error = read_file(error_file) + self.assertEqual(error, + "Recieved incorrectly setup job.\n\n\"Job require key " + "'job_type'\"") + # TODO test job status funcs # TODO test mangled status file reads # TODO test missing input files