diff --git a/core/meow.py b/core/meow.py index 9fba97b..9bc68a3 100644 --- a/core/meow.py +++ b/core/meow.py @@ -1,18 +1,15 @@ import inspect import sys -import threading -from multiprocessing import Pipe -from random import randrange from typing import Any, Union from core.correctness.vars import VALID_RECIPE_NAME_CHARS, \ VALID_PATTERN_NAME_CHARS, VALID_RULE_NAME_CHARS, VALID_CHANNELS, \ - get_drt_imp_msg, DEBUG_WARNING, DEBUG_INFO, EVENT_TYPE + get_drt_imp_msg from core.correctness.validation import valid_string, check_type, \ - check_implementation, valid_list, valid_dict, setup_debugging -from core.functionality import print_debug, wait, generate_id + check_implementation, valid_list, valid_dict +from core.functionality import generate_id class BaseRecipe: @@ -176,125 +173,6 @@ class BaseHandler: pass -class MeowRunner: - monitors:list[BaseMonitor] - handlers:dict[str:BaseHandler] - from_monitor: list[VALID_CHANNELS] - def __init__(self, monitors:Union[BaseMonitor,list[BaseMonitor]], - handlers:Union[BaseHandler,list[BaseHandler]], - print:Any=sys.stdout, logging:int=0) -> None: - self._is_valid_handlers(handlers) - if not type(handlers) == list: - handlers = [handlers] - self.handlers = {} - for handler in handlers: - handler_events = handler.valid_event_types() - for event in handler_events: - if event in self.handlers.keys(): - self.handlers[event].append(handler) - else: - self.handlers[event] = [handler] - - self._is_valid_monitors(monitors) - if not type(monitors) == list: - monitors = [monitors] - self.monitors = monitors - self.from_monitors = [] - for monitor in self.monitors: - monitor_to_runner_reader, monitor_to_runner_writer = Pipe() - monitor.to_runner = monitor_to_runner_writer - self.from_monitors.append(monitor_to_runner_reader) - - self._stop_pipe = Pipe() - self._worker = None - self._print_target, self.debug_level = setup_debugging(print, logging) - - def run(self)->None: - all_inputs = self.from_monitors + [self._stop_pipe[0]] - while True: - ready = wait(all_inputs) - - if self._stop_pipe[0] in ready: - return - else: - for from_monitor in self.from_monitors: - if from_monitor in ready: - message = from_monitor.recv() - event = message - if not self.handlers[event[EVENT_TYPE]]: - print_debug(self._print_target, self.debug_level, - "Could not process event as no relevent " - f"handler for '{EVENT_TYPE}'", DEBUG_INFO) - return - if len(self.handlers[event[EVENT_TYPE]]) == 1: - self.handlers[event[EVENT_TYPE]][0].handle(event) - else: - self.handlers[event[EVENT_TYPE]][ - randrange(len(self.handlers[event[EVENT_TYPE]])) - ].handle(event) - - def start(self)->None: - for monitor in self.monitors: - monitor.start() - startable = [] - for handler_list in self.handlers.values(): - for handler in handler_list: - if hasattr(handler, "start") and handler not in startable: - startable.append() - for handler in startable: - handler.start() - - if self._worker is None: - self._worker = threading.Thread( - target=self.run, - args=[]) - self._worker.daemon = True - self._worker.start() - print_debug(self._print_target, self.debug_level, - "Starting MeowRunner run...", DEBUG_INFO) - else: - msg = "Repeated calls to start have no effect." - print_debug(self._print_target, self.debug_level, - msg, DEBUG_WARNING) - raise RuntimeWarning(msg) - - - def stop(self)->None: - for monitor in self.monitors: - monitor.stop() - - stopable = [] - for handler_list in self.handlers.values(): - for handler in handler_list: - if hasattr(handler, "stop") and handler not in stopable: - stopable.append() - for handler in stopable: - handler.stop() - - if self._worker is None: - msg = "Cannot stop thread that is not started." - print_debug(self._print_target, self.debug_level, - msg, DEBUG_WARNING) - raise RuntimeWarning(msg) - else: - self._stop_pipe[1].send(1) - self._worker.join() - print_debug(self._print_target, self.debug_level, - "Worker thread stopped", DEBUG_INFO) - - - def _is_valid_monitors(self, - monitors:Union[BaseMonitor,list[BaseMonitor]])->None: - check_type(monitors, BaseMonitor, alt_types=[list[BaseMonitor]]) - if type(monitors) == list: - valid_list(monitors, BaseMonitor, min_length=1) - - def _is_valid_handlers(self, - handlers:Union[BaseHandler,list[BaseHandler]])->None: - check_type(handlers, BaseHandler, alt_types=[list[BaseHandler]]) - if type(handlers) == list: - valid_list(handlers, BaseHandler, min_length=1) - def create_rules(patterns:Union[dict[str,BasePattern],list[BasePattern]], recipes:Union[dict[str,BaseRecipe],list[BaseRecipe]], new_rules:list[BaseRule]=[])->dict[str,BaseRule]: diff --git a/core/runner.py b/core/runner.py new file mode 100644 index 0000000..905bcb3 --- /dev/null +++ b/core/runner.py @@ -0,0 +1,134 @@ + +import sys +import threading + +from multiprocessing import Pipe +from random import randrange +from typing import Any, Union + +from core.correctness.vars import DEBUG_WARNING, DEBUG_INFO, EVENT_TYPE, \ + VALID_CHANNELS +from core.correctness.validation import setup_debugging, check_type, \ + valid_list +from core.functionality import print_debug, wait +from core.meow import BaseHandler, BaseMonitor + + +class MeowRunner: + monitors:list[BaseMonitor] + handlers:dict[str:BaseHandler] + from_monitor: list[VALID_CHANNELS] + def __init__(self, monitors:Union[BaseMonitor,list[BaseMonitor]], + handlers:Union[BaseHandler,list[BaseHandler]], + print:Any=sys.stdout, logging:int=0) -> None: + self._is_valid_handlers(handlers) + if not type(handlers) == list: + handlers = [handlers] + self.handlers = {} + for handler in handlers: + handler_events = handler.valid_event_types() + for event in handler_events: + if event in self.handlers.keys(): + self.handlers[event].append(handler) + else: + self.handlers[event] = [handler] + + self._is_valid_monitors(monitors) + if not type(monitors) == list: + monitors = [monitors] + self.monitors = monitors + self.from_monitors = [] + for monitor in self.monitors: + monitor_to_runner_reader, monitor_to_runner_writer = Pipe() + monitor.to_runner = monitor_to_runner_writer + self.from_monitors.append(monitor_to_runner_reader) + + self._stop_pipe = Pipe() + self._worker = None + self._print_target, self.debug_level = setup_debugging(print, logging) + + def run(self)->None: + all_inputs = self.from_monitors + [self._stop_pipe[0]] + while True: + ready = wait(all_inputs) + + if self._stop_pipe[0] in ready: + return + else: + for from_monitor in self.from_monitors: + if from_monitor in ready: + message = from_monitor.recv() + event = message + if not self.handlers[event[EVENT_TYPE]]: + print_debug(self._print_target, self.debug_level, + "Could not process event as no relevent " + f"handler for '{EVENT_TYPE}'", DEBUG_INFO) + return + if len(self.handlers[event[EVENT_TYPE]]) == 1: + self.handlers[event[EVENT_TYPE]][0].handle(event) + else: + self.handlers[event[EVENT_TYPE]][ + randrange(len(self.handlers[event[EVENT_TYPE]])) + ].handle(event) + + def start(self)->None: + for monitor in self.monitors: + monitor.start() + startable = [] + for handler_list in self.handlers.values(): + for handler in handler_list: + if hasattr(handler, "start") and handler not in startable: + startable.append() + for handler in startable: + handler.start() + + if self._worker is None: + self._worker = threading.Thread( + target=self.run, + args=[]) + self._worker.daemon = True + self._worker.start() + print_debug(self._print_target, self.debug_level, + "Starting MeowRunner run...", DEBUG_INFO) + else: + msg = "Repeated calls to start have no effect." + print_debug(self._print_target, self.debug_level, + msg, DEBUG_WARNING) + raise RuntimeWarning(msg) + + + def stop(self)->None: + for monitor in self.monitors: + monitor.stop() + + stopable = [] + for handler_list in self.handlers.values(): + for handler in handler_list: + if hasattr(handler, "stop") and handler not in stopable: + stopable.append() + for handler in stopable: + handler.stop() + + if self._worker is None: + msg = "Cannot stop thread that is not started." + print_debug(self._print_target, self.debug_level, + msg, DEBUG_WARNING) + raise RuntimeWarning(msg) + else: + self._stop_pipe[1].send(1) + self._worker.join() + print_debug(self._print_target, self.debug_level, + "Worker thread stopped", DEBUG_INFO) + + + def _is_valid_monitors(self, + monitors:Union[BaseMonitor,list[BaseMonitor]])->None: + check_type(monitors, BaseMonitor, alt_types=[list[BaseMonitor]]) + if type(monitors) == list: + valid_list(monitors, BaseMonitor, min_length=1) + + def _is_valid_handlers(self, + handlers:Union[BaseHandler,list[BaseHandler]])->None: + check_type(handlers, BaseHandler, alt_types=[list[BaseHandler]]) + if type(handlers) == list: + valid_list(handlers, BaseHandler, min_length=1) diff --git a/tests/test_meow.py b/tests/test_meow.py index bb961d8..a31ee49 100644 --- a/tests/test_meow.py +++ b/tests/test_meow.py @@ -1,19 +1,15 @@ -import io -import os import unittest -from time import sleep from typing import Any from core.correctness.vars import TEST_HANDLER_BASE, TEST_JOB_OUTPUT, \ - TEST_MONITOR_BASE, APPENDING_NOTEBOOK, BAREBONES_NOTEBOOK -from core.functionality import make_dir, rmtree, read_notebook + TEST_MONITOR_BASE, BAREBONES_NOTEBOOK +from core.functionality import make_dir, rmtree from core.meow import BasePattern, BaseRecipe, BaseRule, BaseMonitor, \ - BaseHandler, MeowRunner, create_rules -from patterns import WatchdogMonitor, FileEventPattern -from recipes.jupyter_notebook_recipe import PapermillHandler, \ - JupyterNotebookRecipe, RESULT_FILE + BaseHandler, create_rules +from patterns import FileEventPattern +from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe valid_pattern_one = FileEventPattern( "pattern_one", "path_one", "recipe_one", "file_one") @@ -172,196 +168,3 @@ class MeowTests(unittest.TestCase): def valid_event_types(self)->list[str]: pass FullTestHandler() - - def testMeowRunner(self)->None: - pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", - parameters={ - "extra":"A line from a test Pattern", - "outfile":"{VGRID}/output/{FILENAME}" - }) - recipe = JupyterNotebookRecipe( - "recipe_one", APPENDING_NOTEBOOK) - - patterns = { - pattern_one.name: pattern_one, - } - recipes = { - recipe.name: recipe, - } - rules = create_rules(patterns, recipes) - - monitor_debug_stream = io.StringIO("") - handler_debug_stream = io.StringIO("") - - runner = MeowRunner( - WatchdogMonitor( - TEST_MONITOR_BASE, - rules, - print=monitor_debug_stream, - logging=3, - settletime=1 - ), - PapermillHandler( - TEST_HANDLER_BASE, - TEST_JOB_OUTPUT, - print=handler_debug_stream, - logging=3 - ) - ) - - runner.start() - - start_dir = os.path.join(TEST_MONITOR_BASE, "start") - make_dir(start_dir) - self.assertTrue(start_dir) - with open(os.path.join(start_dir, "A.txt"), "w") as f: - f.write("Initial Data") - - self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) - - loops = 0 - job_id = None - while loops < 15: - sleep(1) - handler_debug_stream.seek(0) - messages = handler_debug_stream.readlines() - - for msg in messages: - self.assertNotIn("ERROR", msg) - - if "INFO: Completed job " in msg: - job_id = msg.replace("INFO: Completed job ", "") - job_id = job_id[:job_id.index(" with output")] - loops = 15 - loops += 1 - - self.assertIsNotNone(job_id) - self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1) - self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT)) - - runner.stop() - - job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) - self.assertEqual(len(os.listdir(job_dir)), 5) - - result = read_notebook(os.path.join(job_dir, RESULT_FILE)) - self.assertIsNotNone(result) - - output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") - self.assertTrue(os.path.exists(output_path)) - - with open(output_path, "r") as f: - data = f.read() - - self.assertEqual(data, "Initial Data\nA line from a test Pattern") - - def testMeowRunnerLinkeExecution(self)->None: - pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", - parameters={ - "extra":"A line from Pattern 1", - "outfile":"{VGRID}/middle/{FILENAME}" - }) - pattern_two = FileEventPattern( - "pattern_two", "middle/A.txt", "recipe_one", "infile", - parameters={ - "extra":"A line from Pattern 2", - "outfile":"{VGRID}/output/{FILENAME}" - }) - recipe = JupyterNotebookRecipe( - "recipe_one", APPENDING_NOTEBOOK) - - patterns = { - pattern_one.name: pattern_one, - pattern_two.name: pattern_two, - } - recipes = { - recipe.name: recipe, - } - rules = create_rules(patterns, recipes) - - monitor_debug_stream = io.StringIO("") - handler_debug_stream = io.StringIO("") - - runner = MeowRunner( - WatchdogMonitor( - TEST_MONITOR_BASE, - rules, - print=monitor_debug_stream, - logging=3, - settletime=1 - ), - PapermillHandler( - TEST_HANDLER_BASE, - TEST_JOB_OUTPUT, - print=handler_debug_stream, - logging=3 - ) - ) - - runner.start() - - start_dir = os.path.join(TEST_MONITOR_BASE, "start") - make_dir(start_dir) - self.assertTrue(start_dir) - with open(os.path.join(start_dir, "A.txt"), "w") as f: - f.write("Initial Data") - - self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) - - loops = 0 - job_ids = [] - while len(job_ids) < 2 and loops < 15: - sleep(1) - handler_debug_stream.seek(0) - messages = handler_debug_stream.readlines() - - for msg in messages: - self.assertNotIn("ERROR", msg) - - if "INFO: Completed job " in msg: - job_id = msg.replace("INFO: Completed job ", "") - job_id = job_id[:job_id.index(" with output")] - if job_id not in job_ids: - job_ids.append(job_id) - loops += 1 - - print(job_ids) - - self.assertEqual(len(job_ids), 2) - self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 2) - self.assertIn(job_ids[0], os.listdir(TEST_JOB_OUTPUT)) - self.assertIn(job_ids[1], os.listdir(TEST_JOB_OUTPUT)) - - runner.stop() - - mid_job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) - self.assertEqual(len(os.listdir(mid_job_dir)), 5) - - result = read_notebook(os.path.join(mid_job_dir, RESULT_FILE)) - self.assertIsNotNone(result) - - mid_output_path = os.path.join(TEST_MONITOR_BASE, "middle", "A.txt") - self.assertTrue(os.path.exists(mid_output_path)) - - with open(mid_output_path, "r") as f: - data = f.read() - - self.assertEqual(data, "Initial Data\nA line from Pattern 1") - - final_job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) - self.assertEqual(len(os.listdir(final_job_dir)), 5) - - result = read_notebook(os.path.join(final_job_dir, RESULT_FILE)) - self.assertIsNotNone(result) - - final_output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") - self.assertTrue(os.path.exists(final_output_path)) - - with open(final_output_path, "r") as f: - data = f.read() - - self.assertEqual(data, - "Initial Data\nA line from Pattern 1\nA line from Pattern 2") - diff --git a/tests/test_runner.py b/tests/test_runner.py new file mode 100644 index 0000000..3b01cd7 --- /dev/null +++ b/tests/test_runner.py @@ -0,0 +1,222 @@ + +import io +import os +import unittest + +from time import sleep + +from core.correctness.vars import TEST_HANDLER_BASE, TEST_JOB_OUTPUT, \ + TEST_MONITOR_BASE, APPENDING_NOTEBOOK +from core.functionality import make_dir, rmtree, read_notebook +from core.meow import create_rules +from core.runner import MeowRunner +from patterns import WatchdogMonitor, FileEventPattern +from recipes.jupyter_notebook_recipe import PapermillHandler, \ + JupyterNotebookRecipe, RESULT_FILE + + +class MeowTests(unittest.TestCase): + def setUp(self) -> None: + super().setUp() + make_dir(TEST_MONITOR_BASE) + make_dir(TEST_HANDLER_BASE) + make_dir(TEST_JOB_OUTPUT) + + def tearDown(self) -> None: + super().tearDown() + rmtree(TEST_MONITOR_BASE) + rmtree(TEST_HANDLER_BASE) + rmtree(TEST_JOB_OUTPUT) + + def testMeowRunner(self)->None: + pattern_one = FileEventPattern( + "pattern_one", "start/A.txt", "recipe_one", "infile", + parameters={ + "extra":"A line from a test Pattern", + "outfile":"{VGRID}/output/{FILENAME}" + }) + recipe = JupyterNotebookRecipe( + "recipe_one", APPENDING_NOTEBOOK) + + patterns = { + pattern_one.name: pattern_one, + } + recipes = { + recipe.name: recipe, + } + rules = create_rules(patterns, recipes) + + monitor_debug_stream = io.StringIO("") + handler_debug_stream = io.StringIO("") + + runner = MeowRunner( + WatchdogMonitor( + TEST_MONITOR_BASE, + rules, + print=monitor_debug_stream, + logging=3, + settletime=1 + ), + PapermillHandler( + TEST_HANDLER_BASE, + TEST_JOB_OUTPUT, + print=handler_debug_stream, + logging=3 + ) + ) + + runner.start() + + start_dir = os.path.join(TEST_MONITOR_BASE, "start") + make_dir(start_dir) + self.assertTrue(start_dir) + with open(os.path.join(start_dir, "A.txt"), "w") as f: + f.write("Initial Data") + + self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) + + loops = 0 + job_id = None + while loops < 15: + sleep(1) + handler_debug_stream.seek(0) + messages = handler_debug_stream.readlines() + + for msg in messages: + self.assertNotIn("ERROR", msg) + + if "INFO: Completed job " in msg: + job_id = msg.replace("INFO: Completed job ", "") + job_id = job_id[:job_id.index(" with output")] + loops = 15 + loops += 1 + + self.assertIsNotNone(job_id) + self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1) + self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT)) + + runner.stop() + + job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) + self.assertEqual(len(os.listdir(job_dir)), 5) + + result = read_notebook(os.path.join(job_dir, RESULT_FILE)) + self.assertIsNotNone(result) + + output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") + self.assertTrue(os.path.exists(output_path)) + + with open(output_path, "r") as f: + data = f.read() + + self.assertEqual(data, "Initial Data\nA line from a test Pattern") + + def testMeowRunnerLinkeExecution(self)->None: + pattern_one = FileEventPattern( + "pattern_one", "start/A.txt", "recipe_one", "infile", + parameters={ + "extra":"A line from Pattern 1", + "outfile":"{VGRID}/middle/{FILENAME}" + }) + pattern_two = FileEventPattern( + "pattern_two", "middle/A.txt", "recipe_one", "infile", + parameters={ + "extra":"A line from Pattern 2", + "outfile":"{VGRID}/output/{FILENAME}" + }) + recipe = JupyterNotebookRecipe( + "recipe_one", APPENDING_NOTEBOOK) + + patterns = { + pattern_one.name: pattern_one, + pattern_two.name: pattern_two, + } + recipes = { + recipe.name: recipe, + } + rules = create_rules(patterns, recipes) + + monitor_debug_stream = io.StringIO("") + handler_debug_stream = io.StringIO("") + + runner = MeowRunner( + WatchdogMonitor( + TEST_MONITOR_BASE, + rules, + print=monitor_debug_stream, + logging=3, + settletime=1 + ), + PapermillHandler( + TEST_HANDLER_BASE, + TEST_JOB_OUTPUT, + print=handler_debug_stream, + logging=3 + ) + ) + + runner.start() + + start_dir = os.path.join(TEST_MONITOR_BASE, "start") + make_dir(start_dir) + self.assertTrue(start_dir) + with open(os.path.join(start_dir, "A.txt"), "w") as f: + f.write("Initial Data") + + self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) + + loops = 0 + job_ids = [] + while len(job_ids) < 2 and loops < 15: + sleep(1) + handler_debug_stream.seek(0) + messages = handler_debug_stream.readlines() + + for msg in messages: + self.assertNotIn("ERROR", msg) + + if "INFO: Completed job " in msg: + job_id = msg.replace("INFO: Completed job ", "") + job_id = job_id[:job_id.index(" with output")] + if job_id not in job_ids: + job_ids.append(job_id) + loops += 1 + + print(job_ids) + + self.assertEqual(len(job_ids), 2) + self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 2) + self.assertIn(job_ids[0], os.listdir(TEST_JOB_OUTPUT)) + self.assertIn(job_ids[1], os.listdir(TEST_JOB_OUTPUT)) + + runner.stop() + + mid_job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) + self.assertEqual(len(os.listdir(mid_job_dir)), 5) + + result = read_notebook(os.path.join(mid_job_dir, RESULT_FILE)) + self.assertIsNotNone(result) + + mid_output_path = os.path.join(TEST_MONITOR_BASE, "middle", "A.txt") + self.assertTrue(os.path.exists(mid_output_path)) + + with open(mid_output_path, "r") as f: + data = f.read() + + self.assertEqual(data, "Initial Data\nA line from Pattern 1") + + final_job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) + self.assertEqual(len(os.listdir(final_job_dir)), 5) + + result = read_notebook(os.path.join(final_job_dir, RESULT_FILE)) + self.assertIsNotNone(result) + + final_output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") + self.assertTrue(os.path.exists(final_output_path)) + + with open(final_output_path, "r") as f: + data = f.read() + + self.assertEqual(data, + "Initial Data\nA line from Pattern 1\nA line from Pattern 2") +