diff --git a/recipes/python_recipe.py b/recipes/python_recipe.py index 59d344b..ebccccd 100644 --- a/recipes/python_recipe.py +++ b/recipes/python_recipe.py @@ -239,17 +239,17 @@ def python_job_func(job): std_stdout = sys.stdout std_stderr = sys.stderr try: - redirected_output = sys.stdout - redirected_error = sys.stderr + redirected_output = sys.stdout = StringIO() + redirected_error = sys.stderr = StringIO() exec(open(job_file).read()) - write_file(f"""--STDOUT-- - {redirected_output} - - --STDERR-- - {redirected_error} - """, + write_file(("--STDOUT--\n" + f"{redirected_output.getvalue()}\n" + "\n" + "--STDERR--\n" + f"{redirected_error.getvalue()}\n" + ""), result_file) except Exception as e: diff --git a/tests/shared.py b/tests/shared.py index afa4f0a..d0caf50 100644 --- a/tests/shared.py +++ b/tests/shared.py @@ -27,13 +27,14 @@ BAREBONES_PYTHON_SCRIPT = [ "" ] COMPLETE_PYTHON_SCRIPT = [ + "import os", "# Setup parameters", "num = 1000", "infile = 'somehere/particular'", "outfile = 'nowhere/particular'", "", "with open(infile, 'r') as file:", - " s = int(file.read())", + " s = float(file.read())", "" "for i in range(num):", " s += i", @@ -43,8 +44,12 @@ COMPLETE_PYTHON_SCRIPT = [ "", "print(result)", "", + "os.makedirs(os.path.dirname(outfile), exist_ok=True)", + "", "with open(outfile, 'w') as file:", - " file.write(str(result))" + " file.write(str(result))", + "", + "print('done')" ] # Jupyter notebooks diff --git a/tests/test_conductors.py b/tests/test_conductors.py index 10b693e..d95ae30 100644 --- a/tests/test_conductors.py +++ b/tests/test_conductors.py @@ -37,7 +37,7 @@ class MeowTests(unittest.TestCase): def testLocalPythonConductorCreation(self)->None: LocalPythonConductor() - #TODO Test LocalPythonConductor executes valid python jobs + # Test LocalPythonConductor executes valid python jobs def testLocalPythonConductorValidPythonJob(self)->None: lpc = LocalPythonConductor() diff --git a/tests/test_functionality.py b/tests/test_functionality.py index 97b9da8..26b5500 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -257,7 +257,7 @@ class CorrectnessTests(unittest.TestCase): COMPLETE_PYTHON_SCRIPT, {"num": 50}) self.assertNotEqual(ps, COMPLETE_PYTHON_SCRIPT) - self.assertEqual(ps[1], "num = 50") + self.assertEqual(ps[2], "num = 50") # Test that create_event produces valid event dictionary def testCreateEvent(self)->None: diff --git a/tests/test_recipes.py b/tests/test_recipes.py index dac9af3..eac2662 100644 --- a/tests/test_recipes.py +++ b/tests/test_recipes.py @@ -417,7 +417,6 @@ class JupyterNotebookTests(unittest.TestCase): #TODO Test handling criteria function -# TODO implement me class PythonTests(unittest.TestCase): def setUp(self)->None: super().setUp() diff --git a/tests/test_runner.py b/tests/test_runner.py index dda3a07..e5fd26a 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -6,15 +6,18 @@ import unittest from time import sleep from conductors import LocalPythonConductor -from core.correctness.vars import get_result_file, JOB_TYPE_PAPERMILL -from core.functionality import make_dir, read_notebook +from core.correctness.vars import get_result_file, \ + JOB_TYPE_PAPERMILL, JOB_ERROR, META_FILE, JOB_TYPE_PYTHON, JOB_CREATE_TIME +from core.functionality import make_dir, read_notebook, read_yaml, read_file from core.meow import BaseMonitor, BaseHandler, BaseConductor from core.runner import MeowRunner from patterns.file_event_pattern import WatchdogMonitor, FileEventPattern from recipes.jupyter_notebook_recipe import PapermillHandler, \ JupyterNotebookRecipe -from shared import setup, teardown, TEST_HANDLER_BASE, TEST_JOB_OUTPUT, \ - TEST_MONITOR_BASE, APPENDING_NOTEBOOK +from recipes.python_recipe import PythonHandler, PythonRecipe +from shared import setup, teardown, \ + TEST_HANDLER_BASE, TEST_JOB_OUTPUT, TEST_MONITOR_BASE, \ + APPENDING_NOTEBOOK, COMPLETE_PYTHON_SCRIPT class MeowTests(unittest.TestCase): @@ -115,8 +118,8 @@ class MeowTests(unittest.TestCase): for conductor in runner.conductors: self.assertIsInstance(conductor, BaseConductor) - # Test single meow job execution - def testMeowRunnerExecution(self)->None: + # Test single meow papermill job execution + def testMeowRunnerPapermillExecution(self)->None: pattern_one = FileEventPattern( "pattern_one", "start/A.txt", "recipe_one", "infile", parameters={ @@ -199,8 +202,8 @@ class MeowTests(unittest.TestCase): self.assertEqual(data, "Initial Data\nA line from a test Pattern") - # Test meow job chaining within runner - def testMeowRunnerLinkedExecution(self)->None: + # Test meow papermill job chaining within runner + def testMeowRunnerLinkedPapermillExecution(self)->None: pattern_one = FileEventPattern( "pattern_one", "start/A.txt", "recipe_one", "infile", parameters={ @@ -308,6 +311,224 @@ class MeowTests(unittest.TestCase): self.assertEqual(data, "Initial Data\nA line from Pattern 1\nA line from Pattern 2") + # Test single meow python job execution + def testMeowRunnerPythonExecution(self)->None: + pattern_one = FileEventPattern( + "pattern_one", "start/A.txt", "recipe_one", "infile", + parameters={ + "num":10000, + "outfile":"{VGRID}/output/{FILENAME}" + }) + recipe = PythonRecipe( + "recipe_one", COMPLETE_PYTHON_SCRIPT + ) + + patterns = { + pattern_one.name: pattern_one, + } + recipes = { + recipe.name: recipe, + } + + runner_debug_stream = io.StringIO("") + + runner = MeowRunner( + WatchdogMonitor( + TEST_MONITOR_BASE, + patterns, + recipes, + settletime=1 + ), + PythonHandler( + TEST_HANDLER_BASE, + TEST_JOB_OUTPUT, + ), + LocalPythonConductor(), + print=runner_debug_stream, + logging=3 + ) + + runner.start() + + start_dir = os.path.join(TEST_MONITOR_BASE, "start") + make_dir(start_dir) + self.assertTrue(start_dir) + with open(os.path.join(start_dir, "A.txt"), "w") as f: + f.write("25000") + + self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) + + loops = 0 + job_id = None + while loops < 15: + sleep(1) + runner_debug_stream.seek(0) + messages = runner_debug_stream.readlines() + + for msg in messages: + self.assertNotIn("ERROR", msg) + + if "INFO: Completed execution for job: '" in msg: + job_id = msg.replace( + "INFO: Completed execution for job: '", "") + job_id = job_id[:-2] + loops = 15 + loops += 1 + + self.assertIsNotNone(job_id) + self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1) + self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT)) + + runner.stop() + + job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) + print(os.listdir(job_dir)) + + metafile = os.path.join(job_dir, META_FILE) + status = read_yaml(metafile) + + self.assertNotIn(JOB_ERROR, status) + + result_path = os.path.join(job_dir, get_result_file(JOB_TYPE_PYTHON)) + self.assertTrue(os.path.exists(result_path)) + result = read_file(os.path.join(result_path)) + self.assertEqual( + result, "--STDOUT--\n12505000.0\ndone\n\n\n--STDERR--\n\n") + + output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") + self.assertTrue(os.path.exists(output_path)) + output = read_file(os.path.join(output_path)) + self.assertEqual(output, "12505000.0") + + # Test meow python job chaining within runner + def testMeowRunnerLinkedPythonExecution(self)->None: + pattern_one = FileEventPattern( + "pattern_one", "start/A.txt", "recipe_one", "infile", + parameters={ + "num":250, + "outfile":"{VGRID}/middle/{FILENAME}" + }) + pattern_two = FileEventPattern( + "pattern_two", "middle/A.txt", "recipe_one", "infile", + parameters={ + "num":40, + "outfile":"{VGRID}/output/{FILENAME}" + }) + recipe = PythonRecipe( + "recipe_one", COMPLETE_PYTHON_SCRIPT + ) + + patterns = { + pattern_one.name: pattern_one, + pattern_two.name: pattern_two, + } + recipes = { + recipe.name: recipe, + } + + runner_debug_stream = io.StringIO("") + + runner = MeowRunner( + WatchdogMonitor( + TEST_MONITOR_BASE, + patterns, + recipes, + settletime=1 + ), + PythonHandler( + TEST_HANDLER_BASE, + TEST_JOB_OUTPUT + ), + LocalPythonConductor(), + print=runner_debug_stream, + logging=3 + ) + + runner.start() + + start_dir = os.path.join(TEST_MONITOR_BASE, "start") + make_dir(start_dir) + self.assertTrue(start_dir) + with open(os.path.join(start_dir, "A.txt"), "w") as f: + f.write("100") + + self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) + + loops = 0 + job_ids = [] + while len(job_ids) < 2 and loops < 15: + sleep(1) + runner_debug_stream.seek(0) + messages = runner_debug_stream.readlines() + + for msg in messages: + self.assertNotIn("ERROR", msg) + + if "INFO: Completed execution for job: '" in msg: + job_id = msg.replace( + "INFO: Completed execution for job: '", "") + job_id = job_id[:-2] + if job_id not in job_ids: + job_ids.append(job_id) + loops += 1 + + runner.stop() + + self.assertEqual(len(job_ids), 2) + self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 2) + self.assertIn(job_ids[0], os.listdir(TEST_JOB_OUTPUT)) + self.assertIn(job_ids[1], os.listdir(TEST_JOB_OUTPUT)) + + meta0 = os.path.join(TEST_JOB_OUTPUT, job_ids[0], META_FILE) + status0 = read_yaml(meta0) + create0 = status0[JOB_CREATE_TIME] + meta1 = os.path.join(TEST_JOB_OUTPUT, job_ids[1], META_FILE) + status1 = read_yaml(meta1) + create1 = status1[JOB_CREATE_TIME] + if create0 < create1: + mid_job_id = job_ids[0] + final_job_id = job_ids[1] + else: + mid_job_id = job_ids[1] + final_job_id = job_ids[0] + + mid_job_dir = os.path.join(TEST_JOB_OUTPUT, mid_job_id) + self.assertEqual(len(os.listdir(mid_job_dir)), 5) + + mid_metafile = os.path.join(mid_job_dir, META_FILE) + mid_status = read_yaml(mid_metafile) + self.assertNotIn(JOB_ERROR, mid_status) + + mid_result_path = os.path.join( + mid_job_dir, get_result_file(JOB_TYPE_PYTHON)) + self.assertTrue(os.path.exists(mid_result_path)) + mid_result = read_file(os.path.join(mid_result_path)) + self.assertEqual( + mid_result, "--STDOUT--\n7806.25\ndone\n\n\n--STDERR--\n\n") + + mid_output_path = os.path.join(TEST_MONITOR_BASE, "middle", "A.txt") + self.assertTrue(os.path.exists(mid_output_path)) + mid_output = read_file(os.path.join(mid_output_path)) + self.assertEqual(mid_output, "7806.25") + + final_job_dir = os.path.join(TEST_JOB_OUTPUT, final_job_id) + self.assertEqual(len(os.listdir(final_job_dir)), 5) + + final_metafile = os.path.join(final_job_dir, META_FILE) + final_status = read_yaml(final_metafile) + self.assertNotIn(JOB_ERROR, final_status) + + final_result_path = os.path.join(final_job_dir, get_result_file(JOB_TYPE_PYTHON)) + self.assertTrue(os.path.exists(final_result_path)) + final_result = read_file(os.path.join(final_result_path)) + self.assertEqual( + final_result, "--STDOUT--\n2146.5625\ndone\n\n\n--STDERR--\n\n") + + final_output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") + self.assertTrue(os.path.exists(final_output_path)) + final_output = read_file(os.path.join(final_output_path)) + self.assertEqual(final_output, "2146.5625") + # TODO sweep execution test # TODO adding tests with numpy or other external dependency # TODO test getting job cannot handle