Moved the runner to its own file, as it will be heavily expanded

This commit is contained in:
PatchOfScotland
2023-01-15 10:10:29 +01:00
parent ee81b2561e
commit eabedb4747
4 changed files with 364 additions and 327 deletions

View File

@ -1,18 +1,15 @@
import inspect import inspect
import sys import sys
import threading
from multiprocessing import Pipe
from random import randrange
from typing import Any, Union from typing import Any, Union
from core.correctness.vars import VALID_RECIPE_NAME_CHARS, \ from core.correctness.vars import VALID_RECIPE_NAME_CHARS, \
VALID_PATTERN_NAME_CHARS, VALID_RULE_NAME_CHARS, VALID_CHANNELS, \ VALID_PATTERN_NAME_CHARS, VALID_RULE_NAME_CHARS, VALID_CHANNELS, \
get_drt_imp_msg, DEBUG_WARNING, DEBUG_INFO, EVENT_TYPE get_drt_imp_msg
from core.correctness.validation import valid_string, check_type, \ from core.correctness.validation import valid_string, check_type, \
check_implementation, valid_list, valid_dict, setup_debugging check_implementation, valid_list, valid_dict
from core.functionality import print_debug, wait, generate_id from core.functionality import generate_id
class BaseRecipe: class BaseRecipe:
@ -176,125 +173,6 @@ class BaseHandler:
pass pass
class MeowRunner:
    """Routes events from monitors to the handlers registered for them.

    Each monitor is given the writing end of its own pipe as its
    ``to_runner`` attribute. Once started, a worker thread waits on the
    reading ends and hands every received event to one handler registered
    for that event's type.
    """
    # Monitors whose events are dispatched by this runner.
    monitors:list[BaseMonitor]
    # Handlers grouped by every event type they report as valid.
    handlers:dict[str,list[BaseHandler]]
    # Reading ends of the monitor pipes, in the same order as the monitors.
    from_monitors:list[VALID_CHANNELS]

    def __init__(self, monitors:Union[BaseMonitor,list[BaseMonitor]],
            handlers:Union[BaseHandler,list[BaseHandler]],
            print:Any=sys.stdout, logging:int=0) -> None:
        """Validate and register the handlers and monitors.

        :param monitors: a single monitor, or a non-empty list of monitors.
        :param handlers: a single handler, or a non-empty list of handlers.
        :param print: debug output target, passed to setup_debugging.
        :param logging: debug level, passed to setup_debugging.
        """
        self._is_valid_handlers(handlers)
        if not isinstance(handlers, list):
            handlers = [handlers]
        # Index the handlers by each event type they accept.
        self.handlers = {}
        for handler in handlers:
            for event_type in handler.valid_event_types():
                self.handlers.setdefault(event_type, []).append(handler)

        self._is_valid_monitors(monitors)
        if not isinstance(monitors, list):
            monitors = [monitors]
        self.monitors = monitors
        # One pipe per monitor: the monitor writes, the runner reads.
        self.from_monitors = []
        for monitor in self.monitors:
            reader, writer = Pipe()
            monitor.to_runner = writer
            self.from_monitors.append(reader)

        # Used by stop() to wake the worker thread and make it exit.
        self._stop_pipe = Pipe()
        self._worker = None
        self._print_target, self.debug_level = setup_debugging(print, logging)

    def run(self)->None:
        """Dispatch incoming events until a stop message is received.

        An event whose type has no registered handler is reported at
        DEBUG_INFO level and dropped; the loop keeps running so later
        events are still processed.
        """
        all_inputs = self.from_monitors + [self._stop_pipe[0]]
        while True:
            ready = wait(all_inputs)
            if self._stop_pipe[0] in ready:
                return
            for from_monitor in self.from_monitors:
                if from_monitor not in ready:
                    continue
                event = from_monitor.recv()
                # .get() because only event types with at least one handler
                # have an entry in the mapping.
                candidates = self.handlers.get(event[EVENT_TYPE])
                if not candidates:
                    print_debug(self._print_target, self.debug_level,
                        "Could not process event as no relevent "
                        f"handler for '{event[EVENT_TYPE]}'", DEBUG_INFO)
                    continue
                if len(candidates) == 1:
                    candidates[0].handle(event)
                else:
                    # Several handlers accept this type: pick one at random.
                    candidates[randrange(len(candidates))].handle(event)

    def start(self)->None:
        """Start the monitors, any startable handlers and the worker thread.

        :raises RuntimeWarning: if the runner has already been started. The
            check is made first so that a repeated call really has no effect.
        """
        if self._worker is not None:
            msg = "Repeated calls to start have no effect."
            print_debug(self._print_target, self.debug_level,
                msg, DEBUG_WARNING)
            raise RuntimeWarning(msg)

        for monitor in self.monitors:
            monitor.start()
        for handler in self._handlers_with("start"):
            handler.start()

        self._worker = threading.Thread(
            target=self.run,
            args=[])
        self._worker.daemon = True
        self._worker.start()
        print_debug(self._print_target, self.debug_level,
            "Starting MeowRunner run...", DEBUG_INFO)

    def stop(self)->None:
        """Stop the monitors, any stoppable handlers and the worker thread.

        :raises RuntimeWarning: if the runner was never started. The check
            is made first so nothing is stopped that was not started here.
        """
        if self._worker is None:
            msg = "Cannot stop thread that is not started."
            print_debug(self._print_target, self.debug_level,
                msg, DEBUG_WARNING)
            raise RuntimeWarning(msg)

        for monitor in self.monitors:
            monitor.stop()
        for handler in self._handlers_with("stop"):
            handler.stop()

        # Wake the worker through the stop pipe, then wait for it to exit.
        self._stop_pipe[1].send(1)
        self._worker.join()
        print_debug(self._print_target, self.debug_level,
            "Worker thread stopped", DEBUG_INFO)

    def _handlers_with(self, attribute:str)->list[BaseHandler]:
        """Return each registered handler having *attribute*, listed once."""
        found = []
        for handler_list in self.handlers.values():
            for handler in handler_list:
                # A handler registered for several event types appears in
                # several lists, so skip the ones already collected.
                if hasattr(handler, attribute) and handler not in found:
                    found.append(handler)
        return found

    def _is_valid_monitors(self,
            monitors:Union[BaseMonitor,list[BaseMonitor]])->None:
        """Check that monitors is a BaseMonitor or a non-empty list of them."""
        check_type(monitors, BaseMonitor, alt_types=[list[BaseMonitor]])
        if isinstance(monitors, list):
            valid_list(monitors, BaseMonitor, min_length=1)

    def _is_valid_handlers(self,
            handlers:Union[BaseHandler,list[BaseHandler]])->None:
        """Check that handlers is a BaseHandler or a non-empty list of them."""
        check_type(handlers, BaseHandler, alt_types=[list[BaseHandler]])
        if isinstance(handlers, list):
            valid_list(handlers, BaseHandler, min_length=1)
def create_rules(patterns:Union[dict[str,BasePattern],list[BasePattern]], def create_rules(patterns:Union[dict[str,BasePattern],list[BasePattern]],
recipes:Union[dict[str,BaseRecipe],list[BaseRecipe]], recipes:Union[dict[str,BaseRecipe],list[BaseRecipe]],
new_rules:list[BaseRule]=[])->dict[str,BaseRule]: new_rules:list[BaseRule]=[])->dict[str,BaseRule]:

134
core/runner.py Normal file
View File

@ -0,0 +1,134 @@
import sys
import threading
from multiprocessing import Pipe
from random import randrange
from typing import Any, Union
from core.correctness.vars import DEBUG_WARNING, DEBUG_INFO, EVENT_TYPE, \
VALID_CHANNELS
from core.correctness.validation import setup_debugging, check_type, \
valid_list
from core.functionality import print_debug, wait
from core.meow import BaseHandler, BaseMonitor
class MeowRunner:
    """Routes events from monitors to the handlers registered for them.

    Each monitor is given the writing end of its own pipe as its
    ``to_runner`` attribute. Once started, a worker thread waits on the
    reading ends and hands every received event to one handler registered
    for that event's type.
    """
    # Monitors whose events are dispatched by this runner.
    monitors:list[BaseMonitor]
    # Handlers grouped by every event type they report as valid.
    handlers:dict[str,list[BaseHandler]]
    # Reading ends of the monitor pipes, in the same order as the monitors.
    from_monitors:list[VALID_CHANNELS]

    def __init__(self, monitors:Union[BaseMonitor,list[BaseMonitor]],
            handlers:Union[BaseHandler,list[BaseHandler]],
            print:Any=sys.stdout, logging:int=0) -> None:
        """Validate and register the handlers and monitors.

        :param monitors: a single monitor, or a non-empty list of monitors.
        :param handlers: a single handler, or a non-empty list of handlers.
        :param print: debug output target, passed to setup_debugging.
        :param logging: debug level, passed to setup_debugging.
        """
        self._is_valid_handlers(handlers)
        if not isinstance(handlers, list):
            handlers = [handlers]
        # Index the handlers by each event type they accept.
        self.handlers = {}
        for handler in handlers:
            for event_type in handler.valid_event_types():
                self.handlers.setdefault(event_type, []).append(handler)

        self._is_valid_monitors(monitors)
        if not isinstance(monitors, list):
            monitors = [monitors]
        self.monitors = monitors
        # One pipe per monitor: the monitor writes, the runner reads.
        self.from_monitors = []
        for monitor in self.monitors:
            reader, writer = Pipe()
            monitor.to_runner = writer
            self.from_monitors.append(reader)

        # Used by stop() to wake the worker thread and make it exit.
        self._stop_pipe = Pipe()
        self._worker = None
        self._print_target, self.debug_level = setup_debugging(print, logging)

    def run(self)->None:
        """Dispatch incoming events until a stop message is received.

        An event whose type has no registered handler is reported at
        DEBUG_INFO level and dropped; the loop keeps running so later
        events are still processed.
        """
        all_inputs = self.from_monitors + [self._stop_pipe[0]]
        while True:
            ready = wait(all_inputs)
            if self._stop_pipe[0] in ready:
                return
            for from_monitor in self.from_monitors:
                if from_monitor not in ready:
                    continue
                event = from_monitor.recv()
                # .get() because only event types with at least one handler
                # have an entry in the mapping.
                candidates = self.handlers.get(event[EVENT_TYPE])
                if not candidates:
                    print_debug(self._print_target, self.debug_level,
                        "Could not process event as no relevent "
                        f"handler for '{event[EVENT_TYPE]}'", DEBUG_INFO)
                    continue
                if len(candidates) == 1:
                    candidates[0].handle(event)
                else:
                    # Several handlers accept this type: pick one at random.
                    candidates[randrange(len(candidates))].handle(event)

    def start(self)->None:
        """Start the monitors, any startable handlers and the worker thread.

        :raises RuntimeWarning: if the runner has already been started. The
            check is made first so that a repeated call really has no effect.
        """
        if self._worker is not None:
            msg = "Repeated calls to start have no effect."
            print_debug(self._print_target, self.debug_level,
                msg, DEBUG_WARNING)
            raise RuntimeWarning(msg)

        for monitor in self.monitors:
            monitor.start()
        for handler in self._handlers_with("start"):
            handler.start()

        self._worker = threading.Thread(
            target=self.run,
            args=[])
        self._worker.daemon = True
        self._worker.start()
        print_debug(self._print_target, self.debug_level,
            "Starting MeowRunner run...", DEBUG_INFO)

    def stop(self)->None:
        """Stop the monitors, any stoppable handlers and the worker thread.

        :raises RuntimeWarning: if the runner was never started. The check
            is made first so nothing is stopped that was not started here.
        """
        if self._worker is None:
            msg = "Cannot stop thread that is not started."
            print_debug(self._print_target, self.debug_level,
                msg, DEBUG_WARNING)
            raise RuntimeWarning(msg)

        for monitor in self.monitors:
            monitor.stop()
        for handler in self._handlers_with("stop"):
            handler.stop()

        # Wake the worker through the stop pipe, then wait for it to exit.
        self._stop_pipe[1].send(1)
        self._worker.join()
        print_debug(self._print_target, self.debug_level,
            "Worker thread stopped", DEBUG_INFO)

    def _handlers_with(self, attribute:str)->list[BaseHandler]:
        """Return each registered handler having *attribute*, listed once."""
        found = []
        for handler_list in self.handlers.values():
            for handler in handler_list:
                # A handler registered for several event types appears in
                # several lists, so skip the ones already collected.
                if hasattr(handler, attribute) and handler not in found:
                    found.append(handler)
        return found

    def _is_valid_monitors(self,
            monitors:Union[BaseMonitor,list[BaseMonitor]])->None:
        """Check that monitors is a BaseMonitor or a non-empty list of them."""
        check_type(monitors, BaseMonitor, alt_types=[list[BaseMonitor]])
        if isinstance(monitors, list):
            valid_list(monitors, BaseMonitor, min_length=1)

    def _is_valid_handlers(self,
            handlers:Union[BaseHandler,list[BaseHandler]])->None:
        """Check that handlers is a BaseHandler or a non-empty list of them."""
        check_type(handlers, BaseHandler, alt_types=[list[BaseHandler]])
        if isinstance(handlers, list):
            valid_list(handlers, BaseHandler, min_length=1)

View File

@ -1,19 +1,15 @@
import io
import os
import unittest import unittest
from time import sleep
from typing import Any from typing import Any
from core.correctness.vars import TEST_HANDLER_BASE, TEST_JOB_OUTPUT, \ from core.correctness.vars import TEST_HANDLER_BASE, TEST_JOB_OUTPUT, \
TEST_MONITOR_BASE, APPENDING_NOTEBOOK, BAREBONES_NOTEBOOK TEST_MONITOR_BASE, BAREBONES_NOTEBOOK
from core.functionality import make_dir, rmtree, read_notebook from core.functionality import make_dir, rmtree
from core.meow import BasePattern, BaseRecipe, BaseRule, BaseMonitor, \ from core.meow import BasePattern, BaseRecipe, BaseRule, BaseMonitor, \
BaseHandler, MeowRunner, create_rules BaseHandler, create_rules
from patterns import WatchdogMonitor, FileEventPattern from patterns import FileEventPattern
from recipes.jupyter_notebook_recipe import PapermillHandler, \ from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe
JupyterNotebookRecipe, RESULT_FILE
valid_pattern_one = FileEventPattern( valid_pattern_one = FileEventPattern(
"pattern_one", "path_one", "recipe_one", "file_one") "pattern_one", "path_one", "recipe_one", "file_one")
@ -172,196 +168,3 @@ class MeowTests(unittest.TestCase):
def valid_event_types(self)->list[str]: def valid_event_types(self)->list[str]:
pass pass
FullTestHandler() FullTestHandler()
def testMeowRunner(self)->None:
    """Run one pattern/recipe pair end to end and check the job output.

    Writing start/A.txt should trigger a single job that appends the
    pattern's extra line and writes the result to output/A.txt.
    """
    pattern_one = FileEventPattern(
        "pattern_one", "start/A.txt", "recipe_one", "infile",
        parameters={
            "extra":"A line from a test Pattern",
            "outfile":"{VGRID}/output/{FILENAME}"
        })
    recipe = JupyterNotebookRecipe(
        "recipe_one", APPENDING_NOTEBOOK)

    patterns = {
        pattern_one.name: pattern_one,
    }
    recipes = {
        recipe.name: recipe,
    }
    rules = create_rules(patterns, recipes)

    monitor_debug_stream = io.StringIO("")
    handler_debug_stream = io.StringIO("")

    runner = MeowRunner(
        WatchdogMonitor(
            TEST_MONITOR_BASE,
            rules,
            print=monitor_debug_stream,
            logging=3,
            settletime=1
        ),
        PapermillHandler(
            TEST_HANDLER_BASE,
            TEST_JOB_OUTPUT,
            print=handler_debug_stream,
            logging=3
        )
    )

    runner.start()
    # Always stop the runner, even when an assertion below fails, so its
    # threads are not left running while tearDown removes the directories.
    try:
        start_dir = os.path.join(TEST_MONITOR_BASE, "start")
        make_dir(start_dir)
        # Check the directory itself; a non-empty path string is always
        # truthy, so asserting on the string would test nothing.
        self.assertTrue(os.path.exists(start_dir))

        with open(os.path.join(start_dir, "A.txt"), "w") as f:
            f.write("Initial Data")

        self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt")))

        # Poll the handler log for up to 15 seconds for a completed job.
        job_id = None
        for _ in range(15):
            sleep(1)
            handler_debug_stream.seek(0)
            for msg in handler_debug_stream.readlines():
                self.assertNotIn("ERROR", msg)
                if "INFO: Completed job " in msg:
                    job_id = msg.replace("INFO: Completed job ", "")
                    job_id = job_id[:job_id.index(" with output")]
            if job_id is not None:
                break

        self.assertIsNotNone(job_id)
        self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1)
        self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT))
    finally:
        runner.stop()

    job_dir = os.path.join(TEST_JOB_OUTPUT, job_id)
    self.assertEqual(len(os.listdir(job_dir)), 5)

    result = read_notebook(os.path.join(job_dir, RESULT_FILE))
    self.assertIsNotNone(result)

    output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt")
    self.assertTrue(os.path.exists(output_path))

    with open(output_path, "r") as f:
        data = f.read()

    self.assertEqual(data, "Initial Data\nA line from a test Pattern")
def testMeowRunnerLinkeExecution(self)->None:
    """Run two chained patterns and check both jobs' output.

    Writing start/A.txt triggers pattern_one, whose output in middle/A.txt
    triggers pattern_two, which writes output/A.txt.
    """
    pattern_one = FileEventPattern(
        "pattern_one", "start/A.txt", "recipe_one", "infile",
        parameters={
            "extra":"A line from Pattern 1",
            "outfile":"{VGRID}/middle/{FILENAME}"
        })
    pattern_two = FileEventPattern(
        "pattern_two", "middle/A.txt", "recipe_one", "infile",
        parameters={
            "extra":"A line from Pattern 2",
            "outfile":"{VGRID}/output/{FILENAME}"
        })
    recipe = JupyterNotebookRecipe(
        "recipe_one", APPENDING_NOTEBOOK)

    patterns = {
        pattern_one.name: pattern_one,
        pattern_two.name: pattern_two,
    }
    recipes = {
        recipe.name: recipe,
    }
    rules = create_rules(patterns, recipes)

    monitor_debug_stream = io.StringIO("")
    handler_debug_stream = io.StringIO("")

    runner = MeowRunner(
        WatchdogMonitor(
            TEST_MONITOR_BASE,
            rules,
            print=monitor_debug_stream,
            logging=3,
            settletime=1
        ),
        PapermillHandler(
            TEST_HANDLER_BASE,
            TEST_JOB_OUTPUT,
            print=handler_debug_stream,
            logging=3
        )
    )

    runner.start()
    # Always stop the runner, even when an assertion below fails, so its
    # threads are not left running while tearDown removes the directories.
    try:
        start_dir = os.path.join(TEST_MONITOR_BASE, "start")
        make_dir(start_dir)
        # Check the directory itself; a non-empty path string is always
        # truthy, so asserting on the string would test nothing.
        self.assertTrue(os.path.exists(start_dir))

        with open(os.path.join(start_dir, "A.txt"), "w") as f:
            f.write("Initial Data")

        self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt")))

        # Poll the handler log for up to 15 seconds for two completed jobs.
        # The log is re-read from the start each time, so ids are deduplicated
        # and kept in the order their completion was first reported.
        job_ids = []
        for _ in range(15):
            sleep(1)
            handler_debug_stream.seek(0)
            for msg in handler_debug_stream.readlines():
                self.assertNotIn("ERROR", msg)
                if "INFO: Completed job " in msg:
                    job_id = msg.replace("INFO: Completed job ", "")
                    job_id = job_id[:job_id.index(" with output")]
                    if job_id not in job_ids:
                        job_ids.append(job_id)
            if len(job_ids) >= 2:
                break

        self.assertEqual(len(job_ids), 2)
        self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 2)
        self.assertIn(job_ids[0], os.listdir(TEST_JOB_OUTPUT))
        self.assertIn(job_ids[1], os.listdir(TEST_JOB_OUTPUT))
    finally:
        runner.stop()

    # The first job reported is the one feeding the middle directory.
    mid_job_dir = os.path.join(TEST_JOB_OUTPUT, job_ids[0])
    self.assertEqual(len(os.listdir(mid_job_dir)), 5)

    result = read_notebook(os.path.join(mid_job_dir, RESULT_FILE))
    self.assertIsNotNone(result)

    mid_output_path = os.path.join(TEST_MONITOR_BASE, "middle", "A.txt")
    self.assertTrue(os.path.exists(mid_output_path))

    with open(mid_output_path, "r") as f:
        data = f.read()

    self.assertEqual(data, "Initial Data\nA line from Pattern 1")

    # The second job reported is the one triggered by the middle file.
    final_job_dir = os.path.join(TEST_JOB_OUTPUT, job_ids[1])
    self.assertEqual(len(os.listdir(final_job_dir)), 5)

    result = read_notebook(os.path.join(final_job_dir, RESULT_FILE))
    self.assertIsNotNone(result)

    final_output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt")
    self.assertTrue(os.path.exists(final_output_path))

    with open(final_output_path, "r") as f:
        data = f.read()

    self.assertEqual(data,
        "Initial Data\nA line from Pattern 1\nA line from Pattern 2")

222
tests/test_runner.py Normal file
View File

@ -0,0 +1,222 @@
import io
import os
import unittest
from time import sleep
from core.correctness.vars import TEST_HANDLER_BASE, TEST_JOB_OUTPUT, \
TEST_MONITOR_BASE, APPENDING_NOTEBOOK
from core.functionality import make_dir, rmtree, read_notebook
from core.meow import create_rules
from core.runner import MeowRunner
from patterns import WatchdogMonitor, FileEventPattern
from recipes.jupyter_notebook_recipe import PapermillHandler, \
JupyterNotebookRecipe, RESULT_FILE
class MeowTests(unittest.TestCase):
    """End-to-end tests driving a MeowRunner with a WatchdogMonitor and a
    PapermillHandler."""

    def setUp(self) -> None:
        """Create the scratch directories the tests work in."""
        super().setUp()
        make_dir(TEST_MONITOR_BASE)
        make_dir(TEST_HANDLER_BASE)
        make_dir(TEST_JOB_OUTPUT)

    def tearDown(self) -> None:
        """Remove the scratch directories created by setUp."""
        super().tearDown()
        rmtree(TEST_MONITOR_BASE)
        rmtree(TEST_HANDLER_BASE)
        rmtree(TEST_JOB_OUTPUT)

    def _make_runner(self, patterns:list,
            handler_debug_stream:io.StringIO)->MeowRunner:
        """Build a runner pairing *patterns* with the appending notebook
        recipe, sending handler debug output to *handler_debug_stream*."""
        recipe = JupyterNotebookRecipe(
            "recipe_one", APPENDING_NOTEBOOK)
        rules = create_rules(
            {pattern.name: pattern for pattern in patterns},
            {recipe.name: recipe})

        return MeowRunner(
            WatchdogMonitor(
                TEST_MONITOR_BASE,
                rules,
                print=io.StringIO(""),
                logging=3,
                settletime=1
            ),
            PapermillHandler(
                TEST_HANDLER_BASE,
                TEST_JOB_OUTPUT,
                print=handler_debug_stream,
                logging=3
            )
        )

    def _write_start_file(self)->None:
        """Create start/A.txt inside the monitored directory."""
        start_dir = os.path.join(TEST_MONITOR_BASE, "start")
        make_dir(start_dir)
        # Check the directory itself; a non-empty path string is always
        # truthy, so asserting on the string would test nothing.
        self.assertTrue(os.path.exists(start_dir))

        with open(os.path.join(start_dir, "A.txt"), "w") as f:
            f.write("Initial Data")

        self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt")))

    def _completed_job_ids(self, handler_debug_stream:io.StringIO)->list:
        """Return the job id of every completion message in the handler log,
        in log order, failing the test if any message reports an error."""
        handler_debug_stream.seek(0)
        completed = []
        for msg in handler_debug_stream.readlines():
            self.assertNotIn("ERROR", msg)
            if "INFO: Completed job " in msg:
                job_id = msg.replace("INFO: Completed job ", "")
                completed.append(job_id[:job_id.index(" with output")])
        return completed

    def _check_job(self, job_id:str, output_dir:str, expected:str)->None:
        """Check a job's directory, its result notebook and the contents of
        the A.txt file it wrote to *output_dir*."""
        job_dir = os.path.join(TEST_JOB_OUTPUT, job_id)
        self.assertEqual(len(os.listdir(job_dir)), 5)

        result = read_notebook(os.path.join(job_dir, RESULT_FILE))
        self.assertIsNotNone(result)

        output_path = os.path.join(TEST_MONITOR_BASE, output_dir, "A.txt")
        self.assertTrue(os.path.exists(output_path))

        with open(output_path, "r") as f:
            data = f.read()

        self.assertEqual(data, expected)

    def testMeowRunner(self)->None:
        """Run one pattern/recipe pair end to end and check the job output."""
        pattern_one = FileEventPattern(
            "pattern_one", "start/A.txt", "recipe_one", "infile",
            parameters={
                "extra":"A line from a test Pattern",
                "outfile":"{VGRID}/output/{FILENAME}"
            })
        handler_debug_stream = io.StringIO("")
        runner = self._make_runner([pattern_one], handler_debug_stream)

        runner.start()
        # Always stop the runner, even when an assertion fails, so its
        # threads are not left running while tearDown removes directories.
        try:
            self._write_start_file()

            # Poll the handler log for up to 15 seconds.
            job_id = None
            for _ in range(15):
                sleep(1)
                completed = self._completed_job_ids(handler_debug_stream)
                if completed:
                    job_id = completed[-1]
                    break

            self.assertIsNotNone(job_id)
            self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1)
            self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT))
        finally:
            runner.stop()

        self._check_job(job_id, "output",
            "Initial Data\nA line from a test Pattern")

    def testMeowRunnerLinkeExecution(self)->None:
        """Run two chained patterns and check the output of both jobs."""
        pattern_one = FileEventPattern(
            "pattern_one", "start/A.txt", "recipe_one", "infile",
            parameters={
                "extra":"A line from Pattern 1",
                "outfile":"{VGRID}/middle/{FILENAME}"
            })
        pattern_two = FileEventPattern(
            "pattern_two", "middle/A.txt", "recipe_one", "infile",
            parameters={
                "extra":"A line from Pattern 2",
                "outfile":"{VGRID}/output/{FILENAME}"
            })
        handler_debug_stream = io.StringIO("")
        runner = self._make_runner(
            [pattern_one, pattern_two], handler_debug_stream)

        runner.start()
        # Always stop the runner, even when an assertion fails, so its
        # threads are not left running while tearDown removes directories.
        try:
            self._write_start_file()

            # Poll for up to 15 seconds. The log is re-read from the start
            # each time, so ids are deduplicated and kept in the order their
            # completion was first reported.
            job_ids = []
            for _ in range(15):
                sleep(1)
                for job_id in self._completed_job_ids(handler_debug_stream):
                    if job_id not in job_ids:
                        job_ids.append(job_id)
                if len(job_ids) >= 2:
                    break

            self.assertEqual(len(job_ids), 2)
            self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 2)
            self.assertIn(job_ids[0], os.listdir(TEST_JOB_OUTPUT))
            self.assertIn(job_ids[1], os.listdir(TEST_JOB_OUTPUT))
        finally:
            runner.stop()

        # The first job reported feeds the middle directory; the second is
        # the one triggered by the middle file.
        self._check_job(job_ids[0], "middle",
            "Initial Data\nA line from Pattern 1")
        self._check_job(job_ids[1], "output",
            "Initial Data\nA line from Pattern 1\nA line from Pattern 2")