added tests for new bash jobs, and removed extra hash definition in job dict
This commit is contained in:
@ -1,2 +1,3 @@
|
||||
|
||||
from .local_python_conductor import LocalPythonConductor
|
||||
from .local_bash_conductor import LocalBashConductor
|
@ -84,16 +84,35 @@ class LocalBashConductor(BaseConductor):
|
||||
# execute the job
|
||||
if not abort:
|
||||
try:
|
||||
result = subprocess.call(get_job_file(JOB_TYPE_BASH), cwd=".")
|
||||
print(f"PWD: {os.getcwd()}")
|
||||
print(f"job_dir: {job_dir}")
|
||||
print(os.path.exists(os.path.join(job_dir, get_job_file(JOB_TYPE_BASH))))
|
||||
result = subprocess.call(
|
||||
os.path.join(job_dir, get_job_file(JOB_TYPE_BASH)),
|
||||
cwd="."
|
||||
)
|
||||
|
||||
# get up to date job data
|
||||
job = read_yaml(meta_file)
|
||||
|
||||
if result == 0:
|
||||
# Update the status file with the finalised status
|
||||
job[JOB_STATUS] = STATUS_DONE
|
||||
job[JOB_END_TIME] = datetime.now()
|
||||
write_yaml(job, meta_file)
|
||||
|
||||
else:
|
||||
# Update the status file with the error status. Don't
|
||||
# overwrite any more specific error messages already
|
||||
# created
|
||||
if JOB_STATUS not in job:
|
||||
job[JOB_STATUS] = STATUS_FAILED
|
||||
if JOB_END_TIME not in job:
|
||||
job[JOB_END_TIME] = datetime.now()
|
||||
if JOB_ERROR not in job:
|
||||
job[JOB_ERROR] = f"Job execution returned non-zero."
|
||||
write_yaml(job, meta_file)
|
||||
|
||||
except Exception as e:
|
||||
# get up to date job data
|
||||
job = read_yaml(meta_file)
|
||||
|
@ -2,3 +2,4 @@
|
||||
from .jupyter_notebook_recipe import JupyterNotebookRecipe, PapermillHandler, \
|
||||
get_recipe_from_notebook
|
||||
from .python_recipe import PythonRecipe, PythonHandler
|
||||
from .bash_recipe import BashRecipe, BashHandler
|
@ -12,9 +12,8 @@ from meow_base.core.correctness.validation import check_type, valid_dict, \
|
||||
valid_string, valid_dir_path
|
||||
from meow_base.core.correctness.vars import DEBUG_INFO, DEFAULT_JOB_QUEUE_DIR, \
|
||||
VALID_VARIABLE_NAME_CHARS, EVENT_PATH, EVENT_RULE, EVENT_TYPE, JOB_ID, \
|
||||
EVENT_TYPE_WATCHDOG, JOB_TYPE_BASH, JOB_PARAMETERS, JOB_HASH, \
|
||||
WATCHDOG_HASH, WATCHDOG_BASE, META_FILE, PARAMS_FILE, STATUS_QUEUED, \
|
||||
JOB_STATUS, \
|
||||
EVENT_TYPE_WATCHDOG, JOB_TYPE_BASH, JOB_PARAMETERS, WATCHDOG_HASH, \
|
||||
WATCHDOG_BASE, META_FILE, STATUS_QUEUED, JOB_STATUS, \
|
||||
get_base_file, get_job_file
|
||||
from meow_base.functionality.debug import setup_debugging, print_debug
|
||||
from meow_base.functionality.file_io import valid_path, make_dir, write_yaml, \
|
||||
@ -138,7 +137,6 @@ class BashHandler(BaseHandler):
|
||||
event,
|
||||
extras={
|
||||
JOB_PARAMETERS:yaml_dict,
|
||||
JOB_HASH: event[WATCHDOG_HASH],
|
||||
# CONTROL_SCRIPT:python_job_func
|
||||
}
|
||||
)
|
||||
|
@ -17,7 +17,7 @@ from meow_base.core.correctness.meow import valid_event
|
||||
from meow_base.core.correctness.validation import check_type, valid_string, \
|
||||
valid_dict, valid_path, valid_dir_path, valid_existing_file_path
|
||||
from meow_base.core.correctness.vars import VALID_VARIABLE_NAME_CHARS, \
|
||||
PYTHON_FUNC, DEBUG_INFO, EVENT_TYPE_WATCHDOG, JOB_HASH, \
|
||||
PYTHON_FUNC, DEBUG_INFO, EVENT_TYPE_WATCHDOG, \
|
||||
DEFAULT_JOB_QUEUE_DIR, EVENT_PATH, JOB_TYPE_PAPERMILL, WATCHDOG_HASH, \
|
||||
JOB_PARAMETERS, JOB_ID, WATCHDOG_BASE, META_FILE, PARAMS_FILE, \
|
||||
JOB_STATUS, STATUS_QUEUED, EVENT_RULE, EVENT_TYPE, EVENT_RULE, \
|
||||
@ -142,7 +142,6 @@ class PapermillHandler(BaseHandler):
|
||||
event,
|
||||
extras={
|
||||
JOB_PARAMETERS:yaml_dict,
|
||||
JOB_HASH: event[WATCHDOG_HASH],
|
||||
PYTHON_FUNC:papermill_job_func,
|
||||
}
|
||||
)
|
||||
@ -206,7 +205,7 @@ def papermill_job_func(job_dir):
|
||||
from datetime import datetime
|
||||
from meow_base.core.correctness.vars import JOB_EVENT, JOB_ID, \
|
||||
EVENT_PATH, META_FILE, PARAMS_FILE, \
|
||||
JOB_STATUS, JOB_HASH, SHA256, STATUS_SKIPPED, JOB_END_TIME, \
|
||||
JOB_STATUS, SHA256, STATUS_SKIPPED, JOB_END_TIME, \
|
||||
JOB_ERROR, STATUS_FAILED, get_job_file, \
|
||||
get_result_file
|
||||
from meow_base.functionality.file_io import read_yaml, write_notebook, write_yaml
|
||||
@ -228,19 +227,20 @@ def papermill_job_func(job_dir):
|
||||
# Check the hash of the triggering file, if present. This addresses
|
||||
# potential race condition as file could have been modified since
|
||||
# triggering event
|
||||
if JOB_HASH in job:
|
||||
if JOB_EVENT in job and WATCHDOG_HASH in job[JOB_EVENT]:
|
||||
# get current hash
|
||||
triggerfile_hash = get_file_hash(job[JOB_EVENT][EVENT_PATH], SHA256)
|
||||
# If hash doesn't match, then abort the job. If its been modified, then
|
||||
# another job will have been scheduled anyway.
|
||||
if not triggerfile_hash \
|
||||
or triggerfile_hash != job[JOB_HASH]:
|
||||
or triggerfile_hash != job[JOB_EVENT][WATCHDOG_HASH]:
|
||||
job[JOB_STATUS] = STATUS_SKIPPED
|
||||
job[JOB_END_TIME] = datetime.now()
|
||||
msg = "Job was skipped as triggering file " + \
|
||||
f"'{job[JOB_EVENT][EVENT_PATH]}' has been modified since " + \
|
||||
"scheduling. Was expected to have hash " + \
|
||||
f"'{job[JOB_HASH]}' but has '{triggerfile_hash}'."
|
||||
f"'{job[JOB_EVENT][WATCHDOG_HASH]}' but has '" + \
|
||||
f"{triggerfile_hash}'."
|
||||
job[JOB_ERROR] = msg
|
||||
write_yaml(job, meta_file)
|
||||
return
|
||||
|
@ -16,7 +16,7 @@ from meow_base.core.correctness.meow import valid_event
|
||||
from meow_base.core.correctness.validation import check_script, valid_string, \
|
||||
valid_dict, valid_dir_path
|
||||
from meow_base.core.correctness.vars import VALID_VARIABLE_NAME_CHARS, \
|
||||
PYTHON_FUNC, DEBUG_INFO, EVENT_TYPE_WATCHDOG, JOB_HASH, \
|
||||
PYTHON_FUNC, DEBUG_INFO, EVENT_TYPE_WATCHDOG, \
|
||||
DEFAULT_JOB_QUEUE_DIR, EVENT_RULE, EVENT_PATH, JOB_TYPE_PYTHON, \
|
||||
WATCHDOG_HASH, JOB_PARAMETERS, JOB_ID, WATCHDOG_BASE, META_FILE, \
|
||||
PARAMS_FILE, JOB_STATUS, STATUS_QUEUED, EVENT_TYPE, EVENT_RULE, \
|
||||
@ -132,7 +132,6 @@ class PythonHandler(BaseHandler):
|
||||
event,
|
||||
extras={
|
||||
JOB_PARAMETERS:yaml_dict,
|
||||
JOB_HASH: event[WATCHDOG_HASH],
|
||||
PYTHON_FUNC:python_job_func
|
||||
}
|
||||
)
|
||||
@ -182,7 +181,7 @@ def python_job_func(job_dir):
|
||||
from io import StringIO
|
||||
from meow_base.core.correctness.vars import JOB_EVENT, JOB_ID, \
|
||||
EVENT_PATH, META_FILE, PARAMS_FILE, \
|
||||
JOB_STATUS, JOB_HASH, SHA256, STATUS_SKIPPED, JOB_END_TIME, \
|
||||
JOB_STATUS, SHA256, STATUS_SKIPPED, JOB_END_TIME, \
|
||||
JOB_ERROR, STATUS_FAILED, get_base_file, \
|
||||
get_job_file, get_result_file
|
||||
from meow_base.functionality.file_io import read_yaml, write_yaml
|
||||
@ -203,19 +202,20 @@ def python_job_func(job_dir):
|
||||
# Check the hash of the triggering file, if present. This addresses
|
||||
# potential race condition as file could have been modified since
|
||||
# triggering event
|
||||
if JOB_HASH in job:
|
||||
if JOB_EVENT in job and WATCHDOG_HASH in job[JOB_EVENT]:
|
||||
# get current hash
|
||||
triggerfile_hash = get_file_hash(job[JOB_EVENT][EVENT_PATH], SHA256)
|
||||
# If hash doesn't match, then abort the job. If its been modified, then
|
||||
# another job will have been scheduled anyway.
|
||||
if not triggerfile_hash \
|
||||
or triggerfile_hash != job[JOB_HASH]:
|
||||
or triggerfile_hash != job[JOB_EVENT][WATCHDOG_HASH]:
|
||||
job[JOB_STATUS] = STATUS_SKIPPED
|
||||
job[JOB_END_TIME] = datetime.now()
|
||||
msg = "Job was skipped as triggering file " + \
|
||||
f"'{job[JOB_EVENT][EVENT_PATH]}' has been modified since " + \
|
||||
"scheduling. Was expected to have hash " + \
|
||||
f"'{job[JOB_HASH]}' but has '{triggerfile_hash}'."
|
||||
f"'{job[JOB_EVENT][WATCHDOG_HASH]}' but has '" + \
|
||||
f"{triggerfile_hash}'."
|
||||
job[JOB_ERROR] = msg
|
||||
write_yaml(job, meta_file)
|
||||
return
|
||||
|
@ -1,35 +1,40 @@
|
||||
|
||||
import os
|
||||
import stat
|
||||
import unittest
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Dict
|
||||
|
||||
from meow_base.core.correctness.vars import JOB_TYPE_PYTHON, SHA256, \
|
||||
JOB_PARAMETERS, JOB_HASH, PYTHON_FUNC, JOB_ID, BACKUP_JOB_ERROR_FILE, \
|
||||
JOB_PARAMETERS, PYTHON_FUNC, JOB_ID, BACKUP_JOB_ERROR_FILE, \
|
||||
JOB_EVENT, META_FILE, PARAMS_FILE, JOB_STATUS, JOB_ERROR, JOB_TYPE, \
|
||||
JOB_PATTERN, STATUS_DONE, JOB_TYPE_PAPERMILL, JOB_RECIPE, JOB_RULE, \
|
||||
JOB_CREATE_TIME, JOB_REQUIREMENTS, EVENT_PATH, EVENT_RULE, EVENT_TYPE, \
|
||||
EVENT_TYPE_WATCHDOG, get_base_file, get_result_file, get_job_file
|
||||
from meow_base.conductors import LocalPythonConductor
|
||||
EVENT_TYPE_WATCHDOG, JOB_TYPE_BASH, \
|
||||
get_base_file, get_result_file, get_job_file
|
||||
from meow_base.conductors import LocalPythonConductor, LocalBashConductor
|
||||
from meow_base.functionality.file_io import read_file, read_yaml, write_file, \
|
||||
write_notebook, write_yaml, lines_to_string, make_dir
|
||||
from meow_base.functionality.hashing import get_file_hash
|
||||
from meow_base.functionality.meow import create_watchdog_event, create_job, \
|
||||
create_rule
|
||||
from meow_base.functionality.parameterisation import parameterize_bash_script
|
||||
from meow_base.patterns.file_event_pattern import FileEventPattern
|
||||
from meow_base.recipes.jupyter_notebook_recipe import JupyterNotebookRecipe, \
|
||||
papermill_job_func
|
||||
from meow_base.recipes.python_recipe import PythonRecipe, python_job_func
|
||||
from meow_base.recipes.bash_recipe import BashRecipe, assemble_bash_job_script
|
||||
from shared import TEST_MONITOR_BASE, APPENDING_NOTEBOOK, TEST_JOB_OUTPUT, \
|
||||
TEST_JOB_QUEUE, COMPLETE_PYTHON_SCRIPT, BAREBONES_PYTHON_SCRIPT, \
|
||||
BAREBONES_NOTEBOOK, setup, teardown
|
||||
BAREBONES_NOTEBOOK, COMPLETE_BASH_SCRIPT, BAREBONES_BASH_SCRIPT, \
|
||||
setup, teardown
|
||||
|
||||
def failing_func():
|
||||
raise Exception("bad function")
|
||||
|
||||
|
||||
class MeowTests(unittest.TestCase):
|
||||
class PythonTests(unittest.TestCase):
|
||||
def setUp(self)->None:
|
||||
super().setUp()
|
||||
setup()
|
||||
@ -96,7 +101,6 @@ class MeowTests(unittest.TestCase):
|
||||
),
|
||||
extras={
|
||||
JOB_PARAMETERS:params_dict,
|
||||
JOB_HASH: file_hash,
|
||||
PYTHON_FUNC:python_job_func
|
||||
}
|
||||
)
|
||||
@ -184,7 +188,6 @@ class MeowTests(unittest.TestCase):
|
||||
),
|
||||
extras={
|
||||
JOB_PARAMETERS:params_dict,
|
||||
JOB_HASH: file_hash,
|
||||
PYTHON_FUNC:papermill_job_func
|
||||
}
|
||||
)
|
||||
@ -271,8 +274,7 @@ class MeowTests(unittest.TestCase):
|
||||
file_hash
|
||||
),
|
||||
extras={
|
||||
JOB_PARAMETERS:params_dict,
|
||||
JOB_HASH: file_hash,
|
||||
JOB_PARAMETERS:params_dict
|
||||
}
|
||||
)
|
||||
|
||||
@ -313,7 +315,6 @@ class MeowTests(unittest.TestCase):
|
||||
),
|
||||
extras={
|
||||
JOB_PARAMETERS:params_dict,
|
||||
JOB_HASH: file_hash,
|
||||
PYTHON_FUNC:papermill_job_func
|
||||
}
|
||||
)
|
||||
@ -395,7 +396,6 @@ class MeowTests(unittest.TestCase):
|
||||
"infile":file_path,
|
||||
"outfile":result_path
|
||||
},
|
||||
JOB_HASH: file_hash,
|
||||
PYTHON_FUNC:failing_func,
|
||||
}
|
||||
)
|
||||
@ -463,7 +463,6 @@ class MeowTests(unittest.TestCase):
|
||||
),
|
||||
extras={
|
||||
JOB_PARAMETERS:params,
|
||||
JOB_HASH: file_hash,
|
||||
PYTHON_FUNC:failing_func,
|
||||
}
|
||||
)
|
||||
@ -533,7 +532,6 @@ class MeowTests(unittest.TestCase):
|
||||
"infile":file_path,
|
||||
"outfile":result_path
|
||||
},
|
||||
JOB_HASH: file_hash,
|
||||
PYTHON_FUNC:failing_func,
|
||||
}
|
||||
)
|
||||
@ -602,7 +600,6 @@ class MeowTests(unittest.TestCase):
|
||||
"infile":file_path,
|
||||
"outfile":result_path
|
||||
},
|
||||
JOB_HASH: file_hash,
|
||||
PYTHON_FUNC:failing_func,
|
||||
}
|
||||
)
|
||||
@ -724,3 +721,476 @@ class MeowTests(unittest.TestCase):
|
||||
self.assertTrue(status)
|
||||
|
||||
# TODO test job status funcs
|
||||
|
||||
class BashTests(unittest.TestCase):
|
||||
def setUp(self)->None:
|
||||
super().setUp()
|
||||
setup()
|
||||
|
||||
def tearDown(self)->None:
|
||||
super().tearDown()
|
||||
teardown()
|
||||
|
||||
# Test LocalBashConductor creation
|
||||
def testLocalBashConductorCreation(self)->None:
|
||||
LocalBashConductor()
|
||||
|
||||
# Test LocalBashConductor naming
|
||||
def testLocalBashConductorNaming(self)->None:
|
||||
test_name = "test_name"
|
||||
conductor = LocalBashConductor(name=test_name)
|
||||
self.assertEqual(conductor.name, test_name)
|
||||
|
||||
conductor = LocalBashConductor()
|
||||
self.assertTrue(conductor.name.startswith("conductor_"))
|
||||
|
||||
# Test LocalBashConductor executes valid bash jobs
|
||||
def testLocalBashConductorValidBashJob(self)->None:
|
||||
lpc = LocalBashConductor(
|
||||
job_queue_dir=TEST_JOB_QUEUE,
|
||||
job_output_dir=TEST_JOB_OUTPUT
|
||||
)
|
||||
|
||||
file_path = os.path.join(TEST_MONITOR_BASE, "test")
|
||||
result_path = os.path.join(TEST_MONITOR_BASE, "output")
|
||||
|
||||
with open(file_path, "w") as f:
|
||||
f.write("150")
|
||||
|
||||
file_hash = get_file_hash(file_path, SHA256)
|
||||
|
||||
pattern = FileEventPattern(
|
||||
"pattern",
|
||||
file_path,
|
||||
"recipe_one",
|
||||
"infile",
|
||||
parameters={
|
||||
"num":450,
|
||||
"outfile":result_path
|
||||
})
|
||||
recipe = BashRecipe(
|
||||
"recipe_one", COMPLETE_BASH_SCRIPT)
|
||||
|
||||
rule = create_rule(pattern, recipe)
|
||||
|
||||
params_dict = {
|
||||
"num":450,
|
||||
"infile":file_path,
|
||||
"outfile":result_path
|
||||
}
|
||||
|
||||
job_dict = create_job(
|
||||
JOB_TYPE_BASH,
|
||||
create_watchdog_event(
|
||||
file_path,
|
||||
rule,
|
||||
TEST_MONITOR_BASE,
|
||||
file_hash
|
||||
),
|
||||
extras={
|
||||
JOB_PARAMETERS:params_dict,
|
||||
}
|
||||
)
|
||||
|
||||
job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID])
|
||||
make_dir(job_dir)
|
||||
|
||||
meta_path = os.path.join(job_dir, META_FILE)
|
||||
write_yaml(job_dict, meta_path)
|
||||
|
||||
base_script = parameterize_bash_script(
|
||||
COMPLETE_BASH_SCRIPT, params_dict
|
||||
)
|
||||
base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_BASH))
|
||||
write_file(lines_to_string(base_script), base_file)
|
||||
st = os.stat(base_file)
|
||||
os.chmod(base_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
|
||||
|
||||
job_script = assemble_bash_job_script()
|
||||
job_file = os.path.join(job_dir, get_job_file(JOB_TYPE_BASH))
|
||||
write_file(lines_to_string(job_script), job_file)
|
||||
st = os.stat(job_file)
|
||||
os.chmod(job_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
|
||||
|
||||
lpc.execute(job_dir)
|
||||
|
||||
self.assertFalse(os.path.exists(job_dir))
|
||||
|
||||
job_output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID])
|
||||
self.assertTrue(os.path.exists(job_output_dir))
|
||||
|
||||
meta_path = os.path.join(job_output_dir, META_FILE)
|
||||
self.assertTrue(os.path.exists(meta_path))
|
||||
status = read_yaml(meta_path)
|
||||
self.assertIsInstance(status, Dict)
|
||||
self.assertIn(JOB_STATUS, status)
|
||||
self.assertEqual(status[JOB_STATUS], STATUS_DONE)
|
||||
|
||||
self.assertNotIn(JOB_ERROR, status)
|
||||
self.assertTrue(os.path.exists(
|
||||
os.path.join(job_output_dir, get_base_file(JOB_TYPE_BASH))))
|
||||
self.assertTrue(os.path.exists(
|
||||
os.path.join(job_output_dir, get_job_file(JOB_TYPE_BASH))))
|
||||
|
||||
self.assertTrue(os.path.exists(result_path))
|
||||
|
||||
# Test LocalBashConductor does not execute jobs with missing metafile
|
||||
def testLocalBashConductorMissingMetafile(self)->None:
|
||||
lpc = LocalBashConductor(
|
||||
job_queue_dir=TEST_JOB_QUEUE,
|
||||
job_output_dir=TEST_JOB_OUTPUT
|
||||
)
|
||||
|
||||
file_path = os.path.join(TEST_MONITOR_BASE, "test")
|
||||
result_path = os.path.join(TEST_MONITOR_BASE, "output", "test")
|
||||
|
||||
with open(file_path, "w") as f:
|
||||
f.write("150")
|
||||
|
||||
file_hash = get_file_hash(file_path, SHA256)
|
||||
|
||||
pattern = FileEventPattern(
|
||||
"pattern",
|
||||
file_path,
|
||||
"recipe_one",
|
||||
"infile",
|
||||
parameters={
|
||||
"num":450,
|
||||
"outfile":result_path
|
||||
})
|
||||
recipe = BashRecipe(
|
||||
"recipe_one", COMPLETE_BASH_SCRIPT)
|
||||
|
||||
rule = create_rule(pattern, recipe)
|
||||
|
||||
params_dict = {
|
||||
"num":450,
|
||||
"infile":file_path,
|
||||
"outfile":result_path
|
||||
}
|
||||
|
||||
job_dict = create_job(
|
||||
JOB_TYPE_BASH,
|
||||
create_watchdog_event(
|
||||
file_path,
|
||||
rule,
|
||||
TEST_MONITOR_BASE,
|
||||
file_hash
|
||||
),
|
||||
extras={
|
||||
JOB_PARAMETERS: params_dict
|
||||
}
|
||||
)
|
||||
|
||||
job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID])
|
||||
make_dir(job_dir)
|
||||
|
||||
lpc.execute(job_dir)
|
||||
|
||||
output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID])
|
||||
|
||||
self.assertFalse(os.path.exists(job_dir))
|
||||
self.assertTrue(os.path.exists(output_dir))
|
||||
|
||||
error_file = os.path.join(output_dir, BACKUP_JOB_ERROR_FILE)
|
||||
self.assertTrue(os.path.exists(error_file))
|
||||
|
||||
error = read_file(error_file)
|
||||
self.assertEqual(error,
|
||||
"Recieved incorrectly setup job.\n\n[Errno 2] No such file or "
|
||||
f"directory: 'test_job_queue_dir{os.path.sep}{job_dict[JOB_ID]}{os.path.sep}job.yml'")
|
||||
|
||||
# Test LocalBashConductor does not execute jobs with bad script
|
||||
def testLocalBashConductorBadScript(self)->None:
|
||||
lpc = LocalBashConductor(
|
||||
job_queue_dir=TEST_JOB_QUEUE,
|
||||
job_output_dir=TEST_JOB_OUTPUT
|
||||
)
|
||||
|
||||
file_path = os.path.join(TEST_MONITOR_BASE, "test")
|
||||
result_path = os.path.join(TEST_MONITOR_BASE, "output", "test")
|
||||
|
||||
with open(file_path, "w") as f:
|
||||
f.write("150")
|
||||
|
||||
file_hash = get_file_hash(file_path, SHA256)
|
||||
|
||||
pattern = FileEventPattern(
|
||||
"pattern",
|
||||
file_path,
|
||||
"recipe_one",
|
||||
"infile",
|
||||
parameters={
|
||||
"num":450,
|
||||
"outfile":result_path
|
||||
})
|
||||
recipe = BashRecipe(
|
||||
"recipe_one", COMPLETE_BASH_SCRIPT)
|
||||
|
||||
rule = create_rule(pattern, recipe)
|
||||
|
||||
params_dict = {
|
||||
"num":450,
|
||||
"infile":file_path,
|
||||
"outfile":result_path
|
||||
}
|
||||
|
||||
job_dict = create_job(
|
||||
JOB_TYPE_PAPERMILL,
|
||||
create_watchdog_event(
|
||||
file_path,
|
||||
rule,
|
||||
TEST_MONITOR_BASE,
|
||||
file_hash
|
||||
),
|
||||
extras={
|
||||
JOB_PARAMETERS:params_dict,
|
||||
PYTHON_FUNC:failing_func,
|
||||
}
|
||||
)
|
||||
|
||||
job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID])
|
||||
make_dir(job_dir)
|
||||
|
||||
meta_path = os.path.join(job_dir, META_FILE)
|
||||
write_yaml(job_dict, meta_path)
|
||||
|
||||
base_script = parameterize_bash_script(
|
||||
COMPLETE_BASH_SCRIPT, params_dict
|
||||
)
|
||||
base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_BASH))
|
||||
write_file(lines_to_string(base_script), base_file)
|
||||
st = os.stat(base_file)
|
||||
os.chmod(base_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
|
||||
|
||||
job_script = [
|
||||
"#!/bin/bash",
|
||||
"echo Does Nothing"
|
||||
]
|
||||
job_file = os.path.join(job_dir, get_job_file(JOB_TYPE_BASH))
|
||||
write_file(lines_to_string(job_script), job_file)
|
||||
st = os.stat(job_file)
|
||||
os.chmod(job_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
|
||||
|
||||
lpc.execute(job_dir)
|
||||
|
||||
output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID])
|
||||
self.assertFalse(os.path.exists(job_dir))
|
||||
self.assertTrue(os.path.exists(output_dir))
|
||||
|
||||
meta_path = os.path.join(output_dir, META_FILE)
|
||||
self.assertTrue(os.path.exists(meta_path))
|
||||
|
||||
job = read_yaml(meta_path)
|
||||
self.assertIsInstance(job, dict)
|
||||
|
||||
# Test LocalBashConductor does not execute jobs with invalid metafile
|
||||
def testLocalBashConductorInvalidMetafile(self)->None:
|
||||
lpc = LocalBashConductor(
|
||||
job_queue_dir=TEST_JOB_QUEUE,
|
||||
job_output_dir=TEST_JOB_OUTPUT
|
||||
)
|
||||
|
||||
file_path = os.path.join(TEST_MONITOR_BASE, "test")
|
||||
result_path = os.path.join(TEST_MONITOR_BASE, "output", "test")
|
||||
|
||||
with open(file_path, "w") as f:
|
||||
f.write("150")
|
||||
|
||||
file_hash = get_file_hash(file_path, SHA256)
|
||||
|
||||
pattern = FileEventPattern(
|
||||
"pattern",
|
||||
file_path,
|
||||
"recipe_one",
|
||||
"infile",
|
||||
parameters={
|
||||
"num":450,
|
||||
"outfile":result_path
|
||||
})
|
||||
recipe = BashRecipe(
|
||||
"recipe_one", COMPLETE_BASH_SCRIPT)
|
||||
|
||||
rule = create_rule(pattern, recipe)
|
||||
|
||||
params_dict = {
|
||||
"num":450,
|
||||
"infile":file_path,
|
||||
"outfile":result_path
|
||||
}
|
||||
|
||||
job_dict = create_job(
|
||||
JOB_TYPE_PAPERMILL,
|
||||
create_watchdog_event(
|
||||
file_path,
|
||||
rule,
|
||||
TEST_MONITOR_BASE,
|
||||
file_hash
|
||||
),
|
||||
extras={
|
||||
JOB_PARAMETERS: params_dict
|
||||
}
|
||||
)
|
||||
|
||||
job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID])
|
||||
make_dir(job_dir)
|
||||
|
||||
meta_path = os.path.join(job_dir, META_FILE)
|
||||
write_file("This is not a metafile dict", meta_path)
|
||||
|
||||
lpc.execute(job_dir)
|
||||
|
||||
output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID])
|
||||
|
||||
self.assertFalse(os.path.exists(job_dir))
|
||||
self.assertTrue(os.path.exists(output_dir))
|
||||
|
||||
error_file = os.path.join(output_dir, BACKUP_JOB_ERROR_FILE)
|
||||
self.assertTrue(os.path.exists(error_file))
|
||||
|
||||
error = read_file(error_file)
|
||||
self.assertEqual(error,
|
||||
"Recieved incorrectly setup job.\n\nExpected type(s) are "
|
||||
"'[typing.Dict]', got <class 'str'>")
|
||||
|
||||
# Test LocalBashConductor does not execute jobs with mangled metafile
|
||||
def testLocalBashConductorMangledMetafile(self)->None:
|
||||
lpc = LocalBashConductor(
|
||||
job_queue_dir=TEST_JOB_QUEUE,
|
||||
job_output_dir=TEST_JOB_OUTPUT
|
||||
)
|
||||
|
||||
file_path = os.path.join(TEST_MONITOR_BASE, "test")
|
||||
result_path = os.path.join(TEST_MONITOR_BASE, "output", "test")
|
||||
|
||||
with open(file_path, "w") as f:
|
||||
f.write("150")
|
||||
|
||||
file_hash = get_file_hash(file_path, SHA256)
|
||||
|
||||
pattern = FileEventPattern(
|
||||
"pattern",
|
||||
file_path,
|
||||
"recipe_one",
|
||||
"infile",
|
||||
parameters={
|
||||
"num":450,
|
||||
"outfile":result_path
|
||||
})
|
||||
recipe = BashRecipe(
|
||||
"recipe_one", COMPLETE_BASH_SCRIPT)
|
||||
|
||||
rule = create_rule(pattern, recipe)
|
||||
|
||||
params_dict = {
|
||||
"num":450,
|
||||
"infile":file_path,
|
||||
"outfile":result_path
|
||||
}
|
||||
|
||||
job_dict = create_job(
|
||||
JOB_TYPE_PAPERMILL,
|
||||
create_watchdog_event(
|
||||
file_path,
|
||||
rule,
|
||||
TEST_MONITOR_BASE,
|
||||
file_hash
|
||||
),
|
||||
extras={
|
||||
JOB_PARAMETERS: params_dict
|
||||
}
|
||||
)
|
||||
|
||||
job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID])
|
||||
make_dir(job_dir)
|
||||
|
||||
meta_path = os.path.join(job_dir, META_FILE)
|
||||
write_yaml({
|
||||
"This": "is",
|
||||
"a": "dictionary",
|
||||
"but": "not",
|
||||
"valid": "job",
|
||||
"definitons": "file"
|
||||
}, meta_path)
|
||||
|
||||
lpc.execute(job_dir)
|
||||
|
||||
output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID])
|
||||
|
||||
self.assertFalse(os.path.exists(job_dir))
|
||||
self.assertTrue(os.path.exists(output_dir))
|
||||
|
||||
error_file = os.path.join(output_dir, BACKUP_JOB_ERROR_FILE)
|
||||
self.assertTrue(os.path.exists(error_file))
|
||||
|
||||
error = read_file(error_file)
|
||||
self.assertEqual(error,
|
||||
"Recieved incorrectly setup job.\n\n\"Job require key "
|
||||
"'job_type'\"")
|
||||
|
||||
# Test execute criteria function
|
||||
def testValidExecuteCriteria(self)->None:
|
||||
lpc = LocalBashConductor()
|
||||
|
||||
pattern_bash = FileEventPattern(
|
||||
"pattern_bash", "A", "recipe_bash", "file_one")
|
||||
recipe_bash = BashRecipe(
|
||||
"recipe_bash", BAREBONES_BASH_SCRIPT
|
||||
)
|
||||
|
||||
bash_rule = create_rule(pattern_bash, recipe_bash)
|
||||
|
||||
status, _ = lpc.valid_execute_criteria({})
|
||||
self.assertFalse(status)
|
||||
|
||||
status, _ = lpc.valid_execute_criteria("")
|
||||
self.assertFalse(status)
|
||||
|
||||
status, _ = lpc.valid_execute_criteria({
|
||||
JOB_ID: "path",
|
||||
JOB_EVENT: "type",
|
||||
JOB_TYPE: "rule",
|
||||
JOB_PATTERN: "pattern",
|
||||
JOB_RECIPE: "recipe",
|
||||
JOB_RULE: "rule",
|
||||
JOB_STATUS: "status",
|
||||
JOB_CREATE_TIME: "create",
|
||||
JOB_REQUIREMENTS: "requirements"
|
||||
})
|
||||
self.assertFalse(status)
|
||||
|
||||
status, s = lpc.valid_execute_criteria({
|
||||
JOB_ID: "path",
|
||||
JOB_EVENT: {
|
||||
EVENT_PATH: "path",
|
||||
EVENT_TYPE: EVENT_TYPE_WATCHDOG,
|
||||
EVENT_RULE: bash_rule
|
||||
},
|
||||
JOB_TYPE: "type",
|
||||
JOB_PATTERN: bash_rule.pattern.name,
|
||||
JOB_RECIPE: bash_rule.recipe.name,
|
||||
JOB_RULE: bash_rule.name,
|
||||
JOB_STATUS: "status",
|
||||
JOB_CREATE_TIME: datetime.now(),
|
||||
JOB_REQUIREMENTS: bash_rule.recipe.requirements
|
||||
})
|
||||
self.assertFalse(status)
|
||||
|
||||
status, s = lpc.valid_execute_criteria({
|
||||
JOB_ID: "path",
|
||||
JOB_EVENT: {
|
||||
EVENT_PATH: "path",
|
||||
EVENT_TYPE: EVENT_TYPE_WATCHDOG,
|
||||
EVENT_RULE: bash_rule
|
||||
},
|
||||
JOB_TYPE: JOB_TYPE_BASH,
|
||||
JOB_PATTERN: bash_rule.pattern.name,
|
||||
JOB_RECIPE: bash_rule.recipe.name,
|
||||
JOB_RULE: bash_rule.name,
|
||||
JOB_STATUS: "status",
|
||||
JOB_CREATE_TIME: datetime.now(),
|
||||
JOB_REQUIREMENTS: bash_rule.recipe.requirements
|
||||
})
|
||||
self.assertTrue(status)
|
||||
|
||||
# TODO test job status funcs
|
||||
|
@ -14,7 +14,7 @@ from typing import Dict
|
||||
from meow_base.core.rule import Rule
|
||||
from meow_base.core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \
|
||||
SHA256, EVENT_TYPE, EVENT_PATH, EVENT_TYPE_WATCHDOG, \
|
||||
WATCHDOG_BASE, WATCHDOG_HASH, EVENT_RULE, JOB_PARAMETERS, JOB_HASH, \
|
||||
WATCHDOG_BASE, WATCHDOG_HASH, EVENT_RULE, JOB_PARAMETERS, \
|
||||
PYTHON_FUNC, JOB_ID, JOB_EVENT, \
|
||||
JOB_TYPE, JOB_PATTERN, JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME, \
|
||||
JOB_REQUIREMENTS, STATUS_QUEUED, JOB_TYPE_PAPERMILL
|
||||
@ -443,7 +443,6 @@ class MeowTests(unittest.TestCase):
|
||||
"infile":"file_path",
|
||||
"outfile":"result_path"
|
||||
},
|
||||
JOB_HASH: "file_hash",
|
||||
PYTHON_FUNC:max
|
||||
}
|
||||
)
|
||||
|
@ -11,7 +11,7 @@ from typing import Dict
|
||||
from meow_base.core.correctness.meow import valid_job
|
||||
from meow_base.core.correctness.vars import EVENT_TYPE, WATCHDOG_BASE, \
|
||||
EVENT_RULE, EVENT_TYPE_WATCHDOG, EVENT_PATH, SHA256, WATCHDOG_HASH, \
|
||||
JOB_ID, JOB_TYPE_PYTHON, JOB_PARAMETERS, JOB_HASH, PYTHON_FUNC, \
|
||||
JOB_ID, JOB_TYPE_PYTHON, JOB_PARAMETERS, PYTHON_FUNC, \
|
||||
JOB_STATUS, META_FILE, JOB_ERROR, PARAMS_FILE, SWEEP_STOP, SWEEP_JUMP, \
|
||||
SWEEP_START, JOB_TYPE_PAPERMILL, JOB_TYPE_BASH, \
|
||||
get_base_file, get_job_file, get_result_file
|
||||
@ -378,7 +378,6 @@ class PapermillHandlerTests(unittest.TestCase):
|
||||
),
|
||||
extras={
|
||||
JOB_PARAMETERS:params_dict,
|
||||
JOB_HASH: file_hash,
|
||||
PYTHON_FUNC:papermill_job_func
|
||||
}
|
||||
)
|
||||
@ -797,7 +796,6 @@ class PythonHandlerTests(unittest.TestCase):
|
||||
),
|
||||
extras={
|
||||
JOB_PARAMETERS:params_dict,
|
||||
JOB_HASH: file_hash,
|
||||
PYTHON_FUNC:python_job_func
|
||||
}
|
||||
)
|
||||
@ -1210,8 +1208,7 @@ class BashHandlerTests(unittest.TestCase):
|
||||
file_hash
|
||||
),
|
||||
extras={
|
||||
JOB_PARAMETERS:params_dict,
|
||||
JOB_HASH: file_hash
|
||||
JOB_PARAMETERS:params_dict
|
||||
}
|
||||
)
|
||||
|
||||
@ -1235,7 +1232,6 @@ class BashHandlerTests(unittest.TestCase):
|
||||
st = os.stat(job_file)
|
||||
os.chmod(job_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
|
||||
|
||||
|
||||
print(os.listdir(job_dir))
|
||||
print(os.getcwd())
|
||||
|
||||
@ -1265,16 +1261,6 @@ class BashHandlerTests(unittest.TestCase):
|
||||
|
||||
self.assertEqual(result, "124937\n")
|
||||
|
||||
# Test jobFunc doesn't execute with no args
|
||||
def testJobFuncBadArgs(self)->None:
|
||||
try:
|
||||
Bash_job_func({})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self.assertEqual(len(os.listdir(TEST_JOB_QUEUE)), 0)
|
||||
self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 0)
|
||||
|
||||
# Test handling criteria function
|
||||
def testValidHandleCriteria(self)->None:
|
||||
ph = BashHandler()
|
||||
|
Reference in New Issue
Block a user