From 64452e3f0343a97b5596da989b4b897a21b32b5e Mon Sep 17 00:00:00 2001 From: PatchOfScotland Date: Tue, 31 Jan 2023 17:17:44 +0100 Subject: [PATCH] added support for multiple sweeps --- tests/shared.py | 88 ++++++++++++++++++ tests/test_functionality.py | 3 - tests/test_recipes.py | 150 +++++++++++++++++++++++++++++- tests/test_runner.py | 178 +----------------------------------- 4 files changed, 239 insertions(+), 180 deletions(-) diff --git a/tests/shared.py b/tests/shared.py index d4d3fca..83b51ad 100644 --- a/tests/shared.py +++ b/tests/shared.py @@ -174,3 +174,91 @@ APPENDING_NOTEBOOK = { "nbformat": 4, "nbformat_minor": 4 } +ADDING_NOTEBOOK = { + "cells": [ + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Default parameters values\n", + "# Amount to add to data\n", + "extra = 10\n", + "# Data input file location\n", + "infile = 'example_data/data_0.npy'\n", + "# Output file location\n", + "outfile = 'standard_output/data_0.npy'" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import os" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# load in dataset. Should be numpy array\n", + "data = np.load(infile)\n" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Add an amount to all the values in the array\n", + "added = data + int(float(extra))\n", + "\n", + "added" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Create output directory if it doesn't exist\n", + "output_dir_path = os.path.dirname(outfile)\n", + "\n", + "if output_dir_path:\n", + " os.makedirs(output_dir_path, exist_ok=True)\n", + "\n", + "# Save added array as new dataset\n", + "np.save(outfile, added)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.9" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} \ No newline at end of file diff --git a/tests/test_functionality.py b/tests/test_functionality.py index 959cf54..57c50ed 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -341,8 +341,6 @@ class CorrectnessTests(unittest.TestCase): "N": 1 } - print(test_dict["A"]) - replaced = replace_keywords( test_dict, "job_id", "base/src/dir/file.ext", "base/monitor/dir") @@ -379,7 +377,6 @@ class CorrectnessTests(unittest.TestCase): with open(notebook_path, 'r') as f: data = f.readlines() - print(data) expected_bytes = [ '{"cells": [{"cell_type": "code", "execution_count": null, ' '"metadata": {}, "outputs": [], "source": ["# Default parameters ' diff --git a/tests/test_recipes.py b/tests/test_recipes.py index 29dc84b..bcc8ac6 100644 --- a/tests/test_recipes.py +++ b/tests/test_recipes.py @@ -13,7 +13,8 @@ from core.correctness.vars import EVENT_TYPE, WATCHDOG_BASE, WATCHDOG_RULE, \ from core.correctness.validation import valid_job from core.functionality import get_file_hash, create_job, create_event from core.meow import create_rules, create_rule -from patterns.file_event_pattern import FileEventPattern +from patterns.file_event_pattern import FileEventPattern, SWEEP_START, \ + SWEEP_STOP, SWEEP_JUMP from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe, \ PapermillHandler, job_func from rules.file_event_jupyter_notebook_rule import FileEventJupyterNotebookRule @@ -160,6 +161,153 @@ class CorrectnessTests(unittest.TestCase): valid_job(job) + # Test PapermillHandler will create enough jobs from single sweep + def testPapermillHandlerHandlingSingleSweep(self)->None: + from_handler_reader, from_handler_writer = Pipe() + ph = PapermillHandler( + TEST_HANDLER_BASE, + TEST_JOB_OUTPUT + ) + ph.to_runner = from_handler_writer + + with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: + f.write("Data") + + pattern_one = FileEventPattern( + "pattern_one", "A", "recipe_one", "file_one", sweep={"s":{ + SWEEP_START: 0, SWEEP_STOP: 2, SWEEP_JUMP:1 + }}) + recipe = JupyterNotebookRecipe( + "recipe_one", COMPLETE_NOTEBOOK) + + patterns = { + pattern_one.name: pattern_one, + } + recipes = { + recipe.name: recipe, + } + + rules = create_rules(patterns, recipes) + self.assertEqual(len(rules), 1) + _, rule = rules.popitem() + self.assertIsInstance(rule, FileEventJupyterNotebookRule) + + self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 0) + + event = { + EVENT_TYPE: WATCHDOG_TYPE, + EVENT_PATH: os.path.join(TEST_MONITOR_BASE, "A"), + WATCHDOG_BASE: TEST_MONITOR_BASE, + WATCHDOG_RULE: rule, + WATCHDOG_HASH: get_file_hash( + os.path.join(TEST_MONITOR_BASE, "A"), SHA256 + ) + } + + ph.handle(event) + + jobs = [] + recieving = True + while recieving: + if from_handler_reader.poll(3): + jobs.append(from_handler_reader.recv()) + else: + recieving = False + + values = [0, 1, 2] + self.assertEqual(len(jobs), 3) + for job in jobs: + valid_job(job) + self.assertIn(JOB_PARAMETERS, job) + self.assertIn("s", job[JOB_PARAMETERS]) + if job[JOB_PARAMETERS]["s"] in values: + values.remove(job[JOB_PARAMETERS]["s"]) + self.assertEqual(len(values), 0) + + # Test PapermillHandler will create enough jobs from multiple sweeps + def testPapermillHandlerHandlingMultipleSweep(self)->None: + from_handler_reader, from_handler_writer = Pipe() + ph = PapermillHandler( + TEST_HANDLER_BASE, + TEST_JOB_OUTPUT + ) + ph.to_runner = from_handler_writer + + with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: + f.write("Data") + + pattern_one = FileEventPattern( + "pattern_one", "A", "recipe_one", "file_one", sweep={ + "s1":{ + SWEEP_START: 0, SWEEP_STOP: 2, SWEEP_JUMP:1 + }, + "s2":{ + SWEEP_START: 20, SWEEP_STOP: 80, SWEEP_JUMP:15 + } + }) + recipe = JupyterNotebookRecipe( + "recipe_one", COMPLETE_NOTEBOOK) + + patterns = { + pattern_one.name: pattern_one, + } + recipes = { + recipe.name: recipe, + } + + rules = create_rules(patterns, recipes) + self.assertEqual(len(rules), 1) + _, rule = rules.popitem() + self.assertIsInstance(rule, FileEventJupyterNotebookRule) + + self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 0) + + event = { + EVENT_TYPE: WATCHDOG_TYPE, + EVENT_PATH: os.path.join(TEST_MONITOR_BASE, "A"), + WATCHDOG_BASE: TEST_MONITOR_BASE, + WATCHDOG_RULE: rule, + WATCHDOG_HASH: get_file_hash( + os.path.join(TEST_MONITOR_BASE, "A"), SHA256 + ) + } + + ph.handle(event) + + jobs = [] + recieving = True + while recieving: + if from_handler_reader.poll(3): + jobs.append(from_handler_reader.recv()) + else: + recieving = False + + values = [ + "s1-0/s2-20", "s1-1/s2-20", "s1-2/s2-20", + "s1-0/s2-35", "s1-1/s2-35", "s1-2/s2-35", + "s1-0/s2-50", "s1-1/s2-50", "s1-2/s2-50", + "s1-0/s2-65", "s1-1/s2-65", "s1-2/s2-65", + "s1-0/s2-80", "s1-1/s2-80", "s1-2/s2-80", + ] + self.assertEqual(len(jobs), 15) + for job in jobs: + valid_job(job) + self.assertIn(JOB_PARAMETERS, job) + val1 = None + val2 = None + if "s1" in job[JOB_PARAMETERS]: + val1 = f"s1-{job[JOB_PARAMETERS]['s1']}" + if "s2" in job[JOB_PARAMETERS]: + val2 = f"s2-{job[JOB_PARAMETERS]['s2']}" + val = None + if val1 and val2: + val = f"{val1}/{val2}" + if val and val in values: + values.remove(val) + print([j[JOB_PARAMETERS] for j in jobs]) + print(values) + self.assertEqual(len(values), 0) + # Test jobFunc performs as expected def testJobFunc(self)->None: file_path = os.path.join(TEST_MONITOR_BASE, "test") diff --git a/tests/test_runner.py b/tests/test_runner.py index 480ecd3..055dcc6 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -231,9 +231,6 @@ class MeowTests(unittest.TestCase): loops = 15 loops += 1 - print("JOB ID:") - print(job_id) - self.assertIsNotNone(job_id) self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1) self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT)) @@ -324,8 +321,6 @@ class MeowTests(unittest.TestCase): if job_id not in job_ids: job_ids.append(job_id) loops += 1 - - print(job_ids) self.assertEqual(len(job_ids), 2) self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 2) @@ -363,174 +358,5 @@ class MeowTests(unittest.TestCase): self.assertEqual(data, "Initial Data\nA line from Pattern 1\nA line from Pattern 2") - # Test single swept meow job execution - def testMeowRunnerExecution(self)->None: - pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", - parameters={ - "extra":"A line from a test Pattern", - "outfile":"{VGRID}/output/{FILENAME}" - }) - recipe = JupyterNotebookRecipe( - "recipe_one", APPENDING_NOTEBOOK) - - patterns = { - pattern_one.name: pattern_one, - } - recipes = { - recipe.name: recipe, - } - - runner_debug_stream = io.StringIO("") - - runner = MeowRunner( - WatchdogMonitor( - TEST_MONITOR_BASE, - patterns, - recipes, - settletime=1 - ), - PapermillHandler( - TEST_HANDLER_BASE, - TEST_JOB_OUTPUT, - ), - LocalPythonConductor(), - print=runner_debug_stream, - logging=3 - ) - - runner.start() - - start_dir = os.path.join(TEST_MONITOR_BASE, "start") - make_dir(start_dir) - self.assertTrue(start_dir) - with open(os.path.join(start_dir, "A.txt"), "w") as f: - f.write("Initial Data") - - self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) - - loops = 0 - job_id = None - while loops < 15: - sleep(1) - runner_debug_stream.seek(0) - messages = runner_debug_stream.readlines() - - for msg in messages: - self.assertNotIn("ERROR", msg) - - if "INFO: Completed execution for job: '" in msg: - job_id = msg.replace( - "INFO: Completed execution for job: '", "") - job_id = job_id[:-2] - loops = 15 - loops += 1 - - print("JOB ID:") - print(job_id) - - self.assertIsNotNone(job_id) - self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1) - self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT)) - - runner.stop() - - job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) - self.assertEqual(len(os.listdir(job_dir)), 5) - - result = read_notebook(os.path.join(job_dir, RESULT_FILE)) - self.assertIsNotNone(result) - - output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") - self.assertTrue(os.path.exists(output_path)) - - with open(output_path, "r") as f: - data = f.read() - - self.assertEqual(data, "Initial Data\nA line from a test Pattern") - - # Test multiple swept meow job execution - def testMeowRunnerExecution(self)->None: - pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", - parameters={ - "extra":"A line from a test Pattern", - "outfile":"{VGRID}/output/{FILENAME}" - }) - recipe = JupyterNotebookRecipe( - "recipe_one", APPENDING_NOTEBOOK) - - patterns = { - pattern_one.name: pattern_one, - } - recipes = { - recipe.name: recipe, - } - - runner_debug_stream = io.StringIO("") - - runner = MeowRunner( - WatchdogMonitor( - TEST_MONITOR_BASE, - patterns, - recipes, - settletime=1 - ), - PapermillHandler( - TEST_HANDLER_BASE, - TEST_JOB_OUTPUT, - ), - LocalPythonConductor(), - print=runner_debug_stream, - logging=3 - ) - - runner.start() - - start_dir = os.path.join(TEST_MONITOR_BASE, "start") - make_dir(start_dir) - self.assertTrue(start_dir) - with open(os.path.join(start_dir, "A.txt"), "w") as f: - f.write("Initial Data") - - self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) - - loops = 0 - job_id = None - while loops < 15: - sleep(1) - runner_debug_stream.seek(0) - messages = runner_debug_stream.readlines() - - for msg in messages: - self.assertNotIn("ERROR", msg) - - if "INFO: Completed execution for job: '" in msg: - job_id = msg.replace( - "INFO: Completed execution for job: '", "") - job_id = job_id[:-2] - loops = 15 - loops += 1 - - print("JOB ID:") - print(job_id) - - self.assertIsNotNone(job_id) - self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1) - self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT)) - - runner.stop() - - job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) - self.assertEqual(len(os.listdir(job_dir)), 5) - - result = read_notebook(os.path.join(job_dir, RESULT_FILE)) - self.assertIsNotNone(result) - - output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") - self.assertTrue(os.path.exists(output_path)) - - with open(output_path, "r") as f: - data = f.read() - - self.assertEqual(data, "Initial Data\nA line from a test Pattern") + # TODO sweep tests + # TODO adding tests with numpy