diff --git a/conductors/__init__.py b/conductors/__init__.py index 9fbe442..a9d7660 100644 --- a/conductors/__init__.py +++ b/conductors/__init__.py @@ -1,2 +1,3 @@ from .local_python_conductor import LocalPythonConductor +from .local_bash_conductor import LocalBashConductor \ No newline at end of file diff --git a/conductors/local_bash_conductor.py b/conductors/local_bash_conductor.py index 6ebcc89..2a622fe 100644 --- a/conductors/local_bash_conductor.py +++ b/conductors/local_bash_conductor.py @@ -84,15 +84,34 @@ class LocalBashConductor(BaseConductor): # execute the job if not abort: try: - result = subprocess.call(get_job_file(JOB_TYPE_BASH), cwd=".") + print(f"PWD: {os.getcwd()}") + print(f"job_dir: {job_dir}") + print(os.path.exists(os.path.join(job_dir, get_job_file(JOB_TYPE_BASH)))) + result = subprocess.call( + os.path.join(job_dir, get_job_file(JOB_TYPE_BASH)), + cwd="." + ) # get up to date job data job = read_yaml(meta_file) - # Update the status file with the finalised status - job[JOB_STATUS] = STATUS_DONE - job[JOB_END_TIME] = datetime.now() - write_yaml(job, meta_file) + if result == 0: + # Update the status file with the finalised status + job[JOB_STATUS] = STATUS_DONE + job[JOB_END_TIME] = datetime.now() + write_yaml(job, meta_file) + + else: + # Update the status file with the error status. Don't + # overwrite any more specific error messages already + # created + if JOB_STATUS not in job: + job[JOB_STATUS] = STATUS_FAILED + if JOB_END_TIME not in job: + job[JOB_END_TIME] = datetime.now() + if JOB_ERROR not in job: + job[JOB_ERROR] = f"Job execution returned non-zero." + write_yaml(job, meta_file) except Exception as e: # get up to date job data diff --git a/recipes/__init__.py b/recipes/__init__.py index eb28822..c697f55 100644 --- a/recipes/__init__.py +++ b/recipes/__init__.py @@ -1,4 +1,5 @@ from .jupyter_notebook_recipe import JupyterNotebookRecipe, PapermillHandler, \ get_recipe_from_notebook -from .python_recipe import PythonRecipe, PythonHandler \ No newline at end of file +from .python_recipe import PythonRecipe, PythonHandler +from .bash_recipe import BashRecipe, BashHandler \ No newline at end of file diff --git a/recipes/bash_recipe.py b/recipes/bash_recipe.py index 971ad61..f2f149e 100644 --- a/recipes/bash_recipe.py +++ b/recipes/bash_recipe.py @@ -12,9 +12,8 @@ from meow_base.core.correctness.validation import check_type, valid_dict, \ valid_string, valid_dir_path from meow_base.core.correctness.vars import DEBUG_INFO, DEFAULT_JOB_QUEUE_DIR, \ VALID_VARIABLE_NAME_CHARS, EVENT_PATH, EVENT_RULE, EVENT_TYPE, JOB_ID, \ - EVENT_TYPE_WATCHDOG, JOB_TYPE_BASH, JOB_PARAMETERS, JOB_HASH, \ - WATCHDOG_HASH, WATCHDOG_BASE, META_FILE, PARAMS_FILE, STATUS_QUEUED, \ - JOB_STATUS, \ + EVENT_TYPE_WATCHDOG, JOB_TYPE_BASH, JOB_PARAMETERS, WATCHDOG_HASH, \ + WATCHDOG_BASE, META_FILE, STATUS_QUEUED, JOB_STATUS, \ get_base_file, get_job_file from meow_base.functionality.debug import setup_debugging, print_debug from meow_base.functionality.file_io import valid_path, make_dir, write_yaml, \ @@ -138,7 +137,6 @@ class BashHandler(BaseHandler): event, extras={ JOB_PARAMETERS:yaml_dict, - JOB_HASH: event[WATCHDOG_HASH], # CONTROL_SCRIPT:python_job_func } ) diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index fbee432..5e44d03 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -17,7 +17,7 @@ from meow_base.core.correctness.meow import valid_event from meow_base.core.correctness.validation import check_type, valid_string, \ valid_dict, valid_path, valid_dir_path, valid_existing_file_path from meow_base.core.correctness.vars import VALID_VARIABLE_NAME_CHARS, \ - PYTHON_FUNC, DEBUG_INFO, EVENT_TYPE_WATCHDOG, JOB_HASH, \ + PYTHON_FUNC, DEBUG_INFO, EVENT_TYPE_WATCHDOG, \ DEFAULT_JOB_QUEUE_DIR, EVENT_PATH, JOB_TYPE_PAPERMILL, WATCHDOG_HASH, \ JOB_PARAMETERS, JOB_ID, WATCHDOG_BASE, META_FILE, PARAMS_FILE, \ JOB_STATUS, STATUS_QUEUED, EVENT_RULE, EVENT_TYPE, EVENT_RULE, \ @@ -142,7 +142,6 @@ class PapermillHandler(BaseHandler): event, extras={ JOB_PARAMETERS:yaml_dict, - JOB_HASH: event[WATCHDOG_HASH], PYTHON_FUNC:papermill_job_func, } ) @@ -206,7 +205,7 @@ def papermill_job_func(job_dir): from datetime import datetime from meow_base.core.correctness.vars import JOB_EVENT, JOB_ID, \ EVENT_PATH, META_FILE, PARAMS_FILE, \ - JOB_STATUS, JOB_HASH, SHA256, STATUS_SKIPPED, JOB_END_TIME, \ + JOB_STATUS, SHA256, STATUS_SKIPPED, JOB_END_TIME, \ JOB_ERROR, STATUS_FAILED, get_job_file, \ get_result_file from meow_base.functionality.file_io import read_yaml, write_notebook, write_yaml @@ -228,19 +227,20 @@ def papermill_job_func(job_dir): # Check the hash of the triggering file, if present. This addresses # potential race condition as file could have been modified since # triggering event - if JOB_HASH in job: + if JOB_EVENT in job and WATCHDOG_HASH in job[JOB_EVENT]: # get current hash triggerfile_hash = get_file_hash(job[JOB_EVENT][EVENT_PATH], SHA256) # If hash doesn't match, then abort the job. If its been modified, then # another job will have been scheduled anyway. if not triggerfile_hash \ - or triggerfile_hash != job[JOB_HASH]: + or triggerfile_hash != job[JOB_EVENT][WATCHDOG_HASH]: job[JOB_STATUS] = STATUS_SKIPPED job[JOB_END_TIME] = datetime.now() msg = "Job was skipped as triggering file " + \ f"'{job[JOB_EVENT][EVENT_PATH]}' has been modified since " + \ "scheduling. Was expected to have hash " + \ - f"'{job[JOB_HASH]}' but has '{triggerfile_hash}'." + f"'{job[JOB_EVENT][WATCHDOG_HASH]}' but has '" + \ + f"{triggerfile_hash}'." job[JOB_ERROR] = msg write_yaml(job, meta_file) return diff --git a/recipes/python_recipe.py b/recipes/python_recipe.py index 4db6dd8..d879d2a 100644 --- a/recipes/python_recipe.py +++ b/recipes/python_recipe.py @@ -16,7 +16,7 @@ from meow_base.core.correctness.meow import valid_event from meow_base.core.correctness.validation import check_script, valid_string, \ valid_dict, valid_dir_path from meow_base.core.correctness.vars import VALID_VARIABLE_NAME_CHARS, \ - PYTHON_FUNC, DEBUG_INFO, EVENT_TYPE_WATCHDOG, JOB_HASH, \ + PYTHON_FUNC, DEBUG_INFO, EVENT_TYPE_WATCHDOG, \ DEFAULT_JOB_QUEUE_DIR, EVENT_RULE, EVENT_PATH, JOB_TYPE_PYTHON, \ WATCHDOG_HASH, JOB_PARAMETERS, JOB_ID, WATCHDOG_BASE, META_FILE, \ PARAMS_FILE, JOB_STATUS, STATUS_QUEUED, EVENT_TYPE, EVENT_RULE, \ @@ -132,7 +132,6 @@ class PythonHandler(BaseHandler): event, extras={ JOB_PARAMETERS:yaml_dict, - JOB_HASH: event[WATCHDOG_HASH], PYTHON_FUNC:python_job_func } ) @@ -182,7 +181,7 @@ def python_job_func(job_dir): from io import StringIO from meow_base.core.correctness.vars import JOB_EVENT, JOB_ID, \ EVENT_PATH, META_FILE, PARAMS_FILE, \ - JOB_STATUS, JOB_HASH, SHA256, STATUS_SKIPPED, JOB_END_TIME, \ + JOB_STATUS, SHA256, STATUS_SKIPPED, JOB_END_TIME, \ JOB_ERROR, STATUS_FAILED, get_base_file, \ get_job_file, get_result_file from meow_base.functionality.file_io import read_yaml, write_yaml @@ -203,19 +202,20 @@ def python_job_func(job_dir): # Check the hash of the triggering file, if present. This addresses # potential race condition as file could have been modified since # triggering event - if JOB_HASH in job: + if JOB_EVENT in job and WATCHDOG_HASH in job[JOB_EVENT]: # get current hash triggerfile_hash = get_file_hash(job[JOB_EVENT][EVENT_PATH], SHA256) # If hash doesn't match, then abort the job. If its been modified, then # another job will have been scheduled anyway. if not triggerfile_hash \ - or triggerfile_hash != job[JOB_HASH]: + or triggerfile_hash != job[JOB_EVENT][WATCHDOG_HASH]: job[JOB_STATUS] = STATUS_SKIPPED job[JOB_END_TIME] = datetime.now() msg = "Job was skipped as triggering file " + \ f"'{job[JOB_EVENT][EVENT_PATH]}' has been modified since " + \ "scheduling. Was expected to have hash " + \ - f"'{job[JOB_HASH]}' but has '{triggerfile_hash}'." + f"'{job[JOB_EVENT][WATCHDOG_HASH]}' but has '" + \ + f"{triggerfile_hash}'." job[JOB_ERROR] = msg write_yaml(job, meta_file) return diff --git a/tests/test_conductors.py b/tests/test_conductors.py index d91233b..106b91e 100644 --- a/tests/test_conductors.py +++ b/tests/test_conductors.py @@ -1,35 +1,40 @@ import os +import stat import unittest from datetime import datetime from typing import Dict from meow_base.core.correctness.vars import JOB_TYPE_PYTHON, SHA256, \ - JOB_PARAMETERS, JOB_HASH, PYTHON_FUNC, JOB_ID, BACKUP_JOB_ERROR_FILE, \ + JOB_PARAMETERS, PYTHON_FUNC, JOB_ID, BACKUP_JOB_ERROR_FILE, \ JOB_EVENT, META_FILE, PARAMS_FILE, JOB_STATUS, JOB_ERROR, JOB_TYPE, \ JOB_PATTERN, STATUS_DONE, JOB_TYPE_PAPERMILL, JOB_RECIPE, JOB_RULE, \ JOB_CREATE_TIME, JOB_REQUIREMENTS, EVENT_PATH, EVENT_RULE, EVENT_TYPE, \ - EVENT_TYPE_WATCHDOG, get_base_file, get_result_file, get_job_file -from meow_base.conductors import LocalPythonConductor + EVENT_TYPE_WATCHDOG, JOB_TYPE_BASH, \ + get_base_file, get_result_file, get_job_file +from meow_base.conductors import LocalPythonConductor, LocalBashConductor from meow_base.functionality.file_io import read_file, read_yaml, write_file, \ write_notebook, write_yaml, lines_to_string, make_dir from meow_base.functionality.hashing import get_file_hash from meow_base.functionality.meow import create_watchdog_event, create_job, \ create_rule +from meow_base.functionality.parameterisation import parameterize_bash_script from meow_base.patterns.file_event_pattern import FileEventPattern from meow_base.recipes.jupyter_notebook_recipe import JupyterNotebookRecipe, \ papermill_job_func from meow_base.recipes.python_recipe import PythonRecipe, python_job_func +from meow_base.recipes.bash_recipe import BashRecipe, assemble_bash_job_script from shared import TEST_MONITOR_BASE, APPENDING_NOTEBOOK, TEST_JOB_OUTPUT, \ TEST_JOB_QUEUE, COMPLETE_PYTHON_SCRIPT, BAREBONES_PYTHON_SCRIPT, \ - BAREBONES_NOTEBOOK, setup, teardown + BAREBONES_NOTEBOOK, COMPLETE_BASH_SCRIPT, BAREBONES_BASH_SCRIPT, \ + setup, teardown def failing_func(): raise Exception("bad function") -class MeowTests(unittest.TestCase): +class PythonTests(unittest.TestCase): def setUp(self)->None: super().setUp() setup() @@ -96,7 +101,6 @@ class MeowTests(unittest.TestCase): ), extras={ JOB_PARAMETERS:params_dict, - JOB_HASH: file_hash, PYTHON_FUNC:python_job_func } ) @@ -184,7 +188,6 @@ class MeowTests(unittest.TestCase): ), extras={ JOB_PARAMETERS:params_dict, - JOB_HASH: file_hash, PYTHON_FUNC:papermill_job_func } ) @@ -271,8 +274,7 @@ class MeowTests(unittest.TestCase): file_hash ), extras={ - JOB_PARAMETERS:params_dict, - JOB_HASH: file_hash, + JOB_PARAMETERS:params_dict } ) @@ -313,7 +315,6 @@ class MeowTests(unittest.TestCase): ), extras={ JOB_PARAMETERS:params_dict, - JOB_HASH: file_hash, PYTHON_FUNC:papermill_job_func } ) @@ -395,7 +396,6 @@ class MeowTests(unittest.TestCase): "infile":file_path, "outfile":result_path }, - JOB_HASH: file_hash, PYTHON_FUNC:failing_func, } ) @@ -463,7 +463,6 @@ class MeowTests(unittest.TestCase): ), extras={ JOB_PARAMETERS:params, - JOB_HASH: file_hash, PYTHON_FUNC:failing_func, } ) @@ -533,7 +532,6 @@ class MeowTests(unittest.TestCase): "infile":file_path, "outfile":result_path }, - JOB_HASH: file_hash, PYTHON_FUNC:failing_func, } ) @@ -602,7 +600,6 @@ class MeowTests(unittest.TestCase): "infile":file_path, "outfile":result_path }, - JOB_HASH: file_hash, PYTHON_FUNC:failing_func, } ) @@ -724,3 +721,476 @@ class MeowTests(unittest.TestCase): self.assertTrue(status) # TODO test job status funcs + +class BashTests(unittest.TestCase): + def setUp(self)->None: + super().setUp() + setup() + + def tearDown(self)->None: + super().tearDown() + teardown() + + # Test LocalBashConductor creation + def testLocalBashConductorCreation(self)->None: + LocalBashConductor() + + # Test LocalBashConductor naming + def testLocalBashConductorNaming(self)->None: + test_name = "test_name" + conductor = LocalBashConductor(name=test_name) + self.assertEqual(conductor.name, test_name) + + conductor = LocalBashConductor() + self.assertTrue(conductor.name.startswith("conductor_")) + + # Test LocalBashConductor executes valid bash jobs + def testLocalBashConductorValidBashJob(self)->None: + lpc = LocalBashConductor( + job_queue_dir=TEST_JOB_QUEUE, + job_output_dir=TEST_JOB_OUTPUT + ) + + file_path = os.path.join(TEST_MONITOR_BASE, "test") + result_path = os.path.join(TEST_MONITOR_BASE, "output") + + with open(file_path, "w") as f: + f.write("150") + + file_hash = get_file_hash(file_path, SHA256) + + pattern = FileEventPattern( + "pattern", + file_path, + "recipe_one", + "infile", + parameters={ + "num":450, + "outfile":result_path + }) + recipe = BashRecipe( + "recipe_one", COMPLETE_BASH_SCRIPT) + + rule = create_rule(pattern, recipe) + + params_dict = { + "num":450, + "infile":file_path, + "outfile":result_path + } + + job_dict = create_job( + JOB_TYPE_BASH, + create_watchdog_event( + file_path, + rule, + TEST_MONITOR_BASE, + file_hash + ), + extras={ + JOB_PARAMETERS:params_dict, + } + ) + + job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) + make_dir(job_dir) + + meta_path = os.path.join(job_dir, META_FILE) + write_yaml(job_dict, meta_path) + + base_script = parameterize_bash_script( + COMPLETE_BASH_SCRIPT, params_dict + ) + base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_BASH)) + write_file(lines_to_string(base_script), base_file) + st = os.stat(base_file) + os.chmod(base_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + + job_script = assemble_bash_job_script() + job_file = os.path.join(job_dir, get_job_file(JOB_TYPE_BASH)) + write_file(lines_to_string(job_script), job_file) + st = os.stat(job_file) + os.chmod(job_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + + lpc.execute(job_dir) + + self.assertFalse(os.path.exists(job_dir)) + + job_output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) + self.assertTrue(os.path.exists(job_output_dir)) + + meta_path = os.path.join(job_output_dir, META_FILE) + self.assertTrue(os.path.exists(meta_path)) + status = read_yaml(meta_path) + self.assertIsInstance(status, Dict) + self.assertIn(JOB_STATUS, status) + self.assertEqual(status[JOB_STATUS], STATUS_DONE) + + self.assertNotIn(JOB_ERROR, status) + self.assertTrue(os.path.exists( + os.path.join(job_output_dir, get_base_file(JOB_TYPE_BASH)))) + self.assertTrue(os.path.exists( + os.path.join(job_output_dir, get_job_file(JOB_TYPE_BASH)))) + + self.assertTrue(os.path.exists(result_path)) + + # Test LocalBashConductor does not execute jobs with missing metafile + def testLocalBashConductorMissingMetafile(self)->None: + lpc = LocalBashConductor( + job_queue_dir=TEST_JOB_QUEUE, + job_output_dir=TEST_JOB_OUTPUT + ) + + file_path = os.path.join(TEST_MONITOR_BASE, "test") + result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") + + with open(file_path, "w") as f: + f.write("150") + + file_hash = get_file_hash(file_path, SHA256) + + pattern = FileEventPattern( + "pattern", + file_path, + "recipe_one", + "infile", + parameters={ + "num":450, + "outfile":result_path + }) + recipe = BashRecipe( + "recipe_one", COMPLETE_BASH_SCRIPT) + + rule = create_rule(pattern, recipe) + + params_dict = { + "num":450, + "infile":file_path, + "outfile":result_path + } + + job_dict = create_job( + JOB_TYPE_BASH, + create_watchdog_event( + file_path, + rule, + TEST_MONITOR_BASE, + file_hash + ), + extras={ + JOB_PARAMETERS: params_dict + } + ) + + job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) + make_dir(job_dir) + + lpc.execute(job_dir) + + output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) + + self.assertFalse(os.path.exists(job_dir)) + self.assertTrue(os.path.exists(output_dir)) + + error_file = os.path.join(output_dir, BACKUP_JOB_ERROR_FILE) + self.assertTrue(os.path.exists(error_file)) + + error = read_file(error_file) + self.assertEqual(error, + "Recieved incorrectly setup job.\n\n[Errno 2] No such file or " + f"directory: 'test_job_queue_dir{os.path.sep}{job_dict[JOB_ID]}{os.path.sep}job.yml'") + + # Test LocalBashConductor does not execute jobs with bad script + def testLocalBashConductorBadScript(self)->None: + lpc = LocalBashConductor( + job_queue_dir=TEST_JOB_QUEUE, + job_output_dir=TEST_JOB_OUTPUT + ) + + file_path = os.path.join(TEST_MONITOR_BASE, "test") + result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") + + with open(file_path, "w") as f: + f.write("150") + + file_hash = get_file_hash(file_path, SHA256) + + pattern = FileEventPattern( + "pattern", + file_path, + "recipe_one", + "infile", + parameters={ + "num":450, + "outfile":result_path + }) + recipe = BashRecipe( + "recipe_one", COMPLETE_BASH_SCRIPT) + + rule = create_rule(pattern, recipe) + + params_dict = { + "num":450, + "infile":file_path, + "outfile":result_path + } + + job_dict = create_job( + JOB_TYPE_PAPERMILL, + create_watchdog_event( + file_path, + rule, + TEST_MONITOR_BASE, + file_hash + ), + extras={ + JOB_PARAMETERS:params_dict, + PYTHON_FUNC:failing_func, + } + ) + + job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) + make_dir(job_dir) + + meta_path = os.path.join(job_dir, META_FILE) + write_yaml(job_dict, meta_path) + + base_script = parameterize_bash_script( + COMPLETE_BASH_SCRIPT, params_dict + ) + base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_BASH)) + write_file(lines_to_string(base_script), base_file) + st = os.stat(base_file) + os.chmod(base_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + + job_script = [ + "#!/bin/bash", + "echo Does Nothing" + ] + job_file = os.path.join(job_dir, get_job_file(JOB_TYPE_BASH)) + write_file(lines_to_string(job_script), job_file) + st = os.stat(job_file) + os.chmod(job_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + + lpc.execute(job_dir) + + output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) + self.assertFalse(os.path.exists(job_dir)) + self.assertTrue(os.path.exists(output_dir)) + + meta_path = os.path.join(output_dir, META_FILE) + self.assertTrue(os.path.exists(meta_path)) + + job = read_yaml(meta_path) + self.assertIsInstance(job, dict) + + # Test LocalBashConductor does not execute jobs with invalid metafile + def testLocalBashConductorInvalidMetafile(self)->None: + lpc = LocalBashConductor( + job_queue_dir=TEST_JOB_QUEUE, + job_output_dir=TEST_JOB_OUTPUT + ) + + file_path = os.path.join(TEST_MONITOR_BASE, "test") + result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") + + with open(file_path, "w") as f: + f.write("150") + + file_hash = get_file_hash(file_path, SHA256) + + pattern = FileEventPattern( + "pattern", + file_path, + "recipe_one", + "infile", + parameters={ + "num":450, + "outfile":result_path + }) + recipe = BashRecipe( + "recipe_one", COMPLETE_BASH_SCRIPT) + + rule = create_rule(pattern, recipe) + + params_dict = { + "num":450, + "infile":file_path, + "outfile":result_path + } + + job_dict = create_job( + JOB_TYPE_PAPERMILL, + create_watchdog_event( + file_path, + rule, + TEST_MONITOR_BASE, + file_hash + ), + extras={ + JOB_PARAMETERS: params_dict + } + ) + + job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) + make_dir(job_dir) + + meta_path = os.path.join(job_dir, META_FILE) + write_file("This is not a metafile dict", meta_path) + + lpc.execute(job_dir) + + output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) + + self.assertFalse(os.path.exists(job_dir)) + self.assertTrue(os.path.exists(output_dir)) + + error_file = os.path.join(output_dir, BACKUP_JOB_ERROR_FILE) + self.assertTrue(os.path.exists(error_file)) + + error = read_file(error_file) + self.assertEqual(error, + "Recieved incorrectly setup job.\n\nExpected type(s) are " + "'[typing.Dict]', got ") + + # Test LocalBashConductor does not execute jobs with mangled metafile + def testLocalBashConductorMangledMetafile(self)->None: + lpc = LocalBashConductor( + job_queue_dir=TEST_JOB_QUEUE, + job_output_dir=TEST_JOB_OUTPUT + ) + + file_path = os.path.join(TEST_MONITOR_BASE, "test") + result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") + + with open(file_path, "w") as f: + f.write("150") + + file_hash = get_file_hash(file_path, SHA256) + + pattern = FileEventPattern( + "pattern", + file_path, + "recipe_one", + "infile", + parameters={ + "num":450, + "outfile":result_path + }) + recipe = BashRecipe( + "recipe_one", COMPLETE_BASH_SCRIPT) + + rule = create_rule(pattern, recipe) + + params_dict = { + "num":450, + "infile":file_path, + "outfile":result_path + } + + job_dict = create_job( + JOB_TYPE_PAPERMILL, + create_watchdog_event( + file_path, + rule, + TEST_MONITOR_BASE, + file_hash + ), + extras={ + JOB_PARAMETERS: params_dict + } + ) + + job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) + make_dir(job_dir) + + meta_path = os.path.join(job_dir, META_FILE) + write_yaml({ + "This": "is", + "a": "dictionary", + "but": "not", + "valid": "job", + "definitons": "file" + }, meta_path) + + lpc.execute(job_dir) + + output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) + + self.assertFalse(os.path.exists(job_dir)) + self.assertTrue(os.path.exists(output_dir)) + + error_file = os.path.join(output_dir, BACKUP_JOB_ERROR_FILE) + self.assertTrue(os.path.exists(error_file)) + + error = read_file(error_file) + self.assertEqual(error, + "Recieved incorrectly setup job.\n\n\"Job require key " + "'job_type'\"") + + # Test execute criteria function + def testValidExecuteCriteria(self)->None: + lpc = LocalBashConductor() + + pattern_bash = FileEventPattern( + "pattern_bash", "A", "recipe_bash", "file_one") + recipe_bash = BashRecipe( + "recipe_bash", BAREBONES_BASH_SCRIPT + ) + + bash_rule = create_rule(pattern_bash, recipe_bash) + + status, _ = lpc.valid_execute_criteria({}) + self.assertFalse(status) + + status, _ = lpc.valid_execute_criteria("") + self.assertFalse(status) + + status, _ = lpc.valid_execute_criteria({ + JOB_ID: "path", + JOB_EVENT: "type", + JOB_TYPE: "rule", + JOB_PATTERN: "pattern", + JOB_RECIPE: "recipe", + JOB_RULE: "rule", + JOB_STATUS: "status", + JOB_CREATE_TIME: "create", + JOB_REQUIREMENTS: "requirements" + }) + self.assertFalse(status) + + status, s = lpc.valid_execute_criteria({ + JOB_ID: "path", + JOB_EVENT: { + EVENT_PATH: "path", + EVENT_TYPE: EVENT_TYPE_WATCHDOG, + EVENT_RULE: bash_rule + }, + JOB_TYPE: "type", + JOB_PATTERN: bash_rule.pattern.name, + JOB_RECIPE: bash_rule.recipe.name, + JOB_RULE: bash_rule.name, + JOB_STATUS: "status", + JOB_CREATE_TIME: datetime.now(), + JOB_REQUIREMENTS: bash_rule.recipe.requirements + }) + self.assertFalse(status) + + status, s = lpc.valid_execute_criteria({ + JOB_ID: "path", + JOB_EVENT: { + EVENT_PATH: "path", + EVENT_TYPE: EVENT_TYPE_WATCHDOG, + EVENT_RULE: bash_rule + }, + JOB_TYPE: JOB_TYPE_BASH, + JOB_PATTERN: bash_rule.pattern.name, + JOB_RECIPE: bash_rule.recipe.name, + JOB_RULE: bash_rule.name, + JOB_STATUS: "status", + JOB_CREATE_TIME: datetime.now(), + JOB_REQUIREMENTS: bash_rule.recipe.requirements + }) + self.assertTrue(status) + + # TODO test job status funcs diff --git a/tests/test_functionality.py b/tests/test_functionality.py index d6c8fc9..c312153 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -14,7 +14,7 @@ from typing import Dict from meow_base.core.rule import Rule from meow_base.core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \ SHA256, EVENT_TYPE, EVENT_PATH, EVENT_TYPE_WATCHDOG, \ - WATCHDOG_BASE, WATCHDOG_HASH, EVENT_RULE, JOB_PARAMETERS, JOB_HASH, \ + WATCHDOG_BASE, WATCHDOG_HASH, EVENT_RULE, JOB_PARAMETERS, \ PYTHON_FUNC, JOB_ID, JOB_EVENT, \ JOB_TYPE, JOB_PATTERN, JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME, \ JOB_REQUIREMENTS, STATUS_QUEUED, JOB_TYPE_PAPERMILL @@ -443,7 +443,6 @@ class MeowTests(unittest.TestCase): "infile":"file_path", "outfile":"result_path" }, - JOB_HASH: "file_hash", PYTHON_FUNC:max } ) diff --git a/tests/test_recipes.py b/tests/test_recipes.py index 0f43c39..8450fd6 100644 --- a/tests/test_recipes.py +++ b/tests/test_recipes.py @@ -11,7 +11,7 @@ from typing import Dict from meow_base.core.correctness.meow import valid_job from meow_base.core.correctness.vars import EVENT_TYPE, WATCHDOG_BASE, \ EVENT_RULE, EVENT_TYPE_WATCHDOG, EVENT_PATH, SHA256, WATCHDOG_HASH, \ - JOB_ID, JOB_TYPE_PYTHON, JOB_PARAMETERS, JOB_HASH, PYTHON_FUNC, \ + JOB_ID, JOB_TYPE_PYTHON, JOB_PARAMETERS, PYTHON_FUNC, \ JOB_STATUS, META_FILE, JOB_ERROR, PARAMS_FILE, SWEEP_STOP, SWEEP_JUMP, \ SWEEP_START, JOB_TYPE_PAPERMILL, JOB_TYPE_BASH, \ get_base_file, get_job_file, get_result_file @@ -378,7 +378,6 @@ class PapermillHandlerTests(unittest.TestCase): ), extras={ JOB_PARAMETERS:params_dict, - JOB_HASH: file_hash, PYTHON_FUNC:papermill_job_func } ) @@ -797,7 +796,6 @@ class PythonHandlerTests(unittest.TestCase): ), extras={ JOB_PARAMETERS:params_dict, - JOB_HASH: file_hash, PYTHON_FUNC:python_job_func } ) @@ -1210,8 +1208,7 @@ class BashHandlerTests(unittest.TestCase): file_hash ), extras={ - JOB_PARAMETERS:params_dict, - JOB_HASH: file_hash + JOB_PARAMETERS:params_dict } ) @@ -1235,7 +1232,6 @@ class BashHandlerTests(unittest.TestCase): st = os.stat(job_file) os.chmod(job_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) - print(os.listdir(job_dir)) print(os.getcwd()) @@ -1265,16 +1261,6 @@ class BashHandlerTests(unittest.TestCase): self.assertEqual(result, "124937\n") - # Test jobFunc doesn't execute with no args - def testJobFuncBadArgs(self)->None: - try: - Bash_job_func({}) - except Exception: - pass - - self.assertEqual(len(os.listdir(TEST_JOB_QUEUE)), 0) - self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 0) - # Test handling criteria function def testValidHandleCriteria(self)->None: ph = BashHandler()