From d9004394c13a72f74415b8a1d2d456bf688aec97 Mon Sep 17 00:00:00 2001 From: PatchOfScotland Date: Fri, 13 Jan 2023 18:04:50 +0100 Subject: [PATCH] refactored monitor handler interaction to better allow differing event types in same system --- core/correctness/validation.py | 7 +- core/correctness/vars.py | 9 +- core/functionality.py | 67 ++----------- core/meow.py | 152 +++++++++++++++++++++++------ patterns/file_event_pattern.py | 24 ++--- recipes/jupyter_notebook_recipe.py | 128 ++++++++++++------------ tests/test_functionality.py | 130 ++++-------------------- tests/test_meow.py | 84 ++++++++++++---- tests/test_patterns.py | 29 ++++-- tests/test_recipes.py | 64 +++--------- tests/test_validation.py | 18 +++- 11 files changed, 357 insertions(+), 355 deletions(-) diff --git a/core/correctness/validation.py b/core/correctness/validation.py index ab3ccc3..f542387 100644 --- a/core/correctness/validation.py +++ b/core/correctness/validation.py @@ -3,7 +3,7 @@ from inspect import signature from os.path import sep, exists, isfile, isdir, dirname from typing import Any, _SpecialForm, Union, Tuple, get_origin, get_args -from core.correctness.vars import VALID_PATH_CHARS, get_not_imp_msg +from core.correctness.vars import VALID_PATH_CHARS, get_not_imp_msg, EVENT_TYPE def check_type(variable:Any, expected_type:type, alt_types:list[type]=[], or_none:bool=False)->None: @@ -178,3 +178,8 @@ def setup_debugging(print:Any=None, logging:int=0)->Tuple[Any,int]: "'write' function") return print, logging + +def valid_event(event)->None: + check_type(event, dict) + if not EVENT_TYPE in event.keys(): + raise KeyError(f"Events require key '{EVENT_TYPE}'") diff --git a/core/correctness/vars.py b/core/correctness/vars.py index b344977..87ab2af 100644 --- a/core/correctness/vars.py +++ b/core/correctness/vars.py @@ -187,7 +187,14 @@ APPENDING_NOTEBOOK = { "nbformat_minor": 4 } -# events +# meow events +EVENT_TYPE = "meow_event_type" +WATCHDOG_TYPE = "watchdog" +WATCHDOG_SRC = "src_path" +WATCHDOG_BASE = "monitor_base" +WATCHDOG_RULE = "rule_name" + +# inotify events FILE_CREATE_EVENT = "file_created" FILE_MODIFY_EVENT = "file_modified" FILE_MOVED_EVENT = "file_moved" diff --git a/core/functionality.py b/core/functionality.py index de10686..0042281 100644 --- a/core/functionality.py +++ b/core/functionality.py @@ -14,28 +14,12 @@ from papermill.translators import papermill_translators from typing import Any, Union from random import SystemRandom -from core.meow import BasePattern, BaseRecipe, BaseRule +#from core.meow import BasePattern, BaseRecipe, BaseRule from core.correctness.validation import check_type, valid_dict, valid_list, \ valid_existing_file_path, valid_path from core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \ VALID_CHANNELS, HASH_BUFFER_SIZE, SHA256, DEBUG_WARNING, DEBUG_ERROR, \ - DEBUG_INFO - -def check_pattern_dict(patterns, min_length=1): - valid_dict(patterns, str, BasePattern, strict=False, min_length=min_length) - for k, v in patterns.items(): - if k != v.name: - raise KeyError(f"Key '{k}' indexes unexpected Pattern '{v.name}' " - "Pattern dictionaries must be keyed with the name of the " - "Pattern.") - -def check_recipe_dict(recipes, min_length=1): - valid_dict(recipes, str, BaseRecipe, strict=False, min_length=min_length) - for k, v in recipes.items(): - if k != v.name: - raise KeyError(f"Key '{k}' indexes unexpected Recipe '{v.name}' " - "Recipe dictionaries must be keyed with the name of the " - "Recipe.") + DEBUG_INFO, EVENT_TYPE def generate_id(prefix:str="", length:int=16, existing_ids:list[str]=[], charset:str=CHAR_UPPERCASE+CHAR_LOWERCASE, attempts:int=24): @@ -48,45 +32,6 @@ def generate_id(prefix:str="", length:int=16, existing_ids:list[str]=[], raise ValueError(f"Could not generate ID unique from '{existing_ids}' " f"using values '{charset}' and length of '{length}'.") -def create_rules(patterns:Union[dict[str,BasePattern],list[BasePattern]], - recipes:Union[dict[str,BaseRecipe],list[BaseRecipe]], - new_rules:list[BaseRule]=[])->dict[str,BaseRule]: - check_type(patterns, dict, alt_types=[list]) - check_type(recipes, dict, alt_types=[list]) - valid_list(new_rules, BaseRule, min_length=0) - - if isinstance(patterns, list): - valid_list(patterns, BasePattern, min_length=0) - patterns = {pattern.name:pattern for pattern in patterns} - else: - check_pattern_dict(patterns, min_length=0) - - if isinstance(recipes, list): - valid_list(recipes, BaseRecipe, min_length=0) - recipes = {recipe.name:recipe for recipe in recipes} - else: - check_recipe_dict(recipes, min_length=0) - - # Imported here to avoid circular imports at top of file - import rules - rules = {} - all_rules ={(r.pattern_type, r.recipe_type):r for r in [r[1] \ - for r in inspect.getmembers(sys.modules["rules"], inspect.isclass) \ - if (issubclass(r[1], BaseRule))]} - - for pattern in patterns.values(): - if pattern.recipe in recipes: - key = (type(pattern).__name__, - type(recipes[pattern.recipe]).__name__) - if (key) in all_rules: - rule = all_rules[key]( - generate_id(prefix="Rule_"), - pattern, - recipes[pattern.recipe] - ) - rules[rule.name] = rule - return rules - def wait(inputs:list[VALID_CHANNELS])->list[VALID_CHANNELS]: all_connections = [i for i in inputs if type(i) is Connection] \ + [i._reader for i in inputs if type(i) is Queue] @@ -133,6 +78,8 @@ def rmtree(directory:str): :return: No return """ + if not os.path.exists(directory): + return for root, dirs, files in os.walk(directory, topdown=False): for file in files: os.remove(os.path.join(root, file)) @@ -295,4 +242,8 @@ def print_debug(print_target, debug_level, msg, level)->None: status = "INFO" elif level == DEBUG_WARNING: status = "WARNING" - print(f"{status}: {msg}", file=print_target) \ No newline at end of file + print(f"{status}: {msg}", file=print_target) + +def create_event(event_type:str, source:dict[Any,Any]={})->dict[Any,Any]: + return {**source, EVENT_TYPE: event_type} + diff --git a/core/meow.py b/core/meow.py index 1224546..1fa3550 100644 --- a/core/meow.py +++ b/core/meow.py @@ -1,11 +1,17 @@ -from typing import Any +import inspect +import sys +import threading + +from multiprocessing import Pipe +from typing import Any, Union from core.correctness.vars import VALID_RECIPE_NAME_CHARS, \ VALID_PATTERN_NAME_CHARS, VALID_RULE_NAME_CHARS, VALID_CHANNELS, \ - get_drt_imp_msg + get_drt_imp_msg, DEBUG_WARNING, DEBUG_INFO, DEBUG_ERROR from core.correctness.validation import valid_string, check_type, \ - check_implementation + check_implementation, valid_list, valid_dict, setup_debugging +from core.functionality import print_debug, wait, generate_id class BaseRecipe: @@ -127,15 +133,11 @@ class BaseRule: class BaseMonitor: rules: dict[str, BaseRule] - report: VALID_CHANNELS - def __init__(self, rules:dict[str,BaseRule], - report:VALID_CHANNELS)->None: + to_runner: VALID_CHANNELS + def __init__(self, rules:dict[str,BaseRule])->None: check_implementation(type(self).start, BaseMonitor) check_implementation(type(self).stop, BaseMonitor) - check_implementation(type(self)._is_valid_report, BaseMonitor) check_implementation(type(self)._is_valid_rules, BaseMonitor) - self._is_valid_report(report) - self.report = report self._is_valid_rules(rules) self.rules = rules @@ -145,9 +147,6 @@ class BaseMonitor: raise TypeError(msg) return object.__new__(cls) - def _is_valid_report(self, report:VALID_CHANNELS)->None: - pass - def _is_valid_rules(self, rules:dict[str,BaseRule])->None: pass @@ -159,14 +158,9 @@ class BaseMonitor: class BaseHandler: - inputs:Any - def __init__(self, inputs:list[VALID_CHANNELS]) -> None: - check_implementation(type(self).start, BaseHandler) - check_implementation(type(self).stop, BaseHandler) + def __init__(self) -> None: check_implementation(type(self).handle, BaseHandler) - check_implementation(type(self)._is_valid_inputs, BaseHandler) - self._is_valid_inputs(inputs) - self.inputs = inputs + check_implementation(type(self).valid_event_types, BaseHandler) def __new__(cls, *args, **kwargs): if cls is BaseHandler: @@ -174,40 +168,134 @@ class BaseHandler: raise TypeError(msg) return object.__new__(cls) - def _is_valid_inputs(self, inputs:Any)->None: + def valid_event_types(self)->list[str]: pass - def handle(self, event:Any, rule:BaseRule)->None: - pass - - def start(self)->None: - pass - - def stop(self)->None: + def handle(self, event:Any)->None: pass +# TODO reformat to allow for updated monitor / handler interaction +# TODO expand this to allow for lists of monitors / handlers class MeowRunner: monitor:BaseMonitor handler:BaseHandler - def __init__(self, monitor:BaseMonitor, handler:BaseHandler) -> None: + from_monitor: VALID_CHANNELS + def __init__(self, monitor:BaseMonitor, handler:BaseHandler, + print:Any=sys.stdout, logging:int=0) -> None: self._is_valid_monitor(monitor) self.monitor = monitor + monitor_to_runner_reader, monitor_to_runner_writer = Pipe() + self.monitor.to_runner = monitor_to_runner_writer + self.from_monitor = monitor_to_runner_reader self._is_valid_handler(handler) self.handler = handler + self._stop_pipe = Pipe() + self._worker = None + self._print_target, self.debug_level = setup_debugging(print, logging) + + def run(self)->None: + all_inputs = [self.from_monitor, self._stop_pipe[0]] + while True: + ready = wait(all_inputs) + + if self._stop_pipe[0] in ready: + return + else: + message = self.from_monitor.recv() + event = message + self.handler.handle(event) def start(self)->None: self.monitor.start() - if hasattr(self.handler, "start"): - self.handler.start() + #if hasattr(self.handler, "start"): + # self.handler.start() + + if self._worker is None: + self._worker = threading.Thread( + target=self.run, + args=[]) + self._worker.daemon = True + self._worker.start() + print_debug(self._print_target, self.debug_level, + "Starting MeowRunner run...", DEBUG_INFO) + else: + msg = "Repeated calls to start have no effect." + print_debug(self._print_target, self.debug_level, + msg, DEBUG_WARNING) + raise RuntimeWarning(msg) + def stop(self)->None: self.monitor.stop() - if hasattr(self.handler, "stop"): - self.handler.stop() + #if hasattr(self.handler, "stop"): + # self.handler.stop() + + if self._worker is None: + msg = "Cannot stop thread that is not started." + print_debug(self._print_target, self.debug_level, + msg, DEBUG_WARNING) + raise RuntimeWarning(msg) + else: + self._stop_pipe[1].send(1) + self._worker.join() + print_debug(self._print_target, self.debug_level, + "Worker thread stopped", DEBUG_INFO) + def _is_valid_monitor(self, monitor:BaseMonitor)->None: check_type(monitor, BaseMonitor) def _is_valid_handler(self, handler:BaseHandler)->None: check_type(handler, BaseHandler) + +def create_rules(patterns:Union[dict[str,BasePattern],list[BasePattern]], + recipes:Union[dict[str,BaseRecipe],list[BaseRecipe]], + new_rules:list[BaseRule]=[])->dict[str,BaseRule]: + check_type(patterns, dict, alt_types=[list]) + check_type(recipes, dict, alt_types=[list]) + valid_list(new_rules, BaseRule, min_length=0) + + if isinstance(patterns, list): + valid_list(patterns, BasePattern, min_length=0) + patterns = {pattern.name:pattern for pattern in patterns} + else: + valid_dict(patterns, str, BasePattern, strict=False, min_length=0) + for k, v in patterns.items(): + if k != v.name: + raise KeyError( + f"Key '{k}' indexes unexpected Pattern '{v.name}' " + "Pattern dictionaries must be keyed with the name of the " + "Pattern.") + + if isinstance(recipes, list): + valid_list(recipes, BaseRecipe, min_length=0) + recipes = {recipe.name:recipe for recipe in recipes} + else: + valid_dict(recipes, str, BaseRecipe, strict=False, min_length=0) + for k, v in recipes.items(): + if k != v.name: + raise KeyError( + f"Key '{k}' indexes unexpected Recipe '{v.name}' " + "Recipe dictionaries must be keyed with the name of the " + "Recipe.") + + # Imported here to avoid circular imports at top of file + import rules + rules = {} + all_rules ={(r.pattern_type, r.recipe_type):r for r in [r[1] \ + for r in inspect.getmembers(sys.modules["rules"], inspect.isclass) \ + if (issubclass(r[1], BaseRule))]} + + for pattern in patterns.values(): + if pattern.recipe in recipes: + key = (type(pattern).__name__, + type(recipes[pattern.recipe]).__name__) + if (key) in all_rules: + rule = all_rules[key]( + generate_id(prefix="Rule_"), + pattern, + recipes[pattern.recipe] + ) + rules[rule.name] = rule + return rules diff --git a/patterns/file_event_pattern.py b/patterns/file_event_pattern.py index 1520223..049a9e9 100644 --- a/patterns/file_event_pattern.py +++ b/patterns/file_event_pattern.py @@ -16,8 +16,9 @@ from core.correctness.validation import check_type, valid_string, \ from core.correctness.vars import VALID_RECIPE_NAME_CHARS, \ VALID_VARIABLE_NAME_CHARS, FILE_EVENTS, FILE_CREATE_EVENT, \ FILE_MODIFY_EVENT, FILE_MOVED_EVENT, VALID_CHANNELS, DEBUG_INFO, \ - DEBUG_ERROR, DEBUG_WARNING -from core.functionality import print_debug + DEBUG_ERROR, DEBUG_WARNING, WATCHDOG_TYPE, WATCHDOG_SRC, WATCHDOG_RULE, \ + WATCHDOG_BASE +from core.functionality import print_debug, create_event from core.meow import BasePattern, BaseMonitor, BaseRule _DEFAULT_MASK = [ @@ -125,9 +126,9 @@ class WatchdogMonitor(BaseMonitor): _rules_lock:threading.Lock def __init__(self, base_dir:str, rules:dict[str, BaseRule], - report:VALID_CHANNELS, autostart=False, - settletime:int=1, print:Any=sys.stdout, logging:int=0)->None: - super().__init__(rules, report) + autostart=False, settletime:int=1, print:Any=sys.stdout, + logging:int=0)->None: + super().__init__(rules) self._is_valid_base_dir(base_dir) self.base_dir = base_dir check_type(settletime, int) @@ -179,11 +180,16 @@ class WatchdogMonitor(BaseMonitor): direct_hit = match(direct_regexp, handle_path) if direct_hit or recursive_hit: + meow_event = create_event( + WATCHDOG_TYPE, { + WATCHDOG_SRC: event.src_path, + WATCHDOG_BASE: self.base_dir, + WATCHDOG_RULE: rule + }) print_debug(self._print_target, self.debug_level, f"Event at {src_path} of type {event_type} hit rule " f"{rule.name}", DEBUG_INFO) - event.monitor_base = self.base_dir - self.report.send((event, rule)) + self.to_runner.send(meow_event) except Exception as e: self._rules_lock.release() @@ -191,13 +197,9 @@ class WatchdogMonitor(BaseMonitor): self._rules_lock.release() - def _is_valid_base_dir(self, base_dir:str)->None: valid_existing_dir_path(base_dir) - def _is_valid_report(self, report:VALID_CHANNELS)->None: - check_type(report, VALID_CHANNELS) - def _is_valid_rules(self, rules:dict[str, BaseRule])->None: valid_dict(rules, str, BaseRule, min_length=0, strict=False) diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index 54c21e4..ebc6f40 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -17,7 +17,8 @@ from core.correctness.validation import check_type, valid_string, \ valid_dict, valid_path, valid_list, valid_existing_dir_path, \ setup_debugging from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, VALID_CHANNELS, \ - SHA256, DEBUG_ERROR, DEBUG_WARNING, DEBUG_INFO + SHA256, DEBUG_ERROR, DEBUG_WARNING, DEBUG_INFO, WATCHDOG_TYPE, \ + WATCHDOG_SRC, WATCHDOG_BASE, WATCHDOG_RULE from core.functionality import wait, get_file_hash, generate_id, make_dir, \ write_yaml, write_notebook, get_file_hash, parameterize_jupyter_notebook, \ print_debug @@ -98,9 +99,9 @@ class PapermillHandler(BaseHandler): _jobs:list[str] _jobs_lock:threading.Lock _print_target:Any - def __init__(self, inputs:list[VALID_CHANNELS], handler_base:str, - output_dir:str, print:Any=sys.stdout, logging:int=0)->None: - super().__init__(inputs) + def __init__(self, handler_base:str, output_dir:str, print:Any=sys.stdout, + logging:int=0)->None: + super().__init__() self._is_valid_handler_base(handler_base) self.handler_base = handler_base self._is_valid_output_dir(output_dir) @@ -113,61 +114,62 @@ class PapermillHandler(BaseHandler): print_debug(self._print_target, self.debug_level, "Created new PapermillHandler instance", DEBUG_INFO) - def run(self)->None: - all_inputs = self.inputs + [self._stop_pipe[0]] - while True: - ready = wait(all_inputs) +# def run(self)->None: +# all_inputs = self.inputs + [self._stop_pipe[0]] +# while True: +# ready = wait(all_inputs) +# +# if self._stop_pipe[0] in ready: +# return +# else: +# for input in self.inputs: +# if input in ready: +# message = input.recv() +# event = message +# self.handle(event) +# +# def start(self)->None: +# if self._worker is None: +# self._worker = threading.Thread( +# target=self.run, +# args=[]) +# self._worker.daemon = True +# self._worker.start() +# print_debug(self._print_target, self.debug_level, +# "Starting PapermillHandler run...", DEBUG_INFO) +# else: +# msg = "Repeated calls to start have no effect." +# print_debug(self._print_target, self.debug_level, +# msg, DEBUG_WARNING) +# raise RuntimeWarning(msg) +# +# def stop(self)->None: +# if self._worker is None: +# msg = "Cannot stop thread that is not started." +# print_debug(self._print_target, self.debug_level, +# msg, DEBUG_WARNING) +# raise RuntimeWarning(msg) +# else: +# self._stop_pipe[1].send(1) +# self._worker.join() +# print_debug(self._print_target, self.debug_level, +# "Worker thread stopped", DEBUG_INFO) - if self._stop_pipe[0] in ready: - return - else: - for input in self.inputs: - if input in ready: - message = input.recv() - event, rule = message - self.handle(event, rule) - - def start(self)->None: - if self._worker is None: - self._worker = threading.Thread( - target=self.run, - args=[]) - self._worker.daemon = True - self._worker.start() - print_debug(self._print_target, self.debug_level, - "Starting PapermillHandler run...", DEBUG_INFO) - else: - msg = "Repeated calls to start have no effect." - print_debug(self._print_target, self.debug_level, - msg, DEBUG_WARNING) - raise RuntimeWarning(msg) - - def stop(self)->None: - if self._worker is None: - msg = "Cannot stop thread that is not started." - print_debug(self._print_target, self.debug_level, - msg, DEBUG_WARNING) - raise RuntimeWarning(msg) - else: - self._stop_pipe[1].send(1) - self._worker.join() - print_debug(self._print_target, self.debug_level, - "Worker thread stopped", DEBUG_INFO) - - def handle(self, event:FileSystemEvent, rule:BaseRule)->None: + def handle(self, event:dict[Any,Any])->None: # TODO finish implementation and test print_debug(self._print_target, self.debug_level, - f"Handling event {event.src_path}", DEBUG_INFO) + f"Handling event {event[WATCHDOG_SRC]}", DEBUG_INFO) - file_hash = get_file_hash(event.src_path, SHA256) + file_hash = get_file_hash(event[WATCHDOG_SRC], SHA256) + rule = event[WATCHDOG_RULE] yaml_dict = {} for var, val in rule.pattern.parameters.items(): yaml_dict[var] = val for var, val in rule.pattern.outputs.items(): yaml_dict[var] = val - yaml_dict[rule.pattern.triggering_file] = event.src_path + yaml_dict[rule.pattern.triggering_file] = event[WATCHDOG_SRC] if not rule.pattern.sweep: waiting_for_threaded_resources = True @@ -175,7 +177,7 @@ class PapermillHandler(BaseHandler): try: worker = threading.Thread( target=self.execute_job, - args=[event, rule, yaml_dict, file_hash]) + args=[event, yaml_dict, file_hash]) worker.daemon = True worker.start() waiting_for_threaded_resources = False @@ -184,7 +186,6 @@ class PapermillHandler(BaseHandler): else: for var, val in rule.pattern.sweep.items(): values = [] - par_val = rule.pattern.sweep[SWEEP_START] while par_val <= rule.pattern.sweep[SWEEP_STOP]: values.append(par_val) @@ -197,7 +198,7 @@ class PapermillHandler(BaseHandler): try: worker = threading.Thread( target=self.execute_job, - args=[event, rule, yaml_dict, file_hash]) + args=[event, yaml_dict, file_hash]) worker.daemon = True worker.start() waiting_for_threaded_resources = False @@ -223,6 +224,9 @@ class PapermillHandler(BaseHandler): self._jobs_lock.release() return jobs_deepcopy + def valid_event_types(self)->list[str]: + return [WATCHDOG_TYPE] + def _is_valid_inputs(self, inputs:list[VALID_CHANNELS])->None: valid_list(inputs, VALID_CHANNELS) @@ -232,23 +236,23 @@ class PapermillHandler(BaseHandler): def _is_valid_output_dir(self, output_dir)->None: valid_existing_dir_path(output_dir, allow_base=True) - def execute_job(self, event:FileSystemEvent, rule:BaseRule, + def execute_job(self, event:FileSystemEvent, yaml_dict:dict[str,Any], triggerfile_hash:str)->None: job_dict = { JOB_ID: generate_id(prefix="job_", existing_ids=self.get_jobs()), - JOB_PATTERN: rule.pattern, - JOB_RECIPE: rule.recipe, - JOB_RULE: rule.name, - JOB_PATH: event.src_path, + JOB_PATTERN: event[WATCHDOG_RULE].pattern, + JOB_RECIPE: event[WATCHDOG_RULE].recipe, + JOB_RULE: event[WATCHDOG_RULE].name, + JOB_PATH: event[WATCHDOG_SRC], JOB_HASH: triggerfile_hash, JOB_STATUS: STATUS_QUEUED, JOB_CREATE_TIME: datetime.now(), - JOB_REQUIREMENTS: rule.recipe.requirements + JOB_REQUIREMENTS: event[WATCHDOG_RULE].recipe.requirements } print_debug(self._print_target, self.debug_level, - f"Creating job for event at {event.src_path} with ID " + f"Creating job for event at {event[WATCHDOG_SRC]} with ID " f"{job_dict[JOB_ID]}", DEBUG_INFO) self.add_job(job_dict[JOB_ID]) @@ -256,8 +260,8 @@ class PapermillHandler(BaseHandler): yaml_dict = self.replace_keywords( yaml_dict, job_dict[JOB_ID], - event.src_path, - event.monitor_base + event[WATCHDOG_SRC], + event[WATCHDOG_BASE] ) job_dir = os.path.join(self.handler_base, job_dict[JOB_ID]) @@ -267,7 +271,7 @@ class PapermillHandler(BaseHandler): write_yaml(job_dict, meta_file) base_file = os.path.join(job_dir, BASE_FILE) - write_notebook(rule.recipe.recipe, base_file) + write_notebook(event[WATCHDOG_RULE].recipe.recipe, base_file) param_file = os.path.join(job_dir, PARAMS_FILE) write_yaml(yaml_dict, param_file) @@ -298,7 +302,7 @@ class PapermillHandler(BaseHandler): try: job_notebook = parameterize_jupyter_notebook( - rule.recipe.recipe, yaml_dict + event[WATCHDOG_RULE].recipe.recipe, yaml_dict ) write_notebook(job_notebook, job_file) except Exception: diff --git a/tests/test_functionality.py b/tests/test_functionality.py index 19b3b5d..6b4756f 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -6,25 +6,14 @@ from multiprocessing import Pipe, Queue from time import sleep from core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \ - BAREBONES_NOTEBOOK, SHA256, TEST_MONITOR_BASE, COMPLETE_NOTEBOOK -from core.functionality import create_rules, generate_id, wait, \ - check_pattern_dict, check_recipe_dict, get_file_hash, rmtree, make_dir, \ - parameterize_jupyter_notebook + BAREBONES_NOTEBOOK, SHA256, TEST_MONITOR_BASE, COMPLETE_NOTEBOOK, \ + EVENT_TYPE +from core.functionality import generate_id, wait, get_file_hash, rmtree, \ + make_dir, parameterize_jupyter_notebook, create_event from core.meow import BaseRule from patterns.file_event_pattern import FileEventPattern from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe -valid_pattern_one = FileEventPattern( - "pattern_one", "path_one", "recipe_one", "file_one") -valid_pattern_two = FileEventPattern( - "pattern_two", "path_two", "recipe_two", "file_two") - -valid_recipe_one = JupyterNotebookRecipe( - "recipe_one", BAREBONES_NOTEBOOK) -valid_recipe_two = JupyterNotebookRecipe( - "recipe_two", BAREBONES_NOTEBOOK) - - class CorrectnessTests(unittest.TestCase): def setUp(self) -> None: @@ -35,9 +24,6 @@ class CorrectnessTests(unittest.TestCase): super().tearDown() rmtree(TEST_MONITOR_BASE) - def testCreateRulesMinimum(self)->None: - create_rules({}, {}) - def testGenerateIDWorking(self)->None: id = generate_id() self.assertEqual(len(id), 16) @@ -64,97 +50,6 @@ class CorrectnessTests(unittest.TestCase): prefix_id = generate_id(prefix="Test") self.assertEqual(len(prefix_id), 16) self.assertTrue(prefix_id.startswith("Test")) - - def testCreateRulesPatternsAndRecipesDicts(self)->None: - patterns = { - valid_pattern_one.name: valid_pattern_one, - valid_pattern_two.name: valid_pattern_two - } - recipes = { - valid_recipe_one.name: valid_recipe_one, - valid_recipe_two.name: valid_recipe_two - } - rules = create_rules(patterns, recipes) - self.assertIsInstance(rules, dict) - self.assertEqual(len(rules), 2) - for k, rule in rules.items(): - self.assertIsInstance(k, str) - self.assertIsInstance(rule, BaseRule) - self.assertEqual(k, rule.name) - - def testCreateRulesMisindexedPatterns(self)->None: - patterns = { - valid_pattern_two.name: valid_pattern_one, - valid_pattern_one.name: valid_pattern_two - } - with self.assertRaises(KeyError): - create_rules(patterns, {}) - - def testCreateRulesMisindexedRecipes(self)->None: - recipes = { - valid_recipe_two.name: valid_recipe_one, - valid_recipe_one.name: valid_recipe_two - } - with self.assertRaises(KeyError): - create_rules({}, recipes) - - def testCheckPatternDictValid(self)->None: - fep1 = FileEventPattern("name_one", "path", "recipe", "file") - fep2 = FileEventPattern("name_two", "path", "recipe", "file") - - patterns = { - fep1.name: fep1, - fep2.name: fep2 - } - - check_pattern_dict(patterns=patterns) - - def testCheckPatternDictNoEntries(self)->None: - with self.assertRaises(ValueError): - check_pattern_dict(patterns={}) - - check_pattern_dict(patterns={}, min_length=0) - - def testCheckPatternDictMissmatchedName(self)->None: - fep1 = FileEventPattern("name_one", "path", "recipe", "file") - fep2 = FileEventPattern("name_two", "path", "recipe", "file") - - patterns = { - fep2.name: fep1, - fep1.name: fep2 - } - - with self.assertRaises(KeyError): - check_pattern_dict(patterns=patterns) - - def testCheckRecipeDictValid(self)->None: - jnr1 = JupyterNotebookRecipe("recipe_one", BAREBONES_NOTEBOOK) - jnr2 = JupyterNotebookRecipe("recipe_two", BAREBONES_NOTEBOOK) - - recipes = { - jnr1.name: jnr1, - jnr2.name: jnr2 - } - - check_recipe_dict(recipes=recipes) - - def testCheckRecipeDictNoEntires(self)->None: - with self.assertRaises(ValueError): - check_recipe_dict(recipes={}) - - check_recipe_dict(recipes={}, min_length=0) - - def testCheckRecipeDictMismatchedName(self)->None: - jnr1 = JupyterNotebookRecipe("recipe_one", BAREBONES_NOTEBOOK) - jnr2 = JupyterNotebookRecipe("recipe_two", BAREBONES_NOTEBOOK) - - recipes = { - jnr2.name: jnr1, - jnr1.name: jnr2 - } - - with self.assertRaises(KeyError): - check_recipe_dict(recipes=recipes) def testWaitPipes(self)->None: pipe_one_reader, pipe_one_writer = Pipe() @@ -326,3 +221,20 @@ class CorrectnessTests(unittest.TestCase): self.assertEqual( pn["cells"][0]["source"], "# The first cell\n\ns = 4\nnum = 1000") + + def testCreateEvent(self)->None: + event = create_event("test") + + self.assertEqual(type(event), dict) + self.assertTrue(EVENT_TYPE in event.keys()) + self.assertEqual(len(event.keys()), 1) + self.assertEqual(event[EVENT_TYPE], "test") + + event2 = create_event("test2", {"a":1}) + + self.assertEqual(type(event2), dict) + self.assertTrue(EVENT_TYPE in event2.keys()) + self.assertEqual(len(event2.keys()), 2) + self.assertEqual(event2[EVENT_TYPE], "test2") + self.assertEqual(event2["a"], 1) + diff --git a/tests/test_meow.py b/tests/test_meow.py index 60cbf70..ea9dc60 100644 --- a/tests/test_meow.py +++ b/tests/test_meow.py @@ -8,14 +8,24 @@ from time import sleep from typing import Any from core.correctness.vars import TEST_HANDLER_BASE, TEST_JOB_OUTPUT, \ - TEST_MONITOR_BASE, APPENDING_NOTEBOOK -from core.functionality import make_dir, rmtree, create_rules, read_notebook + TEST_MONITOR_BASE, APPENDING_NOTEBOOK, BAREBONES_NOTEBOOK +from core.functionality import make_dir, rmtree, read_notebook from core.meow import BasePattern, BaseRecipe, BaseRule, BaseMonitor, \ - BaseHandler, MeowRunner + BaseHandler, MeowRunner, create_rules from patterns import WatchdogMonitor, FileEventPattern from recipes.jupyter_notebook_recipe import PapermillHandler, \ JupyterNotebookRecipe, RESULT_FILE +valid_pattern_one = FileEventPattern( + "pattern_one", "path_one", "recipe_one", "file_one") +valid_pattern_two = FileEventPattern( + "pattern_two", "path_two", "recipe_two", "file_two") + +valid_recipe_one = JupyterNotebookRecipe( + "recipe_one", BAREBONES_NOTEBOOK) +valid_recipe_two = JupyterNotebookRecipe( + "recipe_two", BAREBONES_NOTEBOOK) + class MeowTests(unittest.TestCase): def setUp(self) -> None: @@ -84,39 +94,75 @@ class MeowTests(unittest.TestCase): pass FullRule("name", "", "") + def testCreateRulesMinimum(self)->None: + create_rules({}, {}) + + def testCreateRulesPatternsAndRecipesDicts(self)->None: + patterns = { + valid_pattern_one.name: valid_pattern_one, + valid_pattern_two.name: valid_pattern_two + } + recipes = { + valid_recipe_one.name: valid_recipe_one, + valid_recipe_two.name: valid_recipe_two + } + rules = create_rules(patterns, recipes) + self.assertIsInstance(rules, dict) + self.assertEqual(len(rules), 2) + for k, rule in rules.items(): + self.assertIsInstance(k, str) + self.assertIsInstance(rule, BaseRule) + self.assertEqual(k, rule.name) + + def testCreateRulesMisindexedPatterns(self)->None: + patterns = { + valid_pattern_two.name: valid_pattern_one, + valid_pattern_one.name: valid_pattern_two + } + with self.assertRaises(KeyError): + create_rules(patterns, {}) + + def testCreateRulesMisindexedRecipes(self)->None: + recipes = { + valid_recipe_two.name: valid_recipe_one, + valid_recipe_one.name: valid_recipe_two + } + with self.assertRaises(KeyError): + create_rules({}, recipes) + def testBaseMonitor(self)->None: with self.assertRaises(TypeError): - BaseMonitor("", "") + BaseMonitor("") class TestMonitor(BaseMonitor): pass with self.assertRaises(NotImplementedError): - TestMonitor("", "") + TestMonitor("") class FullTestMonitor(BaseMonitor): def start(self): pass def stop(self): pass - def _is_valid_report(self, report:Any)->None: + def _is_valid_to_runner(self, to_runner:Any)->None: pass def _is_valid_rules(self, rules:Any)->None: pass - FullTestMonitor("", "") + FullTestMonitor("") def testBaseHandler(self)->None: with self.assertRaises(TypeError): - BaseHandler("") + BaseHandler() class TestHandler(BaseHandler): pass with self.assertRaises(NotImplementedError): - TestHandler("") + TestHandler() class FullTestHandler(BaseHandler): - def handle(self, event, rule): + def handle(self, event): pass def start(self): pass @@ -124,10 +170,12 @@ class MeowTests(unittest.TestCase): pass def _is_valid_inputs(self, inputs:Any)->None: pass - FullTestHandler("") + def valid_event_types(self)->list[str]: + pass + FullTestHandler() def testMeowRunner(self)->None: - monitor_to_handler_reader, monitor_to_handler_writer = Pipe() + #monitor_to_handler_reader, monitor_to_handler_writer = Pipe() pattern_one = FileEventPattern( "pattern_one", "start/A.txt", "recipe_one", "infile", @@ -153,12 +201,11 @@ class MeowTests(unittest.TestCase): WatchdogMonitor( TEST_MONITOR_BASE, rules, - monitor_to_handler_writer, print=monitor_debug_stream, - logging=3, settletime=1 + logging=3, + settletime=1 ), PapermillHandler( - [monitor_to_handler_reader], TEST_HANDLER_BASE, TEST_JOB_OUTPUT, print=handler_debug_stream, @@ -213,7 +260,7 @@ class MeowTests(unittest.TestCase): self.assertEqual(data, "Initial Data\nA line from a test Pattern") def testMeowRunnerLinkeExecution(self)->None: - monitor_to_handler_reader, monitor_to_handler_writer = Pipe() + #monitor_to_handler_reader, monitor_to_handler_writer = Pipe() pattern_one = FileEventPattern( "pattern_one", "start/A.txt", "recipe_one", "infile", @@ -246,12 +293,11 @@ class MeowTests(unittest.TestCase): WatchdogMonitor( TEST_MONITOR_BASE, rules, - monitor_to_handler_writer, print=monitor_debug_stream, - logging=3, settletime=1 + logging=3, + settletime=1 ), PapermillHandler( - [monitor_to_handler_reader], TEST_HANDLER_BASE, TEST_JOB_OUTPUT, print=handler_debug_stream, diff --git a/tests/test_patterns.py b/tests/test_patterns.py index 850605f..ed30a11 100644 --- a/tests/test_patterns.py +++ b/tests/test_patterns.py @@ -5,8 +5,10 @@ import unittest from multiprocessing import Pipe from core.correctness.vars import FILE_EVENTS, FILE_CREATE_EVENT, \ - BAREBONES_NOTEBOOK, TEST_MONITOR_BASE -from core.functionality import create_rules, rmtree, make_dir + BAREBONES_NOTEBOOK, TEST_MONITOR_BASE, EVENT_TYPE, WATCHDOG_RULE, \ + WATCHDOG_BASE, WATCHDOG_SRC, WATCHDOG_TYPE +from core.functionality import rmtree, make_dir +from core.meow import create_rules from patterns.file_event_pattern import FileEventPattern, WatchdogMonitor, \ _DEFAULT_MASK, SWEEP_START, SWEEP_STOP, SWEEP_JUMP from recipes import JupyterNotebookRecipe @@ -143,7 +145,6 @@ class CorrectnessTests(unittest.TestCase): fep = FileEventPattern("name", "path", "recipe", "file", sweep=bad_sweep) - def testWatchdogMonitorMinimum(self)->None: from_monitor = Pipe() WatchdogMonitor(TEST_MONITOR_BASE, {}, from_monitor[1]) @@ -164,8 +165,13 @@ class CorrectnessTests(unittest.TestCase): } rules = create_rules(patterns, recipes) - wm = WatchdogMonitor(TEST_MONITOR_BASE, rules, from_monitor_writer) - + wm = WatchdogMonitor(TEST_MONITOR_BASE, rules) + wm.to_runner = from_monitor_writer + + self.assertEqual(len(rules), 1) + rule = rules[list(rules.keys())[0]] + + # TODO fix this test wm.start() open(os.path.join(TEST_MONITOR_BASE, "A"), "w") @@ -173,10 +179,17 @@ class CorrectnessTests(unittest.TestCase): message = from_monitor_reader.recv() self.assertIsNotNone(message) - event, rule = message + event = message self.assertIsNotNone(event) - self.assertIsNotNone(rule) - self.assertEqual(event.src_path, os.path.join(TEST_MONITOR_BASE, "A")) + self.assertEqual(type(event), dict) + self.assertTrue(EVENT_TYPE in event.keys()) + self.assertTrue(WATCHDOG_SRC in event.keys()) + self.assertTrue(WATCHDOG_BASE in event.keys()) + self.assertTrue(WATCHDOG_RULE in event.keys()) + self.assertEqual(event[EVENT_TYPE], WATCHDOG_TYPE) + self.assertEqual(event[WATCHDOG_SRC], os.path.join(TEST_MONITOR_BASE, "A")) + self.assertEqual(event[WATCHDOG_BASE], TEST_MONITOR_BASE) + self.assertEqual(event[WATCHDOG_RULE].name, rule.name) open(os.path.join(TEST_MONITOR_BASE, "B"), "w") if from_monitor_reader.poll(3): diff --git a/tests/test_recipes.py b/tests/test_recipes.py index 469a79e..2ddf9d1 100644 --- a/tests/test_recipes.py +++ b/tests/test_recipes.py @@ -13,8 +13,10 @@ from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe, \ PapermillHandler, BASE_FILE, META_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE from rules.file_event_jupyter_notebook_rule import FileEventJupyterNotebookRule from core.correctness.vars import BAREBONES_NOTEBOOK, TEST_HANDLER_BASE, \ - TEST_JOB_OUTPUT, TEST_MONITOR_BASE, COMPLETE_NOTEBOOK -from core.functionality import rmtree, make_dir, create_rules, read_notebook + TEST_JOB_OUTPUT, TEST_MONITOR_BASE, COMPLETE_NOTEBOOK, EVENT_TYPE, \ + WATCHDOG_BASE, WATCHDOG_RULE, WATCHDOG_SRC, WATCHDOG_TYPE +from core.functionality import rmtree, make_dir, read_notebook +from core.meow import create_rules class CorrectnessTests(unittest.TestCase): def setUp(self) -> None: @@ -92,59 +94,15 @@ class CorrectnessTests(unittest.TestCase): self.assertEqual(jnr.source, source) def testPapermillHanderMinimum(self)->None: - monitor_to_handler_reader, _ = Pipe() - PapermillHandler( - [monitor_to_handler_reader], TEST_HANDLER_BASE, TEST_JOB_OUTPUT ) - def testPapermillHanderStartStop(self)->None: - monitor_to_handler_reader, _ = Pipe() - - ph = PapermillHandler( - [monitor_to_handler_reader], - TEST_HANDLER_BASE, - TEST_JOB_OUTPUT - ) - - ph.start() - ph.stop() - - def testPapermillHanderRepeatedStarts(self)->None: - monitor_to_handler_reader, _ = Pipe() - - ph = PapermillHandler( - [monitor_to_handler_reader], - TEST_HANDLER_BASE, - TEST_JOB_OUTPUT - ) - - ph.start() - with self.assertRaises(RuntimeWarning): - ph.start() - ph.stop() - - def testPapermillHanderStopBeforeStart(self)->None: - monitor_to_handler_reader, _ = Pipe() - - ph = PapermillHandler( - [monitor_to_handler_reader], - TEST_HANDLER_BASE, - TEST_JOB_OUTPUT - ) - - with self.assertRaises(RuntimeWarning): - ph.stop() - def testPapermillHandlerHandling(self)->None: - monitor_to_handler_reader, to_handler = Pipe() - debug_stream = io.StringIO("") ph = PapermillHandler( - [monitor_to_handler_reader], TEST_HANDLER_BASE, TEST_JOB_OUTPUT, print=debug_stream, @@ -153,8 +111,6 @@ class CorrectnessTests(unittest.TestCase): with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: f.write("Data") - event = FileCreatedEvent(os.path.join(TEST_MONITOR_BASE, "A")) - event.monitor_base = TEST_MONITOR_BASE pattern_one = FileEventPattern( "pattern_one", "A", "recipe_one", "file_one") @@ -175,8 +131,14 @@ class CorrectnessTests(unittest.TestCase): self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 0) - ph.start() - to_handler.send((event, rule)) + event = { + EVENT_TYPE: WATCHDOG_TYPE, + WATCHDOG_SRC: os.path.join(TEST_MONITOR_BASE, "A"), + WATCHDOG_BASE: TEST_MONITOR_BASE, + WATCHDOG_RULE: rule + } + + ph.handle(event) loops = 0 job_id = None @@ -211,5 +173,3 @@ class CorrectnessTests(unittest.TestCase): self.assertEqual("124875.0\n", result["cells"][4]["outputs"][0]["text"][0]) - - ph.stop() diff --git a/tests/test_validation.py b/tests/test_validation.py index b41e168..093fa20 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -6,8 +6,9 @@ from typing import Any, Union from core.correctness.validation import check_type, check_implementation, \ valid_string, valid_dict, valid_list, valid_existing_file_path, \ - valid_existing_dir_path, valid_non_existing_path -from core.correctness.vars import VALID_NAME_CHARS, TEST_MONITOR_BASE, SHA256 + valid_existing_dir_path, valid_non_existing_path, valid_event +from core.correctness.vars import VALID_NAME_CHARS, TEST_MONITOR_BASE, \ + SHA256, EVENT_TYPE from core.functionality import rmtree, make_dir class CorrectnessTests(unittest.TestCase): @@ -18,6 +19,7 @@ class CorrectnessTests(unittest.TestCase): def tearDown(self) -> None: super().tearDown() rmtree(TEST_MONITOR_BASE) + rmtree("first") def testCheckTypeValid(self)->None: check_type(1, int) @@ -204,3 +206,15 @@ class CorrectnessTests(unittest.TestCase): make_dir("first/second") with self.assertRaises(ValueError): valid_non_existing_path("first/second") + + def testEventValidation(self)->None: + valid_event({EVENT_TYPE: "test"}) + valid_event({EVENT_TYPE: "another"}) + valid_event({EVENT_TYPE: "anything", "a": 1}) + valid_event({EVENT_TYPE: "something", 1: 1}) + + with self.assertRaises(KeyError): + valid_event({"EVENT_TYPE": "test"}) + + with self.assertRaises(KeyError): + valid_event({})