added a few more tests, most notably of python execution

This commit is contained in:
PatchOfScotland
2023-02-03 16:07:09 +01:00
parent bc7a043225
commit 9435d500db
6 changed files with 246 additions and 21 deletions

View File

@ -6,15 +6,18 @@ import unittest
from time import sleep
from conductors import LocalPythonConductor
from core.correctness.vars import get_result_file, JOB_TYPE_PAPERMILL
from core.functionality import make_dir, read_notebook
from core.correctness.vars import get_result_file, \
JOB_TYPE_PAPERMILL, JOB_ERROR, META_FILE, JOB_TYPE_PYTHON, JOB_CREATE_TIME
from core.functionality import make_dir, read_notebook, read_yaml, read_file
from core.meow import BaseMonitor, BaseHandler, BaseConductor
from core.runner import MeowRunner
from patterns.file_event_pattern import WatchdogMonitor, FileEventPattern
from recipes.jupyter_notebook_recipe import PapermillHandler, \
JupyterNotebookRecipe
from shared import setup, teardown, TEST_HANDLER_BASE, TEST_JOB_OUTPUT, \
TEST_MONITOR_BASE, APPENDING_NOTEBOOK
from recipes.python_recipe import PythonHandler, PythonRecipe
from shared import setup, teardown, \
TEST_HANDLER_BASE, TEST_JOB_OUTPUT, TEST_MONITOR_BASE, \
APPENDING_NOTEBOOK, COMPLETE_PYTHON_SCRIPT
class MeowTests(unittest.TestCase):
@ -115,8 +118,8 @@ class MeowTests(unittest.TestCase):
for conductor in runner.conductors:
self.assertIsInstance(conductor, BaseConductor)
# Test single meow job execution
def testMeowRunnerExecution(self)->None:
# Test single meow papermill job execution
def testMeowRunnerPapermillExecution(self)->None:
pattern_one = FileEventPattern(
"pattern_one", "start/A.txt", "recipe_one", "infile",
parameters={
@ -199,8 +202,8 @@ class MeowTests(unittest.TestCase):
self.assertEqual(data, "Initial Data\nA line from a test Pattern")
# Test meow job chaining within runner
def testMeowRunnerLinkedExecution(self)->None:
# Test meow papermill job chaining within runner
def testMeowRunnerLinkedPapermillExecution(self)->None:
pattern_one = FileEventPattern(
"pattern_one", "start/A.txt", "recipe_one", "infile",
parameters={
@ -308,6 +311,224 @@ class MeowTests(unittest.TestCase):
self.assertEqual(data,
"Initial Data\nA line from Pattern 1\nA line from Pattern 2")
# Test single meow python job execution
def testMeowRunnerPythonExecution(self)->None:
pattern_one = FileEventPattern(
"pattern_one", "start/A.txt", "recipe_one", "infile",
parameters={
"num":10000,
"outfile":"{VGRID}/output/{FILENAME}"
})
recipe = PythonRecipe(
"recipe_one", COMPLETE_PYTHON_SCRIPT
)
patterns = {
pattern_one.name: pattern_one,
}
recipes = {
recipe.name: recipe,
}
runner_debug_stream = io.StringIO("")
runner = MeowRunner(
WatchdogMonitor(
TEST_MONITOR_BASE,
patterns,
recipes,
settletime=1
),
PythonHandler(
TEST_HANDLER_BASE,
TEST_JOB_OUTPUT,
),
LocalPythonConductor(),
print=runner_debug_stream,
logging=3
)
runner.start()
start_dir = os.path.join(TEST_MONITOR_BASE, "start")
make_dir(start_dir)
self.assertTrue(start_dir)
with open(os.path.join(start_dir, "A.txt"), "w") as f:
f.write("25000")
self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt")))
loops = 0
job_id = None
while loops < 15:
sleep(1)
runner_debug_stream.seek(0)
messages = runner_debug_stream.readlines()
for msg in messages:
self.assertNotIn("ERROR", msg)
if "INFO: Completed execution for job: '" in msg:
job_id = msg.replace(
"INFO: Completed execution for job: '", "")
job_id = job_id[:-2]
loops = 15
loops += 1
self.assertIsNotNone(job_id)
self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1)
self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT))
runner.stop()
job_dir = os.path.join(TEST_JOB_OUTPUT, job_id)
print(os.listdir(job_dir))
metafile = os.path.join(job_dir, META_FILE)
status = read_yaml(metafile)
self.assertNotIn(JOB_ERROR, status)
result_path = os.path.join(job_dir, get_result_file(JOB_TYPE_PYTHON))
self.assertTrue(os.path.exists(result_path))
result = read_file(os.path.join(result_path))
self.assertEqual(
result, "--STDOUT--\n12505000.0\ndone\n\n\n--STDERR--\n\n")
output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt")
self.assertTrue(os.path.exists(output_path))
output = read_file(os.path.join(output_path))
self.assertEqual(output, "12505000.0")
# Test meow python job chaining within runner
def testMeowRunnerLinkedPythonExecution(self)->None:
pattern_one = FileEventPattern(
"pattern_one", "start/A.txt", "recipe_one", "infile",
parameters={
"num":250,
"outfile":"{VGRID}/middle/{FILENAME}"
})
pattern_two = FileEventPattern(
"pattern_two", "middle/A.txt", "recipe_one", "infile",
parameters={
"num":40,
"outfile":"{VGRID}/output/{FILENAME}"
})
recipe = PythonRecipe(
"recipe_one", COMPLETE_PYTHON_SCRIPT
)
patterns = {
pattern_one.name: pattern_one,
pattern_two.name: pattern_two,
}
recipes = {
recipe.name: recipe,
}
runner_debug_stream = io.StringIO("")
runner = MeowRunner(
WatchdogMonitor(
TEST_MONITOR_BASE,
patterns,
recipes,
settletime=1
),
PythonHandler(
TEST_HANDLER_BASE,
TEST_JOB_OUTPUT
),
LocalPythonConductor(),
print=runner_debug_stream,
logging=3
)
runner.start()
start_dir = os.path.join(TEST_MONITOR_BASE, "start")
make_dir(start_dir)
self.assertTrue(start_dir)
with open(os.path.join(start_dir, "A.txt"), "w") as f:
f.write("100")
self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt")))
loops = 0
job_ids = []
while len(job_ids) < 2 and loops < 15:
sleep(1)
runner_debug_stream.seek(0)
messages = runner_debug_stream.readlines()
for msg in messages:
self.assertNotIn("ERROR", msg)
if "INFO: Completed execution for job: '" in msg:
job_id = msg.replace(
"INFO: Completed execution for job: '", "")
job_id = job_id[:-2]
if job_id not in job_ids:
job_ids.append(job_id)
loops += 1
runner.stop()
self.assertEqual(len(job_ids), 2)
self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 2)
self.assertIn(job_ids[0], os.listdir(TEST_JOB_OUTPUT))
self.assertIn(job_ids[1], os.listdir(TEST_JOB_OUTPUT))
meta0 = os.path.join(TEST_JOB_OUTPUT, job_ids[0], META_FILE)
status0 = read_yaml(meta0)
create0 = status0[JOB_CREATE_TIME]
meta1 = os.path.join(TEST_JOB_OUTPUT, job_ids[1], META_FILE)
status1 = read_yaml(meta1)
create1 = status1[JOB_CREATE_TIME]
if create0 < create1:
mid_job_id = job_ids[0]
final_job_id = job_ids[1]
else:
mid_job_id = job_ids[1]
final_job_id = job_ids[0]
mid_job_dir = os.path.join(TEST_JOB_OUTPUT, mid_job_id)
self.assertEqual(len(os.listdir(mid_job_dir)), 5)
mid_metafile = os.path.join(mid_job_dir, META_FILE)
mid_status = read_yaml(mid_metafile)
self.assertNotIn(JOB_ERROR, mid_status)
mid_result_path = os.path.join(
mid_job_dir, get_result_file(JOB_TYPE_PYTHON))
self.assertTrue(os.path.exists(mid_result_path))
mid_result = read_file(os.path.join(mid_result_path))
self.assertEqual(
mid_result, "--STDOUT--\n7806.25\ndone\n\n\n--STDERR--\n\n")
mid_output_path = os.path.join(TEST_MONITOR_BASE, "middle", "A.txt")
self.assertTrue(os.path.exists(mid_output_path))
mid_output = read_file(os.path.join(mid_output_path))
self.assertEqual(mid_output, "7806.25")
final_job_dir = os.path.join(TEST_JOB_OUTPUT, final_job_id)
self.assertEqual(len(os.listdir(final_job_dir)), 5)
final_metafile = os.path.join(final_job_dir, META_FILE)
final_status = read_yaml(final_metafile)
self.assertNotIn(JOB_ERROR, final_status)
final_result_path = os.path.join(final_job_dir, get_result_file(JOB_TYPE_PYTHON))
self.assertTrue(os.path.exists(final_result_path))
final_result = read_file(os.path.join(final_result_path))
self.assertEqual(
final_result, "--STDOUT--\n2146.5625\ndone\n\n\n--STDERR--\n\n")
final_output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt")
self.assertTrue(os.path.exists(final_output_path))
final_output = read_file(os.path.join(final_output_path))
self.assertEqual(final_output, "2146.5625")
# TODO sweep execution test
# TODO adding tests with numpy or other external dependency
# TODO test getting job cannot handle