From 31d06af5bf6d5d99252b48e4a35ecf00b94ce2bc Mon Sep 17 00:00:00 2001 From: PatchOfScotland Date: Thu, 26 Jan 2023 13:47:17 +0100 Subject: [PATCH] added rudimentary conductor for job execution --- conductors/__init__.py | 2 + conductors/local_python_conductor.py | 25 ++ core/correctness/validation.py | 44 +- core/correctness/vars.py | 39 +- core/functionality.py | 93 +++- core/meow.py | 23 +- core/runner.py | 173 ++++++-- patterns/file_event_pattern.py | 85 ++-- recipes/jupyter_notebook_recipe.py | 321 ++++---------- rules/file_event_jupyter_notebook_rule.py | 4 +- tests/test_conductors.py | 244 ++++++++++ tests/test_functionality.py | 300 ++++++++++++- tests/test_meow.py | 173 ++------ tests/test_patterns.py | 518 +++++++++++++++++++++- tests/test_recipes.py | 138 ++++-- tests/test_rules.py | 4 +- tests/test_runner.py | 207 +++++++-- tests/test_validation.py | 47 +- 18 files changed, 1895 insertions(+), 545 deletions(-) create mode 100644 conductors/__init__.py create mode 100644 conductors/local_python_conductor.py create mode 100644 tests/test_conductors.py diff --git a/conductors/__init__.py b/conductors/__init__.py new file mode 100644 index 0000000..13c5708 --- /dev/null +++ b/conductors/__init__.py @@ -0,0 +1,2 @@ + +from conductors.local_python_conductor import LocalPythonConductor diff --git a/conductors/local_python_conductor.py b/conductors/local_python_conductor.py new file mode 100644 index 0000000..9d48b0d --- /dev/null +++ b/conductors/local_python_conductor.py @@ -0,0 +1,25 @@ + +from typing import Any + +from core.correctness.vars import PYTHON_TYPE, PYTHON_FUNC +from core.correctness.validation import valid_job +from core.meow import BaseConductor + + +class LocalPythonConductor(BaseConductor): + def __init__(self)->None: + super().__init__() + + def valid_job_types(self)->list[str]: + return [PYTHON_TYPE] + + # TODO expand with more feedback + def execute(self, job:dict[str,Any])->None: + valid_job(job) + + job_function = job[PYTHON_FUNC] + job_arguments = job + + job_function(job_arguments) + + return diff --git a/core/correctness/validation.py b/core/correctness/validation.py index 5576084..0df0ed2 100644 --- a/core/correctness/validation.py +++ b/core/correctness/validation.py @@ -1,10 +1,28 @@ +from datetime import datetime from inspect import signature from os.path import sep, exists, isfile, isdir, dirname from typing import Any, _SpecialForm, Union, Tuple, get_origin, get_args from core.correctness.vars import VALID_PATH_CHARS, get_not_imp_msg, \ - EVENT_TYPE, EVENT_PATH + EVENT_TYPE, EVENT_PATH, JOB_EVENT, JOB_TYPE, JOB_ID, JOB_PATTERN, \ + JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME + +EVENT_KEYS = { + EVENT_TYPE: str, + EVENT_PATH: str +} + +JOB_KEYS = { + JOB_TYPE: str, + JOB_EVENT: dict, + JOB_ID: str, + JOB_PATTERN: Any, + JOB_RECIPE: Any, + JOB_RULE: str, + JOB_STATUS: str, + JOB_CREATE_TIME: datetime, +} def check_type(variable:Any, expected_type:type, alt_types:list[type]=[], or_none:bool=False)->None: @@ -42,12 +60,18 @@ def check_type(variable:Any, expected_type:type, alt_types:list[type]=[], return if not isinstance(variable, tuple(type_list)): + print("egh") + raise TypeError( 'Expected type(s) are %s, got %s' % (get_args(expected_type), type(variable)) ) def check_implementation(child_func, parent_class): + if not hasattr(parent_class, child_func.__name__): + raise AttributeError( + f"Parent class {parent_class} does not implement base function " + f"{child_func.__name__} for children to override.") parent_func = getattr(parent_class, child_func.__name__) if (child_func == parent_func): msg = get_not_imp_msg(parent_class, parent_func) @@ -180,9 +204,15 @@ def setup_debugging(print:Any=None, logging:int=0)->Tuple[Any,int]: return print, logging -def valid_event(event)->None: - check_type(event, dict) - if not EVENT_TYPE in event.keys(): - raise KeyError(f"Events require key '{EVENT_TYPE}'") - if not EVENT_PATH in event.keys(): - raise KeyError(f"Events require key '{EVENT_PATH}'") +def valid_meow_dict(meow_dict:dict[str,Any], msg:str, keys:dict[str,type])->None: + check_type(meow_dict, dict) + for key, value_type in keys.items(): + if not key in meow_dict.keys(): + raise KeyError(f"{msg} require key '{key}'") + check_type(meow_dict[key], value_type) + +def valid_event(event:dict[str,Any])->None: + valid_meow_dict(event, "Event", EVENT_KEYS) + +def valid_job(job:dict[str,Any])->None: + valid_meow_dict(job, "Job", JOB_KEYS) diff --git a/core/correctness/vars.py b/core/correctness/vars.py index 81ad52c..647780b 100644 --- a/core/correctness/vars.py +++ b/core/correctness/vars.py @@ -188,11 +188,12 @@ APPENDING_NOTEBOOK = { } # meow events -EVENT_TYPE = "meow_event_type" +EVENT_TYPE = "event_type" EVENT_PATH = "event_path" WATCHDOG_TYPE = "watchdog" WATCHDOG_BASE = "monitor_base" WATCHDOG_RULE = "rule_name" +WATCHDOG_HASH = "file_hash" # inotify events FILE_CREATE_EVENT = "file_created" @@ -223,6 +224,42 @@ DIR_EVENTS = [ DIR_RETROACTIVE_EVENT ] +# meow jobs +JOB_TYPE = "job_type" +PYTHON_TYPE = "python" +PYTHON_FUNC = "func" +PYTHON_EXECUTION_BASE = "exection_base" +PYTHON_OUTPUT_DIR = "output_dir" + +# job definitions +JOB_ID = "id" +JOB_EVENT = "event" +JOB_PATTERN = "pattern" +JOB_RECIPE = "recipe" +JOB_RULE = "rule" +JOB_HASH = "hash" +JOB_STATUS = "status" +JOB_CREATE_TIME = "create" +JOB_START_TIME = "start" +JOB_END_TIME = "end" +JOB_ERROR = "error" +JOB_REQUIREMENTS = "requirements" +JOB_PARAMETERS = "parameters" + +# job statuses +STATUS_QUEUED = "queued" +STATUS_RUNNING = "running" +STATUS_SKIPPED = "skipped" +STATUS_FAILED = "failed" +STATUS_DONE = "done" + +# job definition files +META_FILE = "job.yml" +BASE_FILE = "base.ipynb" +PARAMS_FILE = "params.yml" +JOB_FILE = "job.ipynb" +RESULT_FILE = "result.ipynb" + # debug printing levels DEBUG_ERROR = 1 DEBUG_WARNING = 2 diff --git a/core/functionality.py b/core/functionality.py index f062c46..f3627f4 100644 --- a/core/functionality.py +++ b/core/functionality.py @@ -6,6 +6,8 @@ import nbformat import os import yaml +from datetime import datetime + from multiprocessing.connection import Connection, wait as multi_wait from multiprocessing.queues import Queue from papermill.translators import papermill_translators @@ -16,8 +18,23 @@ from core.correctness.validation import check_type, valid_existing_file_path, \ valid_path from core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \ VALID_CHANNELS, HASH_BUFFER_SIZE, SHA256, DEBUG_WARNING, DEBUG_INFO, \ - EVENT_TYPE, EVENT_PATH + EVENT_TYPE, EVENT_PATH, JOB_EVENT, JOB_TYPE, JOB_ID, JOB_PATTERN, \ + JOB_RECIPE, JOB_RULE, WATCHDOG_RULE, JOB_STATUS, STATUS_QUEUED, \ + JOB_CREATE_TIME, JOB_REQUIREMENTS +# mig trigger keyword replacements +KEYWORD_PATH = "{PATH}" +KEYWORD_REL_PATH = "{REL_PATH}" +KEYWORD_DIR = "{DIR}" +KEYWORD_REL_DIR = "{REL_DIR}" +KEYWORD_FILENAME = "{FILENAME}" +KEYWORD_PREFIX = "{PREFIX}" +KEYWORD_BASE = "{VGRID}" +KEYWORD_EXTENSION = "{EXTENSION}" +KEYWORD_JOB = "{JOB}" + + +#TODO Make this guaranteed unique def generate_id(prefix:str="", length:int=16, existing_ids:list[str]=[], charset:str=CHAR_UPPERCASE+CHAR_LOWERCASE, attempts:int=24): random_length = max(length - len(prefix), 0) @@ -100,19 +117,16 @@ def make_dir(path:str, can_exist:bool=True, ensure_clean:bool=False): :return: No return """ - if not os.path.exists(path): - os.mkdir(path) - elif os.path.isfile(path): - raise ValueError('Cannot make directory in %s as it already ' - 'exists and is a file' % path) - else: - if not can_exist: - if ensure_clean: - rmtree(path) - os.mkdir(path) - else: - raise ValueError("Directory %s already exists. " % path) - + if os.path.exists(path): + if os.path.isfile(path): + raise ValueError( + f"Cannot make directory in {path} as it already exists and is " + "a file") + if ensure_clean: + rmtree(path) + + os.makedirs(path, exist_ok=can_exist) + def read_yaml(filepath:str): """ Reads a file path as a yaml object. @@ -124,7 +138,7 @@ def read_yaml(filepath:str): with open(filepath, 'r') as yaml_file: return yaml.load(yaml_file, Loader=yaml.Loader) -def write_yaml(source:Any, filename:str, mode:str='w'): +def write_yaml(source:Any, filename:str): """ Writes a given objcet to a yaml file. @@ -134,7 +148,7 @@ def write_yaml(source:Any, filename:str, mode:str='w'): :return: No return """ - with open(filename, mode) as param_file: + with open(filename, 'w') as param_file: yaml.dump(source, param_file, default_flow_style=False) def read_notebook(filepath:str): @@ -241,6 +255,51 @@ def print_debug(print_target, debug_level, msg, level)->None: status = "WARNING" print(f"{status}: {msg}", file=print_target) -def create_event(event_type:str, path:str, source:dict[Any,Any]={})->dict[Any,Any]: +def replace_keywords(old_dict:dict[str,str], job_id:str, src_path:str, + monitor_base:str)->dict[str,str]: + new_dict = {} + + filename = os.path.basename(src_path) + dirname = os.path.dirname(src_path) + relpath = os.path.relpath(src_path, monitor_base) + reldirname = os.path.dirname(relpath) + (prefix, extension) = os.path.splitext(filename) + + for var, val in old_dict.items(): + if isinstance(val, str): + val = val.replace(KEYWORD_PATH, src_path) + val = val.replace(KEYWORD_REL_PATH, relpath) + val = val.replace(KEYWORD_DIR, dirname) + val = val.replace(KEYWORD_REL_DIR, reldirname) + val = val.replace(KEYWORD_FILENAME, filename) + val = val.replace(KEYWORD_PREFIX, prefix) + val = val.replace(KEYWORD_BASE, monitor_base) + val = val.replace(KEYWORD_EXTENSION, extension) + val = val.replace(KEYWORD_JOB, job_id) + + new_dict[var] = val + else: + new_dict[var] = val + + return new_dict + +def create_event(event_type:str, path:str, source:dict[Any,Any]={} + )->dict[Any,Any]: return {**source, EVENT_PATH: path, EVENT_TYPE: event_type} +def create_job(job_type:str, event:dict[str,Any], source:dict[Any,Any]={} + )->dict[Any,Any]: + job_dict = { + #TODO compress pattern, recipe, rule? + JOB_ID: generate_id(prefix="job_"), + JOB_EVENT: event, + JOB_TYPE: job_type, + JOB_PATTERN: event[WATCHDOG_RULE].pattern, + JOB_RECIPE: event[WATCHDOG_RULE].recipe, + JOB_RULE: event[WATCHDOG_RULE].name, + JOB_STATUS: STATUS_QUEUED, + JOB_CREATE_TIME: datetime.now(), + JOB_REQUIREMENTS: event[WATCHDOG_RULE].recipe.requirements + } + + return {**source, **job_dict} diff --git a/core/meow.py b/core/meow.py index 8c4f629..a11c6cb 100644 --- a/core/meow.py +++ b/core/meow.py @@ -2,6 +2,7 @@ import inspect import sys +from copy import deepcopy from typing import Any, Union from core.correctness.vars import VALID_RECIPE_NAME_CHARS, \ @@ -150,8 +151,8 @@ class BaseMonitor: check_implementation(type(self).remove_recipe, BaseMonitor) check_implementation(type(self).get_recipes, BaseMonitor) check_implementation(type(self).get_rules, BaseMonitor) - self._patterns = patterns - self._recipes = recipes + self._patterns = deepcopy(patterns) + self._recipes = deepcopy(recipes) self._rules = create_rules(patterns, recipes) def __new__(cls, *args, **kwargs): @@ -201,7 +202,8 @@ class BaseMonitor: class BaseHandler: - def __init__(self) -> None: + to_runner: VALID_CHANNELS + def __init__(self)->None: check_implementation(type(self).handle, BaseHandler) check_implementation(type(self).valid_event_types, BaseHandler) @@ -214,9 +216,22 @@ class BaseHandler: def valid_event_types(self)->list[str]: pass - def handle(self, event:Any)->None: + def handle(self, event:dict[str,Any])->None: pass + +class BaseConductor: + def __init__(self)->None: + check_implementation(type(self).execute, BaseConductor) + check_implementation(type(self).valid_job_types, BaseConductor) + + def valid_job_types(self)->list[str]: + pass + + def execute(self, job:dict[str,Any])->None: + pass + + def create_rules(patterns:Union[dict[str,BasePattern],list[BasePattern]], recipes:Union[dict[str,BaseRecipe],list[BaseRecipe]], new_rules:list[BaseRule]=[])->dict[str,BaseRule]: diff --git a/core/runner.py b/core/runner.py index 905bcb3..0815997 100644 --- a/core/runner.py +++ b/core/runner.py @@ -2,36 +2,69 @@ import sys import threading +from inspect import signature from multiprocessing import Pipe from random import randrange from typing import Any, Union from core.correctness.vars import DEBUG_WARNING, DEBUG_INFO, EVENT_TYPE, \ - VALID_CHANNELS + VALID_CHANNELS, JOB_TYPE, JOB_ID from core.correctness.validation import setup_debugging, check_type, \ valid_list from core.functionality import print_debug, wait -from core.meow import BaseHandler, BaseMonitor +from core.meow import BaseHandler, BaseMonitor, BaseConductor class MeowRunner: monitors:list[BaseMonitor] handlers:dict[str:BaseHandler] - from_monitor: list[VALID_CHANNELS] + conductors:dict[str:BaseConductor] + from_monitors: list[VALID_CHANNELS] + from_handlers: list[VALID_CHANNELS] def __init__(self, monitors:Union[BaseMonitor,list[BaseMonitor]], handlers:Union[BaseHandler,list[BaseHandler]], - print:Any=sys.stdout, logging:int=0) -> None: + conductors:Union[BaseConductor,list[BaseConductor]], + print:Any=sys.stdout, logging:int=0)->None: + + self._is_valid_conductors(conductors) + if not type(conductors) == list: + conductors = [conductors] + self.conductors = {} + for conductor in conductors: + conductor_jobs = conductor.valid_job_types() + if not conductor_jobs: + raise ValueError( + "Cannot start runner with conductor that does not " + f"implement '{BaseConductor.valid_job_types.__name__}" + f"({signature(BaseConductor.valid_job_types)})' and " + "return a list of at least one conductable job.") + for job in conductor_jobs: + if job in self.conductors.keys(): + self.conductors[job].append(conductor) + else: + self.conductors[job] = [conductor] + self._is_valid_handlers(handlers) if not type(handlers) == list: handlers = [handlers] self.handlers = {} + self.from_handlers = [] for handler in handlers: handler_events = handler.valid_event_types() + if not handler_events: + raise ValueError( + "Cannot start runner with handler that does not " + f"implement '{BaseHandler.valid_event_types.__name__}" + f"({signature(BaseHandler.valid_event_types)})' and " + "return a list of at least one handlable event.") for event in handler_events: if event in self.handlers.keys(): self.handlers[event].append(handler) else: self.handlers[event] = [handler] + handler_to_runner_reader, handler_to_runner_writer = Pipe() + handler.to_runner = handler_to_runner_writer + self.from_handlers.append(handler_to_runner_reader) self._is_valid_monitors(monitors) if not type(monitors) == list: @@ -43,16 +76,20 @@ class MeowRunner: monitor.to_runner = monitor_to_runner_writer self.from_monitors.append(monitor_to_runner_reader) - self._stop_pipe = Pipe() - self._worker = None + self._stop_mon_han_pipe = Pipe() + self._mon_han_worker = None + + self._stop_han_con_pipe = Pipe() + self._han_con_worker = None + self._print_target, self.debug_level = setup_debugging(print, logging) - def run(self)->None: - all_inputs = self.from_monitors + [self._stop_pipe[0]] + def run_monitor_handler_interaction(self)->None: + all_inputs = self.from_monitors + [self._stop_mon_han_pipe[0]] while True: ready = wait(all_inputs) - if self._stop_pipe[0] in ready: + if self._stop_mon_han_pipe[0] in ready: return else: for from_monitor in self.from_monitors: @@ -62,7 +99,8 @@ class MeowRunner: if not self.handlers[event[EVENT_TYPE]]: print_debug(self._print_target, self.debug_level, "Could not process event as no relevent " - f"handler for '{EVENT_TYPE}'", DEBUG_INFO) + f"handler for '{event[EVENT_TYPE]}'", + DEBUG_INFO) return if len(self.handlers[event[EVENT_TYPE]]) == 1: self.handlers[event[EVENT_TYPE]][0].handle(event) @@ -71,6 +109,44 @@ class MeowRunner: randrange(len(self.handlers[event[EVENT_TYPE]])) ].handle(event) + def run_handler_conductor_interaction(self)->None: + all_inputs = self.from_handlers + [self._stop_han_con_pipe[0]] + while True: + ready = wait(all_inputs) + + if self._stop_han_con_pipe[0] in ready: + return + else: + for from_handler in self.from_handlers: + if from_handler in ready: + message = from_handler.recv() + job = message + if not self.conductors[job[JOB_TYPE]]: + print_debug(self._print_target, self.debug_level, + "Could not process job as no relevent " + f"conductor for '{job[JOB_TYPE]}'", DEBUG_INFO) + return + if len(self.conductors[job[JOB_TYPE]]) == 1: + conductor = self.conductors[job[JOB_TYPE]][0] + self.execute_job(conductor, job) + else: + conductor = self.conductors[job[JOB_TYPE]][ + randrange(len(self.conductors[job[JOB_TYPE]])) + ] + self.execute_job(conductor, job) + + def execute_job(self, conductor:BaseConductor, job:dict[str:Any])->None: + print_debug(self._print_target, self.debug_level, + f"Starting execution for job: '{job[JOB_ID]}'", DEBUG_INFO) + try: + conductor.execute(job) + print_debug(self._print_target, self.debug_level, + f"Completed execution for job: '{job[JOB_ID]}'", DEBUG_INFO) + except Exception as e: + print_debug(self._print_target, self.debug_level, + "Something went wrong during execution for job " + f"'{job[JOB_ID]}'. {e}", DEBUG_INFO) + def start(self)->None: for monitor in self.monitors: monitor.start() @@ -79,23 +155,42 @@ class MeowRunner: for handler in handler_list: if hasattr(handler, "start") and handler not in startable: startable.append() - for handler in startable: - handler.start() - - if self._worker is None: - self._worker = threading.Thread( - target=self.run, + for conductor_list in self.conductors.values(): + for conductor in conductor_list: + if hasattr(conductor, "start") and conductor not in startable: + startable.append() + for starting in startable: + starting.start() + + if self._mon_han_worker is None: + self._mon_han_worker = threading.Thread( + target=self.run_monitor_handler_interaction, args=[]) - self._worker.daemon = True - self._worker.start() + self._mon_han_worker.daemon = True + self._mon_han_worker.start() print_debug(self._print_target, self.debug_level, - "Starting MeowRunner run...", DEBUG_INFO) + "Starting MeowRunner event handling...", DEBUG_INFO) else: - msg = "Repeated calls to start have no effect." + msg = "Repeated calls to start MeowRunner event handling have " \ + "no effect." print_debug(self._print_target, self.debug_level, msg, DEBUG_WARNING) raise RuntimeWarning(msg) + if self._han_con_worker is None: + self._han_con_worker = threading.Thread( + target=self.run_handler_conductor_interaction, + args=[]) + self._han_con_worker.daemon = True + self._han_con_worker.start() + print_debug(self._print_target, self.debug_level, + "Starting MeowRunner job conducting...", DEBUG_INFO) + else: + msg = "Repeated calls to start MeowRunner job conducting have " \ + "no effect." + print_debug(self._print_target, self.debug_level, + msg, DEBUG_WARNING) + raise RuntimeWarning(msg) def stop(self)->None: for monitor in self.monitors: @@ -106,29 +201,49 @@ class MeowRunner: for handler in handler_list: if hasattr(handler, "stop") and handler not in stopable: stopable.append() - for handler in stopable: - handler.stop() + for conductor_list in self.conductors.values(): + for conductor in conductor_list: + if hasattr(conductor, "stop") and conductor not in stopable: + stopable.append() + for stopping in stopable: + stopping.stop() - if self._worker is None: - msg = "Cannot stop thread that is not started." + if self._mon_han_worker is None: + msg = "Cannot stop event handling thread that is not started." print_debug(self._print_target, self.debug_level, msg, DEBUG_WARNING) raise RuntimeWarning(msg) else: - self._stop_pipe[1].send(1) - self._worker.join() + self._stop_mon_han_pipe[1].send(1) + self._mon_han_worker.join() print_debug(self._print_target, self.debug_level, - "Worker thread stopped", DEBUG_INFO) + "Event handler thread stopped", DEBUG_INFO) + if self._han_con_worker is None: + msg = "Cannot stop job conducting thread that is not started." + print_debug(self._print_target, self.debug_level, + msg, DEBUG_WARNING) + raise RuntimeWarning(msg) + else: + self._stop_han_con_pipe[1].send(1) + self._han_con_worker.join() + print_debug(self._print_target, self.debug_level, + "Job conductor thread stopped", DEBUG_INFO) def _is_valid_monitors(self, monitors:Union[BaseMonitor,list[BaseMonitor]])->None: - check_type(monitors, BaseMonitor, alt_types=[list[BaseMonitor]]) + check_type(monitors, BaseMonitor, alt_types=[list]) if type(monitors) == list: valid_list(monitors, BaseMonitor, min_length=1) def _is_valid_handlers(self, handlers:Union[BaseHandler,list[BaseHandler]])->None: - check_type(handlers, BaseHandler, alt_types=[list[BaseHandler]]) + check_type(handlers, BaseHandler, alt_types=[list]) if type(handlers) == list: valid_list(handlers, BaseHandler, min_length=1) + + def _is_valid_conductors(self, + conductors:Union[BaseConductor,list[BaseConductor]])->None: + check_type(conductors, BaseConductor, alt_types=[list]) + if type(conductors) == list: + valid_list(conductors, BaseConductor, min_length=1) diff --git a/patterns/file_event_pattern.py b/patterns/file_event_pattern.py index e9f83b1..26e2d05 100644 --- a/patterns/file_event_pattern.py +++ b/patterns/file_event_pattern.py @@ -18,8 +18,8 @@ from core.correctness.validation import check_type, valid_string, \ from core.correctness.vars import VALID_RECIPE_NAME_CHARS, \ VALID_VARIABLE_NAME_CHARS, FILE_EVENTS, FILE_CREATE_EVENT, \ FILE_MODIFY_EVENT, FILE_MOVED_EVENT, DEBUG_INFO, WATCHDOG_TYPE, \ - WATCHDOG_RULE, WATCHDOG_BASE, FILE_RETROACTIVE_EVENT, EVENT_PATH -from core.functionality import print_debug, create_event + WATCHDOG_RULE, WATCHDOG_BASE, FILE_RETROACTIVE_EVENT, WATCHDOG_HASH, SHA256 +from core.functionality import print_debug, create_event, get_file_hash from core.meow import BasePattern, BaseMonitor, BaseRule, BaseRecipe, \ create_rule @@ -191,7 +191,14 @@ class WatchdogMonitor(BaseMonitor): meow_event = create_event( WATCHDOG_TYPE, event.src_path, - { WATCHDOG_BASE: self.base_dir, WATCHDOG_RULE: rule } + { + WATCHDOG_BASE: self.base_dir, + WATCHDOG_RULE: rule, + WATCHDOG_HASH: get_file_hash( + event.src_path, + SHA256 + ) + } ) print_debug(self._print_target, self.debug_level, f"Event at {src_path} of type {event_type} hit rule " @@ -200,34 +207,35 @@ class WatchdogMonitor(BaseMonitor): except Exception as e: self._rules_lock.release() - raise Exception(e) + raise e self._rules_lock.release() - def add_pattern(self, pattern: FileEventPattern) -> None: + def add_pattern(self, pattern:FileEventPattern)->None: check_type(pattern, FileEventPattern) self._patterns_lock.acquire() try: if pattern.name in self._patterns: - raise KeyError(f"An entry for Pattern '{pattern.name}' already " - "exists. Do you intend to update instead?") + raise KeyError(f"An entry for Pattern '{pattern.name}' " + "already exists. Do you intend to update instead?") self._patterns[pattern.name] = pattern except Exception as e: self._patterns_lock.release() - raise Exception(e) + raise e self._patterns_lock.release() self._identify_new_rules(new_pattern=pattern) - def update_pattern(self, pattern: FileEventPattern) -> None: + def update_pattern(self, pattern:FileEventPattern)->None: check_type(pattern, FileEventPattern) self.remove_pattern(pattern.name) + print(f"adding pattern w/ recipe {pattern.recipe}") self.add_pattern(pattern) - def remove_pattern(self, pattern: Union[str, FileEventPattern]) -> None: + def remove_pattern(self, pattern: Union[str,FileEventPattern])->None: check_type(pattern, str, alt_types=[FileEventPattern]) lookup_key = pattern - if type(lookup_key) is FileEventPattern: + if isinstance(lookup_key, FileEventPattern): lookup_key = pattern.name self._patterns_lock.acquire() try: @@ -237,23 +245,26 @@ class WatchdogMonitor(BaseMonitor): self._patterns.pop(lookup_key) except Exception as e: self._patterns_lock.release() - raise Exception(e) + raise e self._patterns_lock.release() - self._identify_lost_rules(lost_pattern=pattern.name) + if isinstance(pattern, FileEventPattern): + self._identify_lost_rules(lost_pattern=pattern.name) + else: + self._identify_lost_rules(lost_pattern=pattern) - def get_patterns(self) -> None: + def get_patterns(self)->None: to_return = {} self._patterns_lock.acquire() try: to_return = deepcopy(self._patterns) except Exception as e: self._patterns_lock.release() - raise Exception(e) + raise e self._patterns_lock.release() return to_return - def add_recipe(self, recipe: BaseRecipe) -> None: + def add_recipe(self, recipe: BaseRecipe)->None: check_type(recipe, BaseRecipe) self._recipes_lock.acquire() try: @@ -263,20 +274,20 @@ class WatchdogMonitor(BaseMonitor): self._recipes[recipe.name] = recipe except Exception as e: self._recipes_lock.release() - raise Exception(e) + raise e self._recipes_lock.release() self._identify_new_rules(new_recipe=recipe) - def update_recipe(self, recipe: BaseRecipe) -> None: + def update_recipe(self, recipe: BaseRecipe)->None: check_type(recipe, BaseRecipe) self.remove_recipe(recipe.name) self.add_recipe(recipe) - def remove_recipe(self, recipe: Union[str, BaseRecipe]) -> None: + def remove_recipe(self, recipe:Union[str,BaseRecipe])->None: check_type(recipe, str, alt_types=[BaseRecipe]) lookup_key = recipe - if type(lookup_key) is BaseRecipe: + if isinstance(lookup_key, BaseRecipe): lookup_key = recipe.name self._recipes_lock.acquire() try: @@ -286,30 +297,33 @@ class WatchdogMonitor(BaseMonitor): self._recipes.pop(lookup_key) except Exception as e: self._recipes_lock.release() - raise Exception(e) + raise e self._recipes_lock.release() - self._identify_lost_rules(lost_recipe=recipe.name) + if isinstance(recipe, BaseRecipe): + self._identify_lost_rules(lost_recipe=recipe.name) + else: + self._identify_lost_rules(lost_recipe=recipe) - def get_recipes(self) -> None: + def get_recipes(self)->None: to_return = {} self._recipes_lock.acquire() try: to_return = deepcopy(self._recipes) except Exception as e: self._recipes_lock.release() - raise Exception(e) + raise e self._recipes_lock.release() return to_return - def get_rules(self) -> None: + def get_rules(self)->None: to_return = {} self._rules_lock.acquire() try: to_return = deepcopy(self._rules) except Exception as e: self._rules_lock.release() - raise Exception(e) + raise e self._rules_lock.release() return to_return @@ -332,13 +346,13 @@ class WatchdogMonitor(BaseMonitor): except Exception as e: self._patterns_lock.release() self._recipes_lock.release() - raise Exception(e) + raise e self._patterns_lock.release() self._recipes_lock.release() if new_recipe: self._patterns_lock.acquire() - self._patterns_lock.acquire() + self._recipes_lock.acquire() try: if new_recipe.name not in self._recipes: self._patterns_lock.release() @@ -353,11 +367,12 @@ class WatchdogMonitor(BaseMonitor): except Exception as e: self._patterns_lock.release() self._recipes_lock.release() - raise Exception(e) + raise e self._patterns_lock.release() self._recipes_lock.release() - def _identify_lost_rules(self, lost_pattern:str, lost_recipe:str)->None: + def _identify_lost_rules(self, lost_pattern:str=None, + lost_recipe:str=None)->None: to_delete = [] self._rules_lock.acquire() try: @@ -371,7 +386,7 @@ class WatchdogMonitor(BaseMonitor): self._rules.pop(delete) except Exception as e: self._rules_lock.release() - raise Exception(e) + raise e self._rules_lock.release() def _create_new_rule(self, pattern:FileEventPattern, recipe:BaseRecipe)->None: @@ -384,7 +399,7 @@ class WatchdogMonitor(BaseMonitor): self._rules[rule.name] = rule except Exception as e: self._rules_lock.release() - raise Exception(e) + raise e self._rules_lock.release() self._apply_retroactive_rule(rule) @@ -410,7 +425,8 @@ class WatchdogMonitor(BaseMonitor): return if FILE_RETROACTIVE_EVENT in rule.pattern.event_mask: - testing_path = os.path.join(self.base_dir, rule.pattern.triggering_path) + testing_path = os.path.join( + self.base_dir, rule.pattern.triggering_path) globbed = glob.glob(testing_path) @@ -428,9 +444,10 @@ class WatchdogMonitor(BaseMonitor): except Exception as e: self._rules_lock.release() - raise Exception(e) + raise e self._rules_lock.release() + class WatchdogEventHandler(PatternMatchingEventHandler): monitor:WatchdogMonitor _settletime:int diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index 6ed53ec..dd9090c 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -1,68 +1,22 @@ -import copy import nbformat -import os -import papermill -import shutil import sys import threading -from datetime import datetime from multiprocessing import Pipe -from time import sleep from typing import Any -from watchdog.events import FileSystemEvent from core.correctness.validation import check_type, valid_string, \ valid_dict, valid_path, valid_list, valid_existing_dir_path, \ setup_debugging from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, VALID_CHANNELS, \ - SHA256, DEBUG_ERROR, DEBUG_WARNING, DEBUG_INFO, WATCHDOG_TYPE, \ - WATCHDOG_BASE, WATCHDOG_RULE, EVENT_PATH -from core.functionality import wait, get_file_hash, generate_id, make_dir, \ - write_yaml, write_notebook, get_file_hash, parameterize_jupyter_notebook, \ - print_debug -from core.meow import BaseRecipe, BaseHandler, BaseRule + PYTHON_FUNC, DEBUG_INFO, WATCHDOG_TYPE, JOB_HASH, PYTHON_EXECUTION_BASE, \ + WATCHDOG_RULE, EVENT_PATH, PYTHON_TYPE, WATCHDOG_HASH, JOB_PARAMETERS, \ + PYTHON_OUTPUT_DIR +from core.functionality import print_debug, create_job, replace_keywords +from core.meow import BaseRecipe, BaseHandler from patterns.file_event_pattern import SWEEP_START, SWEEP_STOP, SWEEP_JUMP -# mig trigger keyword replacements -KEYWORD_PATH = "{PATH}" -KEYWORD_REL_PATH = "{REL_PATH}" -KEYWORD_DIR = "{DIR}" -KEYWORD_REL_DIR = "{REL_DIR}" -KEYWORD_FILENAME = "{FILENAME}" -KEYWORD_PREFIX = "{PREFIX}" -KEYWORD_BASE = "{VGRID}" -KEYWORD_EXTENSION = "{EXTENSION}" -KEYWORD_JOB = "{JOB}" - -# job definitions -JOB_ID = 'id' -JOB_PATTERN = 'pattern' -JOB_RECIPE = 'recipe' -JOB_RULE = 'rule' -JOB_PATH = 'path' -JOB_HASH = 'hash' -JOB_STATUS = 'status' -JOB_CREATE_TIME = 'create' -JOB_START_TIME = 'start' -JOB_END_TIME = 'end' -JOB_ERROR = 'error' -JOB_REQUIREMENTS = 'requirements' - -# job statuses -STATUS_QUEUED = 'queued' -STATUS_RUNNING = 'running' -STATUS_SKIPPED = 'skipped' -STATUS_FAILED = 'failed' -STATUS_DONE = 'done' - -# job definition files -META_FILE = 'job.yml' -BASE_FILE = 'base.ipynb' -PARAMS_FILE = 'params.yml' -JOB_FILE = 'job.ipynb' -RESULT_FILE = 'result.ipynb' class JupyterNotebookRecipe(BaseRecipe): source:str @@ -96,8 +50,6 @@ class PapermillHandler(BaseHandler): debug_level:int _worker:threading.Thread _stop_pipe:Pipe - _jobs:list[str] - _jobs_lock:threading.Lock _print_target:Any def __init__(self, handler_base:str, output_dir:str, print:Any=sys.stdout, logging:int=0)->None: @@ -106,21 +58,16 @@ class PapermillHandler(BaseHandler): self.handler_base = handler_base self._is_valid_output_dir(output_dir) self.output_dir = output_dir - self._print_target, self.debug_level = setup_debugging(print, logging) + self._print_target, self.debug_level = setup_debugging(print, logging) self._worker = None self._stop_pipe = Pipe() - self._jobs = [] - self._jobs_lock = threading.Lock() print_debug(self._print_target, self.debug_level, "Created new PapermillHandler instance", DEBUG_INFO) - def handle(self, event:dict[Any,Any])->None: - # TODO finish implementation and test - + def handle(self, event:dict[str,Any])->None: print_debug(self._print_target, self.debug_level, f"Handling event {event[EVENT_PATH]}", DEBUG_INFO) - file_hash = get_file_hash(event[EVENT_PATH], SHA256) rule = event[WATCHDOG_RULE] yaml_dict = {} @@ -131,17 +78,7 @@ class PapermillHandler(BaseHandler): yaml_dict[rule.pattern.triggering_file] = event[EVENT_PATH] if not rule.pattern.sweep: - waiting_for_threaded_resources = True - while waiting_for_threaded_resources: - try: - worker = threading.Thread( - target=self.execute_job, - args=[event, yaml_dict, file_hash]) - worker.daemon = True - worker.start() - waiting_for_threaded_resources = False - except threading.ThreadError: - sleep(1) + self.setup_job(event, yaml_dict) else: for var, val in rule.pattern.sweep.items(): values = [] @@ -152,36 +89,7 @@ class PapermillHandler(BaseHandler): for value in values: yaml_dict[var] = value - waiting_for_threaded_resources = True - while waiting_for_threaded_resources: - try: - worker = threading.Thread( - target=self.execute_job, - args=[event, yaml_dict, file_hash]) - worker.daemon = True - worker.start() - waiting_for_threaded_resources = False - except threading.ThreadError: - sleep(1) - - def add_job(self, job): - self._jobs_lock.acquire() - try: - self._jobs.append(job) - except Exception as e: - self._jobs_lock.release() - raise e - self._jobs_lock.release() - - def get_jobs(self): - self._jobs_lock.acquire() - try: - jobs_deepcopy = copy.deepcopy(self._jobs) - except Exception as e: - self._jobs_lock.release() - raise e - self._jobs_lock.release() - return jobs_deepcopy + self.setup_job(event, yaml_dict) def valid_event_types(self)->list[str]: return [WATCHDOG_TYPE] @@ -195,135 +103,102 @@ class PapermillHandler(BaseHandler): def _is_valid_output_dir(self, output_dir)->None: valid_existing_dir_path(output_dir, allow_base=True) - def execute_job(self, event:FileSystemEvent, - yaml_dict:dict[str,Any], triggerfile_hash:str)->None: + def setup_job(self, event:dict[str,Any], yaml_dict:dict[str,Any])->None: + meow_job = create_job(PYTHON_TYPE, event, { + JOB_PARAMETERS:yaml_dict, + JOB_HASH: event[WATCHDOG_HASH], + PYTHON_FUNC:job_func, + PYTHON_OUTPUT_DIR:self.output_dir, + PYTHON_EXECUTION_BASE:self.handler_base,}) + print_debug(self._print_target, self.debug_level, + f"Creating job from event at {event[EVENT_PATH]} of type " + f"{PYTHON_TYPE}.", DEBUG_INFO) + self.to_runner.send(meow_job) - job_dict = { - JOB_ID: generate_id(prefix="job_", existing_ids=self.get_jobs()), - JOB_PATTERN: event[WATCHDOG_RULE].pattern, - JOB_RECIPE: event[WATCHDOG_RULE].recipe, - JOB_RULE: event[WATCHDOG_RULE].name, - JOB_PATH: event[EVENT_PATH], - JOB_HASH: triggerfile_hash, - JOB_STATUS: STATUS_QUEUED, - JOB_CREATE_TIME: datetime.now(), - JOB_REQUIREMENTS: event[WATCHDOG_RULE].recipe.requirements - } +def job_func(job): + import os + import shutil + import papermill + from datetime import datetime + from core.functionality import make_dir, write_yaml, \ + write_notebook, get_file_hash, parameterize_jupyter_notebook + from core.correctness.vars import JOB_EVENT, WATCHDOG_RULE, \ + JOB_ID, EVENT_PATH, WATCHDOG_BASE, META_FILE, \ + BASE_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE, JOB_STATUS, \ + JOB_START_TIME, STATUS_RUNNING, JOB_HASH, SHA256, \ + STATUS_SKIPPED, STATUS_DONE, JOB_END_TIME, \ + JOB_ERROR, STATUS_FAILED, PYTHON_EXECUTION_BASE, PYTHON_OUTPUT_DIR - print_debug(self._print_target, self.debug_level, - f"Creating job for event at {event[EVENT_PATH]} with ID " - f"{job_dict[JOB_ID]}", DEBUG_INFO) + event = job[JOB_EVENT] - self.add_job(job_dict[JOB_ID]) + yaml_dict = replace_keywords( + job[JOB_PARAMETERS], + job[JOB_ID], + event[EVENT_PATH], + event[WATCHDOG_BASE] + ) - yaml_dict = self.replace_keywords( - yaml_dict, - job_dict[JOB_ID], - event[EVENT_PATH], - event[WATCHDOG_BASE] + job_dir = os.path.join(job[PYTHON_EXECUTION_BASE], job[JOB_ID]) + make_dir(job_dir) + + meta_file = os.path.join(job_dir, META_FILE) + write_yaml(job, meta_file) + + base_file = os.path.join(job_dir, BASE_FILE) + write_notebook(event[WATCHDOG_RULE].recipe.recipe, base_file) + + param_file = os.path.join(job_dir, PARAMS_FILE) + write_yaml(yaml_dict, param_file) + + job_file = os.path.join(job_dir, JOB_FILE) + result_file = os.path.join(job_dir, RESULT_FILE) + + job[JOB_STATUS] = STATUS_RUNNING + job[JOB_START_TIME] = datetime.now() + + write_yaml(job, meta_file) + + if JOB_HASH in job: + triggerfile_hash = get_file_hash(job[JOB_EVENT][EVENT_PATH], SHA256) + if not triggerfile_hash \ + or triggerfile_hash != job[JOB_HASH]: + job[JOB_STATUS] = STATUS_SKIPPED + job[JOB_END_TIME] = datetime.now() + msg = "Job was skipped as triggering file " + \ + f"'{job[JOB_EVENT][EVENT_PATH]}' has been modified since " + \ + "scheduling. Was expected to have hash " + \ + f"'{job[JOB_HASH]}' but has '{triggerfile_hash}'." + job[JOB_ERROR] = msg + write_yaml(job, meta_file) + return + + try: + job_notebook = parameterize_jupyter_notebook( + event[WATCHDOG_RULE].recipe.recipe, yaml_dict ) - - job_dir = os.path.join(self.handler_base, job_dict[JOB_ID]) - make_dir(job_dir) - - meta_file = os.path.join(job_dir, META_FILE) - write_yaml(job_dict, meta_file) - - base_file = os.path.join(job_dir, BASE_FILE) - write_notebook(event[WATCHDOG_RULE].recipe.recipe, base_file) - - param_file = os.path.join(job_dir, PARAMS_FILE) - write_yaml(yaml_dict, param_file) - - job_file = os.path.join(job_dir, JOB_FILE) - result_file = os.path.join(job_dir, RESULT_FILE) - - job_dict[JOB_STATUS] = STATUS_RUNNING - job_dict[JOB_START_TIME] = datetime.now() - - write_yaml(job_dict, meta_file) - - if JOB_HASH in job_dict: - triggerfile_hash = get_file_hash(job_dict[JOB_PATH], SHA256) - if not triggerfile_hash \ - or triggerfile_hash != job_dict[JOB_HASH]: - job_dict[JOB_STATUS] = STATUS_SKIPPED - job_dict[JOB_END_TIME] = datetime.now() - msg = "Job was skipped as triggering file " + \ - f"'{job_dict[JOB_PATH]}' has been modified since " + \ - "scheduling. Was expected to have hash " + \ - f"'{job_dict[JOB_HASH]}' but has '{triggerfile_hash}'." - job_dict[JOB_ERROR] = msg - write_yaml(job_dict, meta_file) - print_debug(self._print_target, self.debug_level, - msg, DEBUG_ERROR) - return - - try: - job_notebook = parameterize_jupyter_notebook( - event[WATCHDOG_RULE].recipe.recipe, yaml_dict - ) - write_notebook(job_notebook, job_file) - except Exception: - job_dict[JOB_STATUS] = STATUS_FAILED - job_dict[JOB_END_TIME] = datetime.now() - msg = f"Job file {job_dict[JOB_ID]} was not created successfully" - job_dict[JOB_ERROR] = msg - write_yaml(job_dict, meta_file) - print_debug(self._print_target, self.debug_level, - msg, DEBUG_ERROR) - return - - try: - papermill.execute_notebook(job_file, result_file, {}) - except Exception: - job_dict[JOB_STATUS] = STATUS_FAILED - job_dict[JOB_END_TIME] = datetime.now() - msg = 'Result file %s was not created successfully' - job_dict[JOB_ERROR] = msg - write_yaml(job_dict, meta_file) - print_debug(self._print_target, self.debug_level, - msg, DEBUG_ERROR) - return - - job_dict[JOB_STATUS] = STATUS_DONE - job_dict[JOB_END_TIME] = datetime.now() - write_yaml(job_dict, meta_file) - - job_output_dir = os.path.join(self.output_dir, job_dict[JOB_ID]) - - shutil.move(job_dir, job_output_dir) - - print_debug(self._print_target, self.debug_level, - f"Completed job {job_dict[JOB_ID]} with output at " - f"{job_output_dir}", DEBUG_INFO) - + write_notebook(job_notebook, job_file) + except Exception as e: + job[JOB_STATUS] = STATUS_FAILED + job[JOB_END_TIME] = datetime.now() + msg = f"Job file {job[JOB_ID]} was not created successfully. {e}" + job[JOB_ERROR] = msg + write_yaml(job, meta_file) return - def replace_keywords(self, old_dict:dict[str,str], job_id:str, - src_path:str, monitor_base:str)->dict[str,str]: - new_dict = {} + try: + papermill.execute_notebook(job_file, result_file, {}) + except Exception as e: + job[JOB_STATUS] = STATUS_FAILED + job[JOB_END_TIME] = datetime.now() + msg = f"Result file {result_file} was not created successfully. {e}" + job[JOB_ERROR] = msg + write_yaml(job, meta_file) + return - filename = os.path.basename(src_path) - dirname = os.path.dirname(src_path) - relpath = os.path.relpath(src_path, monitor_base) - reldirname = os.path.dirname(relpath) - (prefix, extension) = os.path.splitext(filename) + job[JOB_STATUS] = STATUS_DONE + job[JOB_END_TIME] = datetime.now() + write_yaml(job, meta_file) - for var, val in old_dict.items(): - if isinstance(val, str): - val = val.replace(KEYWORD_PATH, src_path) - val = val.replace(KEYWORD_REL_PATH, relpath) - val = val.replace(KEYWORD_DIR, dirname) - val = val.replace(KEYWORD_REL_DIR, reldirname) - val = val.replace(KEYWORD_FILENAME, filename) - val = val.replace(KEYWORD_PREFIX, prefix) - val = val.replace(KEYWORD_BASE, monitor_base) - val = val.replace(KEYWORD_EXTENSION, extension) - val = val.replace(KEYWORD_JOB, job_id) + job_output_dir = os.path.join(job[PYTHON_OUTPUT_DIR], job[JOB_ID]) - new_dict[var] = val - else: - new_dict[var] = val - - return new_dict + shutil.move(job_dir, job_output_dir) diff --git a/rules/file_event_jupyter_notebook_rule.py b/rules/file_event_jupyter_notebook_rule.py index fc5c405..4a9b729 100644 --- a/rules/file_event_jupyter_notebook_rule.py +++ b/rules/file_event_jupyter_notebook_rule.py @@ -15,8 +15,8 @@ class FileEventJupyterNotebookRule(BaseRule): f"{pattern.name} does not identify Recipe {recipe.name}. It " f"uses {pattern.recipe}") - def _is_valid_pattern(self, pattern:FileEventPattern) -> None: + def _is_valid_pattern(self, pattern:FileEventPattern)->None: check_type(pattern, FileEventPattern) - def _is_valid_recipe(self, recipe:JupyterNotebookRecipe) -> None: + def _is_valid_recipe(self, recipe:JupyterNotebookRecipe)->None: check_type(recipe, JupyterNotebookRecipe) diff --git a/tests/test_conductors.py b/tests/test_conductors.py new file mode 100644 index 0000000..e387a22 --- /dev/null +++ b/tests/test_conductors.py @@ -0,0 +1,244 @@ + +import os +import unittest + +from core.correctness.vars import PYTHON_TYPE, TEST_HANDLER_BASE, SHA256, \ + TEST_JOB_OUTPUT, TEST_MONITOR_BASE, APPENDING_NOTEBOOK, WATCHDOG_TYPE, \ + WATCHDOG_BASE, WATCHDOG_RULE, WATCHDOG_HASH, JOB_PARAMETERS, JOB_HASH, \ + PYTHON_FUNC, PYTHON_OUTPUT_DIR, PYTHON_EXECUTION_BASE, JOB_ID, META_FILE, \ + BASE_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE +from core.functionality import make_dir, rmtree, get_file_hash, create_event, \ + create_job +from core.meow import create_rule +from conductors import LocalPythonConductor +from patterns import FileEventPattern +from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe, job_func + + +def failing_func(): + raise Exception("bad function") + + +class MeowTests(unittest.TestCase): + def setUp(self)->None: + super().setUp() + make_dir(TEST_MONITOR_BASE) + make_dir(TEST_HANDLER_BASE) + make_dir(TEST_JOB_OUTPUT) + + def tearDown(self)->None: + super().tearDown() + rmtree(TEST_MONITOR_BASE) + rmtree(TEST_HANDLER_BASE) + rmtree(TEST_JOB_OUTPUT) + + def testLocalPythonConductorCreation(self)->None: + lpc = LocalPythonConductor() + + valid_jobs = lpc.valid_job_types() + + self.assertEqual(valid_jobs, [PYTHON_TYPE]) + + def testLocalPythonConductorValidJob(self)->None: + lpc = LocalPythonConductor() + + file_path = os.path.join(TEST_MONITOR_BASE, "test") + result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") + + with open(file_path, "w") as f: + f.write("Data") + + file_hash = get_file_hash(file_path, SHA256) + + pattern = FileEventPattern( + "pattern", + file_path, + "recipe_one", + "infile", + parameters={ + "extra":"A line from a test Pattern", + "outfile":result_path + }) + recipe = JupyterNotebookRecipe( + "recipe_one", APPENDING_NOTEBOOK) + + rule = create_rule(pattern, recipe) + + job_dict = create_job( + PYTHON_TYPE, + create_event( + WATCHDOG_TYPE, + file_path, + { + WATCHDOG_BASE: TEST_MONITOR_BASE, + WATCHDOG_RULE: rule, + WATCHDOG_HASH: file_hash + } + ), + { + JOB_PARAMETERS:{ + "extra":"extra", + "infile":file_path, + "outfile":result_path + }, + JOB_HASH: file_hash, + PYTHON_FUNC:job_func, + PYTHON_OUTPUT_DIR:TEST_JOB_OUTPUT, + PYTHON_EXECUTION_BASE:TEST_HANDLER_BASE + } + ) + + lpc.execute(job_dict) + + job_dir = os.path.join(TEST_HANDLER_BASE, job_dict[JOB_ID]) + self.assertFalse(os.path.exists(job_dir)) + + output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) + self.assertTrue(os.path.exists(output_dir)) + self.assertTrue(os.path.exists(os.path.join(output_dir, META_FILE))) + self.assertTrue(os.path.exists(os.path.join(output_dir, BASE_FILE))) + self.assertTrue(os.path.exists(os.path.join(output_dir, PARAMS_FILE))) + self.assertTrue(os.path.exists(os.path.join(output_dir, JOB_FILE))) + self.assertTrue(os.path.exists(os.path.join(output_dir, RESULT_FILE))) + + self.assertTrue(os.path.exists(result_path)) + + def testLocalPythonConductorBadArgs(self)->None: + lpc = LocalPythonConductor() + + file_path = os.path.join(TEST_MONITOR_BASE, "test") + result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") + + with open(file_path, "w") as f: + f.write("Data") + + file_hash = get_file_hash(file_path, SHA256) + + pattern = FileEventPattern( + "pattern", + file_path, + "recipe_one", + "infile", + parameters={ + "extra":"A line from a test Pattern", + "outfile":result_path + }) + recipe = JupyterNotebookRecipe( + "recipe_one", APPENDING_NOTEBOOK) + + rule = create_rule(pattern, recipe) + + bad_job_dict = create_job( + PYTHON_TYPE, + create_event( + WATCHDOG_TYPE, + file_path, + { + WATCHDOG_BASE: TEST_MONITOR_BASE, + WATCHDOG_RULE: rule, + WATCHDOG_HASH: file_hash + } + ), + { + JOB_PARAMETERS:{ + "extra":"extra", + "infile":file_path, + "outfile":result_path + }, + JOB_HASH: file_hash, + PYTHON_FUNC:job_func, + } + ) + + with self.assertRaises(KeyError): + lpc.execute(bad_job_dict) + + # Ensure execution can continue after one failed job + good_job_dict = create_job( + PYTHON_TYPE, + create_event( + WATCHDOG_TYPE, + file_path, + { + WATCHDOG_BASE: TEST_MONITOR_BASE, + WATCHDOG_RULE: rule, + WATCHDOG_HASH: file_hash + } + ), + { + JOB_PARAMETERS:{ + "extra":"extra", + "infile":file_path, + "outfile":result_path + }, + JOB_HASH: file_hash, + PYTHON_FUNC:job_func, + PYTHON_OUTPUT_DIR:TEST_JOB_OUTPUT, + PYTHON_EXECUTION_BASE:TEST_HANDLER_BASE + } + ) + + lpc.execute(good_job_dict) + + job_dir = os.path.join(TEST_HANDLER_BASE, good_job_dict[JOB_ID]) + self.assertFalse(os.path.exists(job_dir)) + + output_dir = os.path.join(TEST_JOB_OUTPUT, good_job_dict[JOB_ID]) + self.assertTrue(os.path.exists(output_dir)) + self.assertTrue(os.path.exists(os.path.join(output_dir, META_FILE))) + self.assertTrue(os.path.exists(os.path.join(output_dir, BASE_FILE))) + self.assertTrue(os.path.exists(os.path.join(output_dir, PARAMS_FILE))) + self.assertTrue(os.path.exists(os.path.join(output_dir, JOB_FILE))) + self.assertTrue(os.path.exists(os.path.join(output_dir, RESULT_FILE))) + + self.assertTrue(os.path.exists(result_path)) + + def testLocalPythonConductorBadFunc(self)->None: + lpc = LocalPythonConductor() + + file_path = os.path.join(TEST_MONITOR_BASE, "test") + result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") + + with open(file_path, "w") as f: + f.write("Data") + + file_hash = get_file_hash(file_path, SHA256) + + pattern = FileEventPattern( + "pattern", + file_path, + "recipe_one", + "infile", + parameters={ + "extra":"A line from a test Pattern", + "outfile":result_path + }) + recipe = JupyterNotebookRecipe( + "recipe_one", APPENDING_NOTEBOOK) + + rule = create_rule(pattern, recipe) + + job_dict = create_job( + PYTHON_TYPE, + create_event( + WATCHDOG_TYPE, + file_path, + { + WATCHDOG_BASE: TEST_MONITOR_BASE, + WATCHDOG_RULE: rule, + WATCHDOG_HASH: file_hash + } + ), + { + JOB_PARAMETERS:{ + "extra":"extra", + "infile":file_path, + "outfile":result_path + }, + JOB_HASH: file_hash, + PYTHON_FUNC:failing_func, + } + ) + + with self.assertRaises(Exception): + lpc.execute(job_dict) diff --git a/tests/test_functionality.py b/tests/test_functionality.py index eddaa08..94ec5c6 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -1,22 +1,35 @@ +import json import unittest import os +from datetime import datetime from multiprocessing import Pipe, Queue from time import sleep from core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \ - SHA256, TEST_MONITOR_BASE, COMPLETE_NOTEBOOK, EVENT_TYPE, EVENT_PATH + SHA256, TEST_MONITOR_BASE, COMPLETE_NOTEBOOK, EVENT_TYPE, EVENT_PATH, \ + WATCHDOG_TYPE, PYTHON_TYPE, WATCHDOG_BASE, WATCHDOG_HASH, WATCHDOG_RULE, \ + JOB_PARAMETERS, JOB_HASH, PYTHON_FUNC, PYTHON_OUTPUT_DIR, \ + PYTHON_EXECUTION_BASE, APPENDING_NOTEBOOK, JOB_ID, JOB_EVENT, JOB_TYPE, \ + JOB_PATTERN, JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME, \ + JOB_REQUIREMENTS, STATUS_QUEUED from core.functionality import generate_id, wait, get_file_hash, rmtree, \ - make_dir, parameterize_jupyter_notebook, create_event - + make_dir, parameterize_jupyter_notebook, create_event, create_job, \ + replace_keywords, write_yaml, write_notebook, read_yaml, read_notebook, \ + KEYWORD_PATH, KEYWORD_REL_PATH, KEYWORD_DIR, KEYWORD_REL_DIR, \ + KEYWORD_FILENAME, KEYWORD_PREFIX, KEYWORD_BASE, KEYWORD_EXTENSION, \ + KEYWORD_JOB +from core.meow import create_rule +from patterns import FileEventPattern +from recipes import JupyterNotebookRecipe class CorrectnessTests(unittest.TestCase): - def setUp(self) -> None: + def setUp(self)->None: super().setUp() make_dir(TEST_MONITOR_BASE, ensure_clean=True) - def tearDown(self) -> None: + def tearDown(self)->None: super().tearDown() rmtree(TEST_MONITOR_BASE) @@ -236,3 +249,280 @@ class CorrectnessTests(unittest.TestCase): self.assertEqual(event2[EVENT_PATH], "path2") self.assertEqual(event2["a"], 1) + def testCreateJob(self)->None: + pattern = FileEventPattern( + "pattern", + "file_path", + "recipe_one", + "infile", + parameters={ + "extra":"A line from a test Pattern", + "outfile":"result_path" + }) + recipe = JupyterNotebookRecipe( + "recipe_one", APPENDING_NOTEBOOK) + + rule = create_rule(pattern, recipe) + + event = create_event( + WATCHDOG_TYPE, + "file_path", + { + WATCHDOG_BASE: TEST_MONITOR_BASE, + WATCHDOG_RULE: rule, + WATCHDOG_HASH: "file_hash" + } + ) + + job_dict = create_job( + PYTHON_TYPE, + event, + { + JOB_PARAMETERS:{ + "extra":"extra", + "infile":"file_path", + "outfile":"result_path" + }, + JOB_HASH: "file_hash", + PYTHON_FUNC:max, + PYTHON_OUTPUT_DIR:"output", + PYTHON_EXECUTION_BASE:"execution" + } + ) + + self.assertIsInstance(job_dict, dict) + self.assertIn(JOB_ID, job_dict) + self.assertIsInstance(job_dict[JOB_ID], str) + self.assertIn(JOB_EVENT, job_dict) + self.assertEqual(job_dict[JOB_EVENT], event) + self.assertIn(JOB_TYPE, job_dict) + self.assertEqual(job_dict[JOB_TYPE], PYTHON_TYPE) + self.assertIn(JOB_PATTERN, job_dict) + self.assertEqual(job_dict[JOB_PATTERN], pattern) + self.assertIn(JOB_RECIPE, job_dict) + self.assertEqual(job_dict[JOB_RECIPE], recipe) + self.assertIn(JOB_RULE, job_dict) + self.assertEqual(job_dict[JOB_RULE], rule.name) + self.assertIn(JOB_STATUS, job_dict) + self.assertEqual(job_dict[JOB_STATUS], STATUS_QUEUED) + self.assertIn(JOB_CREATE_TIME, job_dict) + self.assertIsInstance(job_dict[JOB_CREATE_TIME], datetime) + self.assertIn(JOB_REQUIREMENTS, job_dict) + self.assertEqual(job_dict[JOB_REQUIREMENTS], {}) + + def testReplaceKeywords(self)->None: + test_dict = { + "A": f"--{KEYWORD_PATH}--", + "B": f"--{KEYWORD_REL_PATH}--", + "C": f"--{KEYWORD_DIR}--", + "D": f"--{KEYWORD_REL_DIR}--", + "E": f"--{KEYWORD_FILENAME}--", + "F": f"--{KEYWORD_PREFIX}--", + "G": f"--{KEYWORD_BASE}--", + "H": f"--{KEYWORD_EXTENSION}--", + "I": f"--{KEYWORD_JOB}--", + "J": f"--{KEYWORD_PATH}-{KEYWORD_PATH}--", + "K": f"{KEYWORD_PATH}", + "L": f"--{KEYWORD_PATH}-{KEYWORD_REL_PATH}-{KEYWORD_DIR}-" + f"{KEYWORD_REL_DIR}-{KEYWORD_FILENAME}-{KEYWORD_PREFIX}-" + f"{KEYWORD_BASE}-{KEYWORD_EXTENSION}-{KEYWORD_JOB}--", + "M": "A", + "N": 1 + } + + print(test_dict["A"]) + + replaced = replace_keywords( + test_dict, "job_id", "base/src/dir/file.ext", "base/monitor/dir") + + self.assertIsInstance(replaced, dict) + self.assertEqual(len(test_dict.keys()), len(replaced.keys())) + for k in test_dict.keys(): + self.assertIn(k, replaced) + + self.assertEqual(replaced["A"], "--base/src/dir/file.ext--") + self.assertEqual(replaced["B"], "--../../src/dir/file.ext--") + self.assertEqual(replaced["C"], "--base/src/dir--") + self.assertEqual(replaced["D"], "--../../src/dir--") + self.assertEqual(replaced["E"], "--file.ext--") + self.assertEqual(replaced["F"], "--file--") + self.assertEqual(replaced["G"], "--base/monitor/dir--") + self.assertEqual(replaced["H"], "--.ext--") + self.assertEqual(replaced["I"], "--job_id--") + self.assertEqual(replaced["J"], + "--base/src/dir/file.ext-base/src/dir/file.ext--") + self.assertEqual(replaced["K"], "base/src/dir/file.ext") + self.assertEqual(replaced["L"], + "--base/src/dir/file.ext-../../src/dir/file.ext-base/src/dir-" + "../../src/dir-file.ext-file-base/monitor/dir-.ext-job_id--") + self.assertEqual(replaced["M"], "A") + self.assertEqual(replaced["N"], 1) + + def testWriteNotebook(self)->None: + notebook_path = os.path.join(TEST_MONITOR_BASE, "test_notebook.ipynb") + self.assertFalse(os.path.exists(notebook_path)) + write_notebook(APPENDING_NOTEBOOK, notebook_path) + self.assertTrue(os.path.exists(notebook_path)) + + with open(notebook_path, 'r') as f: + data = f.readlines() + + print(data) + expected_bytes = [ + '{"cells": [{"cell_type": "code", "execution_count": null, ' + '"metadata": {}, "outputs": [], "source": ["# Default parameters ' + 'values\\n", "# The line to append\\n", "extra = \'This line ' + 'comes from a default pattern\'\\n", "# Data input file ' + 'location\\n", "infile = \'start/alpha.txt\'\\n", "# Output file ' + 'location\\n", "outfile = \'first/alpha.txt\'"]}, {"cell_type": ' + '"code", "execution_count": null, "metadata": {}, "outputs": [], ' + '"source": ["# load in dataset. This should be a text file\\n", ' + '"with open(infile) as input_file:\\n", " data = ' + 'input_file.read()"]}, {"cell_type": "code", "execution_count": ' + 'null, "metadata": {}, "outputs": [], "source": ["# Append the ' + 'line\\n", "appended = data + \'\\\\n\' + extra"]}, {"cell_type": ' + '"code", "execution_count": null, "metadata": {}, "outputs": [], ' + '"source": ["import os\\n", "\\n", "# Create output directory if ' + 'it doesn\'t exist\\n", "output_dir_path = ' + 'os.path.dirname(outfile)\\n", "\\n", "if output_dir_path:\\n", ' + '" os.makedirs(output_dir_path, exist_ok=True)\\n", "\\n", "# ' + 'Save added array as new dataset\\n", "with open(outfile, \'w\') ' + 'as output_file:\\n", " output_file.write(appended)"]}], ' + '"metadata": {"kernelspec": {"display_name": "Python 3", ' + '"language": "python", "name": "python3"}, "language_info": ' + '{"codemirror_mode": {"name": "ipython", "version": 3}, ' + '"file_extension": ".py", "mimetype": "text/x-python", "name": ' + '"python", "nbconvert_exporter": "python", "pygments_lexer": ' + '"ipython3", "version": "3.10.6 (main, Nov 14 2022, 16:10:14) ' + '[GCC 11.3.0]"}, "vscode": {"interpreter": {"hash": ' + '"916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1' + '"}}}, "nbformat": 4, "nbformat_minor": 4}' + ] + + self.assertEqual(data, expected_bytes) + + def testReadNotebook(self)->None: + notebook_path = os.path.join(TEST_MONITOR_BASE, "test_notebook.ipynb") + write_notebook(APPENDING_NOTEBOOK, notebook_path) + + notebook = read_notebook(notebook_path) + self.assertEqual(notebook, APPENDING_NOTEBOOK) + + + with self.assertRaises(FileNotFoundError): + read_notebook("doesNotExist.ipynb") + + filepath = os.path.join(TEST_MONITOR_BASE, "T.txt") + with open(filepath, "w") as f: + f.write("Data") + + with self.assertRaises(ValueError): + read_notebook(filepath) + + filepath = os.path.join(TEST_MONITOR_BASE, "T.ipynb") + with open(filepath, "w") as f: + f.write("Data") + + with self.assertRaises(json.decoder.JSONDecodeError): + read_notebook(filepath) + + def testWriteYaml(self)->None: + yaml_dict = { + "A": "a", + "B": 1, + "C": { + "D": True, + "E": [ + 1, 2, 3 + ] + } + } + + filepath = os.path.join(TEST_MONITOR_BASE, "file.yaml") + + self.assertFalse(os.path.exists(filepath)) + write_yaml(yaml_dict, filepath) + self.assertTrue(os.path.exists(filepath)) + + with open(filepath, 'r') as f: + data = f.readlines() + + expected_bytes = [ + 'A: a\n', + 'B: 1\n', + 'C:\n', + ' D: true\n', + ' E:\n', + ' - 1\n', + ' - 2\n', + ' - 3\n' + ] + + self.assertEqual(data, expected_bytes) + + def testReadYaml(self)->None: + yaml_dict = { + "A": "a", + "B": 1, + "C": { + "D": True, + "E": [ + 1, 2, 3 + ] + } + } + + filepath = os.path.join(TEST_MONITOR_BASE, "file.yaml") + write_yaml(yaml_dict, filepath) + + read_dict = read_yaml(filepath) + self.assertEqual(yaml_dict, read_dict) + + with self.assertRaises(FileNotFoundError): + read_yaml("doesNotExist") + + filepath = os.path.join(TEST_MONITOR_BASE, "T.txt") + with open(filepath, "w") as f: + f.write("Data") + + data = read_yaml(filepath) + self.assertEqual(data, "Data") + + def testMakeDir(self)->None: + testDir = os.path.join(TEST_MONITOR_BASE, "Test") + self.assertFalse(os.path.exists(testDir)) + make_dir(testDir) + self.assertTrue(os.path.exists(testDir)) + self.assertTrue(os.path.isdir(testDir)) + + nested = os.path.join(TEST_MONITOR_BASE, "A", "B", "C", "D") + self.assertFalse(os.path.exists(os.path.join(TEST_MONITOR_BASE, "A"))) + make_dir(nested) + self.assertTrue(os.path.exists(nested)) + + with self.assertRaises(FileExistsError): + make_dir(nested, can_exist=False) + + filepath = os.path.join(TEST_MONITOR_BASE, "T.txt") + with open(filepath, "w") as f: + f.write("Data") + + with self.assertRaises(ValueError): + make_dir(filepath) + + halfway = os.path.join(TEST_MONITOR_BASE, "A", "B") + make_dir(halfway, ensure_clean=True) + self.assertTrue(os.path.exists(halfway)) + self.assertEqual(len(os.listdir(halfway)), 0) + + def testRemoveTree(self)->None: + nested = os.path.join(TEST_MONITOR_BASE, "A", "B") + self.assertFalse(os.path.exists(os.path.join(TEST_MONITOR_BASE, "A"))) + make_dir(nested) + self.assertTrue(os.path.exists(nested)) + + rmtree(os.path.join(TEST_MONITOR_BASE, "A")) + self.assertTrue(os.path.exists(TEST_MONITOR_BASE)) + self.assertFalse(os.path.exists(os.path.join(TEST_MONITOR_BASE, "A"))) + self.assertFalse(os.path.exists( + os.path.join(TEST_MONITOR_BASE, "A", "B"))) diff --git a/tests/test_meow.py b/tests/test_meow.py index 9711f6f..1b323ab 100644 --- a/tests/test_meow.py +++ b/tests/test_meow.py @@ -1,18 +1,14 @@ -import io -import os import unittest -from multiprocessing import Pipe from typing import Any, Union from core.correctness.vars import TEST_HANDLER_BASE, TEST_JOB_OUTPUT, \ - TEST_MONITOR_BASE, BAREBONES_NOTEBOOK, WATCHDOG_BASE, WATCHDOG_RULE, \ - EVENT_PATH, WATCHDOG_TYPE, EVENT_TYPE + TEST_MONITOR_BASE, BAREBONES_NOTEBOOK from core.functionality import make_dir, rmtree from core.meow import BasePattern, BaseRecipe, BaseRule, BaseMonitor, \ - BaseHandler, create_rules -from patterns import FileEventPattern, WatchdogMonitor + BaseHandler, BaseConductor, create_rules, create_rule +from patterns import FileEventPattern from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe valid_pattern_one = FileEventPattern( @@ -27,13 +23,13 @@ valid_recipe_two = JupyterNotebookRecipe( class MeowTests(unittest.TestCase): - def setUp(self) -> None: + def setUp(self)->None: super().setUp() make_dir(TEST_MONITOR_BASE) make_dir(TEST_HANDLER_BASE) make_dir(TEST_JOB_OUTPUT) - def tearDown(self) -> None: + def tearDown(self)->None: super().tearDown() rmtree(TEST_MONITOR_BASE) rmtree(TEST_HANDLER_BASE) @@ -93,8 +89,18 @@ class MeowTests(unittest.TestCase): pass FullRule("name", "", "") + def testCreateRule(self)->None: + rule = create_rule(valid_pattern_one, valid_recipe_one) + + self.assertIsInstance(rule, BaseRule) + + with self.assertRaises(ValueError): + rule = create_rule(valid_pattern_one, valid_recipe_two) + def testCreateRulesMinimum(self)->None: - create_rules({}, {}) + rules = create_rules({}, {}) + + self.assertEqual(len(rules), 0) def testCreateRulesPatternsAndRecipesDicts(self)->None: patterns = { @@ -166,135 +172,9 @@ class MeowTests(unittest.TestCase): pass def get_rules(self)->None: pass + FullTestMonitor({}, {}) - def testMonitoring(self)->None: - pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", - parameters={}) - recipe = JupyterNotebookRecipe( - "recipe_one", BAREBONES_NOTEBOOK) - - patterns = { - pattern_one.name: pattern_one, - } - recipes = { - recipe.name: recipe, - } - - monitor_debug_stream = io.StringIO("") - - wm = WatchdogMonitor( - TEST_MONITOR_BASE, - patterns, - recipes, - print=monitor_debug_stream, - logging=3, - settletime=1 - ) - - rules = wm.get_rules() - rule = rules[list(rules.keys())[0]] - - from_monitor_reader, from_monitor_writer = Pipe() - wm.to_runner = from_monitor_writer - - wm.start() - - start_dir = os.path.join(TEST_MONITOR_BASE, "start") - make_dir(start_dir) - self.assertTrue(start_dir) - with open(os.path.join(start_dir, "A.txt"), "w") as f: - f.write("Initial Data") - - self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) - - messages = [] - while True: - if from_monitor_reader.poll(3): - messages.append(from_monitor_reader.recv()) - else: - break - self.assertTrue(len(messages), 1) - message = messages[0] - - self.assertEqual(type(message), dict) - self.assertIn(EVENT_TYPE, message) - self.assertEqual(message[EVENT_TYPE], WATCHDOG_TYPE) - self.assertIn(WATCHDOG_BASE, message) - self.assertEqual(message[WATCHDOG_BASE], TEST_MONITOR_BASE) - self.assertIn(EVENT_PATH, message) - self.assertEqual(message[EVENT_PATH], - os.path.join(start_dir, "A.txt")) - self.assertIn(WATCHDOG_RULE, message) - self.assertEqual(message[WATCHDOG_RULE].name, rule.name) - - wm.stop() - - def testMonitoringRetroActive(self)->None: - pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", - parameters={}) - recipe = JupyterNotebookRecipe( - "recipe_one", BAREBONES_NOTEBOOK) - - patterns = { - pattern_one.name: pattern_one, - } - recipes = { - recipe.name: recipe, - } - - start_dir = os.path.join(TEST_MONITOR_BASE, "start") - make_dir(start_dir) - self.assertTrue(start_dir) - with open(os.path.join(start_dir, "A.txt"), "w") as f: - f.write("Initial Data") - - self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) - - monitor_debug_stream = io.StringIO("") - - wm = WatchdogMonitor( - TEST_MONITOR_BASE, - patterns, - recipes, - print=monitor_debug_stream, - logging=3, - settletime=1 - ) - - rules = wm.get_rules() - rule = rules[list(rules.keys())[0]] - - from_monitor_reader, from_monitor_writer = Pipe() - wm.to_runner = from_monitor_writer - - wm.start() - - messages = [] - while True: - if from_monitor_reader.poll(3): - messages.append(from_monitor_reader.recv()) - else: - break - self.assertTrue(len(messages), 1) - message = messages[0] - - self.assertEqual(type(message), dict) - self.assertIn(EVENT_TYPE, message) - self.assertEqual(message[EVENT_TYPE], WATCHDOG_TYPE) - self.assertIn(WATCHDOG_BASE, message) - self.assertEqual(message[WATCHDOG_BASE], TEST_MONITOR_BASE) - self.assertIn(EVENT_PATH, message) - self.assertEqual(message[EVENT_PATH], - os.path.join(start_dir, "A.txt")) - self.assertIn(WATCHDOG_RULE, message) - self.assertEqual(message[WATCHDOG_RULE].name, rule.name) - - wm.stop() - - def testBaseHandler(self)->None: with self.assertRaises(TypeError): BaseHandler() @@ -316,5 +196,24 @@ class MeowTests(unittest.TestCase): pass def valid_event_types(self)->list[str]: pass + FullTestHandler() + def testBaseConductor(self)->None: + with self.assertRaises(NotImplementedError): + BaseConductor() + + class TestConductor(BaseConductor): + pass + + with self.assertRaises(NotImplementedError): + TestConductor() + + class FullTestConductor(BaseConductor): + def execute(self, job:dict[str,Any])->None: + pass + + def valid_job_types(self)->list[str]: + pass + + FullTestConductor() diff --git a/tests/test_patterns.py b/tests/test_patterns.py index 734ab6a..019d425 100644 --- a/tests/test_patterns.py +++ b/tests/test_patterns.py @@ -1,4 +1,5 @@ +import io import os import unittest @@ -8,17 +9,38 @@ from core.correctness.vars import FILE_CREATE_EVENT, BAREBONES_NOTEBOOK, \ TEST_MONITOR_BASE, EVENT_TYPE, WATCHDOG_RULE, WATCHDOG_BASE, \ WATCHDOG_TYPE, EVENT_PATH from core.functionality import rmtree, make_dir -from core.meow import create_rules from patterns.file_event_pattern import FileEventPattern, WatchdogMonitor, \ _DEFAULT_MASK, SWEEP_START, SWEEP_STOP, SWEEP_JUMP from recipes import JupyterNotebookRecipe + +def patterns_equal(tester, pattern_one, pattern_two): + tester.assertEqual(pattern_one.name, pattern_two.name) + tester.assertEqual(pattern_one.recipe, pattern_two.recipe) + tester.assertEqual(pattern_one.parameters, pattern_two.parameters) + tester.assertEqual(pattern_one.outputs, pattern_two.outputs) + tester.assertEqual(pattern_one.triggering_path, + pattern_two.triggering_path) + tester.assertEqual(pattern_one.triggering_file, + pattern_two.triggering_file) + tester.assertEqual(pattern_one.event_mask, pattern_two.event_mask) + tester.assertEqual(pattern_one.sweep, pattern_two.sweep) + + +def recipes_equal(tester, recipe_one, recipe_two): + tester.assertEqual(recipe_one.name, recipe_two.name) + tester.assertEqual(recipe_one.recipe, recipe_two.recipe) + tester.assertEqual(recipe_one.parameters, recipe_two.parameters) + tester.assertEqual(recipe_one.requirements, recipe_two.requirements) + tester.assertEqual(recipe_one.source, recipe_two.source) + + class CorrectnessTests(unittest.TestCase): - def setUp(self) -> None: + def setUp(self)->None: super().setUp() make_dir(TEST_MONITOR_BASE, ensure_clean=True) - def tearDown(self) -> None: + def tearDown(self)->None: super().tearDown() rmtree(TEST_MONITOR_BASE) @@ -199,3 +221,493 @@ class CorrectnessTests(unittest.TestCase): self.assertIsNone(new_message) wm.stop() + + def testMonitoring(self)->None: + pattern_one = FileEventPattern( + "pattern_one", "start/A.txt", "recipe_one", "infile", + parameters={}) + recipe = JupyterNotebookRecipe( + "recipe_one", BAREBONES_NOTEBOOK) + + patterns = { + pattern_one.name: pattern_one, + } + recipes = { + recipe.name: recipe, + } + + monitor_debug_stream = io.StringIO("") + + wm = WatchdogMonitor( + TEST_MONITOR_BASE, + patterns, + recipes, + print=monitor_debug_stream, + logging=3, + settletime=1 + ) + + rules = wm.get_rules() + rule = rules[list(rules.keys())[0]] + + from_monitor_reader, from_monitor_writer = Pipe() + wm.to_runner = from_monitor_writer + + wm.start() + + start_dir = os.path.join(TEST_MONITOR_BASE, "start") + make_dir(start_dir) + self.assertTrue(start_dir) + with open(os.path.join(start_dir, "A.txt"), "w") as f: + f.write("Initial Data") + + self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) + + messages = [] + while True: + if from_monitor_reader.poll(3): + messages.append(from_monitor_reader.recv()) + else: + break + self.assertTrue(len(messages), 1) + message = messages[0] + + self.assertEqual(type(message), dict) + self.assertIn(EVENT_TYPE, message) + self.assertEqual(message[EVENT_TYPE], WATCHDOG_TYPE) + self.assertIn(WATCHDOG_BASE, message) + self.assertEqual(message[WATCHDOG_BASE], TEST_MONITOR_BASE) + self.assertIn(EVENT_PATH, message) + self.assertEqual(message[EVENT_PATH], + os.path.join(start_dir, "A.txt")) + self.assertIn(WATCHDOG_RULE, message) + self.assertEqual(message[WATCHDOG_RULE].name, rule.name) + + wm.stop() + + def testMonitoringRetroActive(self)->None: + pattern_one = FileEventPattern( + "pattern_one", "start/A.txt", "recipe_one", "infile", + parameters={}) + recipe = JupyterNotebookRecipe( + "recipe_one", BAREBONES_NOTEBOOK) + + patterns = { + pattern_one.name: pattern_one, + } + recipes = { + recipe.name: recipe, + } + + start_dir = os.path.join(TEST_MONITOR_BASE, "start") + make_dir(start_dir) + self.assertTrue(start_dir) + with open(os.path.join(start_dir, "A.txt"), "w") as f: + f.write("Initial Data") + + self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt"))) + + monitor_debug_stream = io.StringIO("") + + wm = WatchdogMonitor( + TEST_MONITOR_BASE, + patterns, + recipes, + print=monitor_debug_stream, + logging=3, + settletime=1 + ) + + rules = wm.get_rules() + rule = rules[list(rules.keys())[0]] + + from_monitor_reader, from_monitor_writer = Pipe() + wm.to_runner = from_monitor_writer + + wm.start() + + messages = [] + while True: + if from_monitor_reader.poll(3): + messages.append(from_monitor_reader.recv()) + else: + break + self.assertTrue(len(messages), 1) + message = messages[0] + + self.assertEqual(type(message), dict) + self.assertIn(EVENT_TYPE, message) + self.assertEqual(message[EVENT_TYPE], WATCHDOG_TYPE) + self.assertIn(WATCHDOG_BASE, message) + self.assertEqual(message[WATCHDOG_BASE], TEST_MONITOR_BASE) + self.assertIn(EVENT_PATH, message) + self.assertEqual(message[EVENT_PATH], + os.path.join(start_dir, "A.txt")) + self.assertIn(WATCHDOG_RULE, message) + self.assertEqual(message[WATCHDOG_RULE].name, rule.name) + + wm.stop() + + def testMonitorGetPatterns(self)->None: + pattern_one = FileEventPattern( + "pattern_one", "start/A.txt", "recipe_one", "infile", + parameters={}) + pattern_two = FileEventPattern( + "pattern_two", "start/B.txt", "recipe_two", "infile", + parameters={}) + + wm = WatchdogMonitor( + TEST_MONITOR_BASE, + { + pattern_one.name: pattern_one, + pattern_two.name: pattern_two + }, + {} + ) + + patterns = wm.get_patterns() + + self.assertIsInstance(patterns, dict) + self.assertEqual(len(patterns), 2) + self.assertIn(pattern_one.name, patterns) + patterns_equal(self, patterns[pattern_one.name], pattern_one) + self.assertIn(pattern_two.name, patterns) + patterns_equal(self, patterns[pattern_two.name], pattern_two) + + def testMonitorAddPattern(self)->None: + pattern_one = FileEventPattern( + "pattern_one", "start/A.txt", "recipe_one", "infile", + parameters={}) + pattern_two = FileEventPattern( + "pattern_two", "start/B.txt", "recipe_two", "infile", + parameters={}) + + wm = WatchdogMonitor( + TEST_MONITOR_BASE, + {pattern_one.name: pattern_one}, + {} + ) + + patterns = wm.get_patterns() + + self.assertIsInstance(patterns, dict) + self.assertEqual(len(patterns), 1) + self.assertIn(pattern_one.name, patterns) + patterns_equal(self, patterns[pattern_one.name], pattern_one) + + wm.add_pattern(pattern_two) + + patterns = wm.get_patterns() + + self.assertIsInstance(patterns, dict) + self.assertEqual(len(patterns), 2) + self.assertIn(pattern_one.name, patterns) + patterns_equal(self, patterns[pattern_one.name], pattern_one) + self.assertIn(pattern_two.name, patterns) + patterns_equal(self, patterns[pattern_two.name], pattern_two) + + with self.assertRaises(KeyError): + wm.add_pattern(pattern_two) + + patterns = wm.get_patterns() + + self.assertIsInstance(patterns, dict) + self.assertEqual(len(patterns), 2) + self.assertIn(pattern_one.name, patterns) + patterns_equal(self, patterns[pattern_one.name], pattern_one) + self.assertIn(pattern_two.name, patterns) + patterns_equal(self, patterns[pattern_two.name], pattern_two) + + def testMonitorUpdatePattern(self)->None: + pattern_one = FileEventPattern( + "pattern_one", "start/A.txt", "recipe_one", "infile", + parameters={}) + pattern_two = FileEventPattern( + "pattern_two", "start/B.txt", "recipe_two", "infile", + parameters={}) + + wm = WatchdogMonitor( + TEST_MONITOR_BASE, + {pattern_one.name: pattern_one}, + {} + ) + + patterns = wm.get_patterns() + + self.assertIsInstance(patterns, dict) + self.assertEqual(len(patterns), 1) + self.assertIn(pattern_one.name, patterns) + patterns_equal(self, patterns[pattern_one.name], pattern_one) + + pattern_one.recipe = "top_secret_recipe" + + patterns = wm.get_patterns() + self.assertIsInstance(patterns, dict) + self.assertEqual(len(patterns), 1) + self.assertIn(pattern_one.name, patterns) + self.assertEqual(patterns[pattern_one.name].name, + pattern_one.name) + self.assertEqual(patterns[pattern_one.name].recipe, + "recipe_one") + self.assertEqual(patterns[pattern_one.name].parameters, + pattern_one.parameters) + self.assertEqual(patterns[pattern_one.name].outputs, + pattern_one.outputs) + self.assertEqual(patterns[pattern_one.name].triggering_path, + pattern_one.triggering_path) + self.assertEqual(patterns[pattern_one.name].triggering_file, + pattern_one.triggering_file) + self.assertEqual(patterns[pattern_one.name].event_mask, + pattern_one.event_mask) + self.assertEqual(patterns[pattern_one.name].sweep, + pattern_one.sweep) + + wm.update_pattern(pattern_one) + + patterns = wm.get_patterns() + self.assertIsInstance(patterns, dict) + self.assertEqual(len(patterns), 1) + self.assertIn(pattern_one.name, patterns) + patterns_equal(self, patterns[pattern_one.name], pattern_one) + + with self.assertRaises(KeyError): + wm.update_pattern(pattern_two) + + patterns = wm.get_patterns() + self.assertIsInstance(patterns, dict) + self.assertEqual(len(patterns), 1) + self.assertIn(pattern_one.name, patterns) + patterns_equal(self, patterns[pattern_one.name], pattern_one) + + def testMonitorRemovePattern(self)->None: + pattern_one = FileEventPattern( + "pattern_one", "start/A.txt", "recipe_one", "infile", + parameters={}) + pattern_two = FileEventPattern( + "pattern_two", "start/B.txt", "recipe_two", "infile", + parameters={}) + + wm = WatchdogMonitor( + TEST_MONITOR_BASE, + {pattern_one.name: pattern_one}, + {} + ) + + patterns = wm.get_patterns() + + self.assertIsInstance(patterns, dict) + self.assertEqual(len(patterns), 1) + self.assertIn(pattern_one.name, patterns) + patterns_equal(self, patterns[pattern_one.name], pattern_one) + + with self.assertRaises(KeyError): + wm.remove_pattern(pattern_two) + + patterns = wm.get_patterns() + + self.assertIsInstance(patterns, dict) + self.assertEqual(len(patterns), 1) + self.assertIn(pattern_one.name, patterns) + patterns_equal(self, patterns[pattern_one.name], pattern_one) + + wm.remove_pattern(pattern_one) + + patterns = wm.get_patterns() + + self.assertIsInstance(patterns, dict) + self.assertEqual(len(patterns), 0) + + def testMonitorGetRecipes(self)->None: + recipe_one = JupyterNotebookRecipe( + "recipe_one", BAREBONES_NOTEBOOK) + recipe_two = JupyterNotebookRecipe( + "recipe_two", BAREBONES_NOTEBOOK) + + wm = WatchdogMonitor( + TEST_MONITOR_BASE, + {}, + { + recipe_one.name: recipe_one, + recipe_two.name: recipe_two + } + ) + + recipes = wm.get_recipes() + + self.assertIsInstance(recipes, dict) + self.assertEqual(len(recipes), 2) + self.assertIn(recipe_one.name, recipes) + recipes_equal(self, recipes[recipe_one.name], recipe_one) + self.assertIn(recipe_two.name, recipes) + recipes_equal(self, recipes[recipe_two.name], recipe_two) + + def testMonitorAddRecipe(self)->None: + recipe_one = JupyterNotebookRecipe( + "recipe_one", BAREBONES_NOTEBOOK) + recipe_two = JupyterNotebookRecipe( + "recipe_two", BAREBONES_NOTEBOOK) + + wm = WatchdogMonitor( + TEST_MONITOR_BASE, + {}, + { + recipe_one.name: recipe_one + } + ) + + recipes = wm.get_recipes() + + self.assertIsInstance(recipes, dict) + self.assertEqual(len(recipes), 1) + self.assertIn(recipe_one.name, recipes) + recipes_equal(self, recipes[recipe_one.name], recipe_one) + + + wm.add_recipe(recipe_two) + + recipes = wm.get_recipes() + + self.assertIsInstance(recipes, dict) + self.assertEqual(len(recipes), 2) + self.assertIn(recipe_one.name, recipes) + recipes_equal(self, recipes[recipe_one.name], recipe_one) + self.assertIn(recipe_two.name, recipes) + recipes_equal(self, recipes[recipe_two.name], recipe_two) + + with self.assertRaises(KeyError): + wm.add_recipe(recipe_two) + + recipes = wm.get_recipes() + + self.assertIsInstance(recipes, dict) + self.assertEqual(len(recipes), 2) + self.assertIn(recipe_one.name, recipes) + recipes_equal(self, recipes[recipe_one.name], recipe_one) + self.assertIn(recipe_two.name, recipes) + recipes_equal(self, recipes[recipe_two.name], recipe_two) + + def testMonitorUpdateRecipe(self)->None: + recipe_one = JupyterNotebookRecipe( + "recipe_one", BAREBONES_NOTEBOOK) + recipe_two = JupyterNotebookRecipe( + "recipe_two", BAREBONES_NOTEBOOK) + + wm = WatchdogMonitor( + TEST_MONITOR_BASE, + {}, + { + recipe_one.name: recipe_one + } + ) + + recipes = wm.get_recipes() + + self.assertIsInstance(recipes, dict) + self.assertEqual(len(recipes), 1) + self.assertIn(recipe_one.name, recipes) + recipes_equal(self, recipes[recipe_one.name], recipe_one) + + recipe_one.source = "top_secret_source" + + recipes = wm.get_recipes() + self.assertIsInstance(recipes, dict) + self.assertEqual(len(recipes), 1) + self.assertIn(recipe_one.name, recipes) + self.assertEqual(recipes[recipe_one.name].name, + recipe_one.name) + self.assertEqual(recipes[recipe_one.name].recipe, + recipe_one.recipe) + self.assertEqual(recipes[recipe_one.name].parameters, + recipe_one.parameters) + self.assertEqual(recipes[recipe_one.name].requirements, + recipe_one.requirements) + self.assertEqual(recipes[recipe_one.name].source, + "") + + wm.update_recipe(recipe_one) + + recipes = wm.get_recipes() + self.assertIsInstance(recipes, dict) + self.assertEqual(len(recipes), 1) + self.assertIn(recipe_one.name, recipes) + recipes_equal(self, recipes[recipe_one.name], recipe_one) + + with self.assertRaises(KeyError): + wm.update_recipe(recipe_two) + + recipes = wm.get_recipes() + self.assertIsInstance(recipes, dict) + self.assertEqual(len(recipes), 1) + self.assertIn(recipe_one.name, recipes) + recipes_equal(self, recipes[recipe_one.name], recipe_one) + + def testMonitorRemoveRecipe(self)->None: + recipe_one = JupyterNotebookRecipe( + "recipe_one", BAREBONES_NOTEBOOK) + recipe_two = JupyterNotebookRecipe( + "recipe_two", BAREBONES_NOTEBOOK) + + wm = WatchdogMonitor( + TEST_MONITOR_BASE, + {}, + { + recipe_one.name: recipe_one + } + ) + + recipes = wm.get_recipes() + + self.assertIsInstance(recipes, dict) + self.assertEqual(len(recipes), 1) + self.assertIn(recipe_one.name, recipes) + recipes_equal(self, recipes[recipe_one.name], recipe_one) + + with self.assertRaises(KeyError): + wm.remove_recipe(recipe_two) + + recipes = wm.get_recipes() + + self.assertIsInstance(recipes, dict) + self.assertEqual(len(recipes), 1) + self.assertIn(recipe_one.name, recipes) + recipes_equal(self, recipes[recipe_one.name], recipe_one) + + wm.remove_recipe(recipe_one) + + recipes = wm.get_recipes() + + self.assertIsInstance(recipes, dict) + self.assertEqual(len(recipes), 0) + + def testMonitorGetRules(self)->None: + pattern_one = FileEventPattern( + "pattern_one", "start/A.txt", "recipe_one", "infile", + parameters={}) + pattern_two = FileEventPattern( + "pattern_two", "start/B.txt", "recipe_two", "infile", + parameters={}) + recipe_one = JupyterNotebookRecipe( + "recipe_one", BAREBONES_NOTEBOOK) + recipe_two = JupyterNotebookRecipe( + "recipe_two", BAREBONES_NOTEBOOK) + + patterns = { + pattern_one.name: pattern_one, + pattern_two.name: pattern_two, + } + recipes = { + recipe_one.name: recipe_one, + recipe_two.name: recipe_two, + } + + wm = WatchdogMonitor( + TEST_MONITOR_BASE, + patterns, + recipes + ) + + rules = wm.get_rules() + + self.assertIsInstance(rules, dict) + self.assertEqual(len(rules), 2) + \ No newline at end of file diff --git a/tests/test_recipes.py b/tests/test_recipes.py index 8decfdc..42f51c2 100644 --- a/tests/test_recipes.py +++ b/tests/test_recipes.py @@ -1,29 +1,34 @@ -import io import jsonschema import os import unittest -from time import sleep +from multiprocessing import Pipe -from patterns.file_event_pattern import FileEventPattern -from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe, \ - PapermillHandler, BASE_FILE, META_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE -from rules.file_event_jupyter_notebook_rule import FileEventJupyterNotebookRule from core.correctness.vars import BAREBONES_NOTEBOOK, TEST_HANDLER_BASE, \ TEST_JOB_OUTPUT, TEST_MONITOR_BASE, COMPLETE_NOTEBOOK, EVENT_TYPE, \ - WATCHDOG_BASE, WATCHDOG_RULE, WATCHDOG_TYPE, EVENT_PATH -from core.functionality import rmtree, make_dir, read_notebook -from core.meow import create_rules + WATCHDOG_BASE, WATCHDOG_RULE, WATCHDOG_TYPE, EVENT_PATH, SHA256, \ + WATCHDOG_HASH, JOB_ID, PYTHON_TYPE, JOB_PARAMETERS, JOB_HASH, \ + PYTHON_FUNC, PYTHON_OUTPUT_DIR, PYTHON_EXECUTION_BASE, \ + APPENDING_NOTEBOOK, META_FILE, BASE_FILE, PARAMS_FILE, JOB_FILE, \ + RESULT_FILE +from core.correctness.validation import valid_job +from core.functionality import rmtree, make_dir, get_file_hash, create_job, \ + create_event +from core.meow import create_rules, create_rule +from patterns.file_event_pattern import FileEventPattern +from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe, \ + PapermillHandler, job_func +from rules.file_event_jupyter_notebook_rule import FileEventJupyterNotebookRule class CorrectnessTests(unittest.TestCase): - def setUp(self) -> None: + def setUp(self)->None: super().setUp() make_dir(TEST_MONITOR_BASE) make_dir(TEST_HANDLER_BASE) make_dir(TEST_JOB_OUTPUT) - def tearDown(self) -> None: + def tearDown(self)->None: super().tearDown() rmtree(TEST_MONITOR_BASE) rmtree(TEST_HANDLER_BASE) @@ -98,14 +103,12 @@ class CorrectnessTests(unittest.TestCase): ) def testPapermillHandlerHandling(self)->None: - debug_stream = io.StringIO("") - + from_handler_reader, from_handler_writer = Pipe() ph = PapermillHandler( TEST_HANDLER_BASE, - TEST_JOB_OUTPUT, - print=debug_stream, - logging=3 + TEST_JOB_OUTPUT ) + ph.to_runner = from_handler_writer with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: f.write("Data") @@ -133,41 +136,88 @@ class CorrectnessTests(unittest.TestCase): EVENT_TYPE: WATCHDOG_TYPE, EVENT_PATH: os.path.join(TEST_MONITOR_BASE, "A"), WATCHDOG_BASE: TEST_MONITOR_BASE, - WATCHDOG_RULE: rule + WATCHDOG_RULE: rule, + WATCHDOG_HASH: get_file_hash( + os.path.join(TEST_MONITOR_BASE, "A"), SHA256 + ) } ph.handle(event) - loops = 0 - job_id = None - while loops < 15: - sleep(1) - debug_stream.seek(0) - messages = debug_stream.readlines() + if from_handler_reader.poll(3): + job = from_handler_reader.recv() - for msg in messages: - self.assertNotIn("ERROR", msg) - - if "INFO: Completed job " in msg: - job_id = msg.replace("INFO: Completed job ", "") - job_id = job_id[:job_id.index(" with output")] - loops = 15 - loops += 1 + self.assertIsNotNone(job[JOB_ID]) - self.assertIsNotNone(job_id) - self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1) - self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT)) + valid_job(job) - job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) - self.assertEqual(len(os.listdir(job_dir)), 5) + def testJobFunc(self)->None: + file_path = os.path.join(TEST_MONITOR_BASE, "test") + result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") - self.assertIn(META_FILE, os.listdir(job_dir)) - self.assertIn(BASE_FILE, os.listdir(job_dir)) - self.assertIn(PARAMS_FILE, os.listdir(job_dir)) - self.assertIn(JOB_FILE, os.listdir(job_dir)) - self.assertIn(RESULT_FILE, os.listdir(job_dir)) + with open(file_path, "w") as f: + f.write("Data") - result = read_notebook(os.path.join(job_dir, RESULT_FILE)) + file_hash = get_file_hash(file_path, SHA256) - self.assertEqual("124875.0\n", - result["cells"][4]["outputs"][0]["text"][0]) + pattern = FileEventPattern( + "pattern", + file_path, + "recipe_one", + "infile", + parameters={ + "extra":"A line from a test Pattern", + "outfile":result_path + }) + recipe = JupyterNotebookRecipe( + "recipe_one", APPENDING_NOTEBOOK) + + rule = create_rule(pattern, recipe) + + job_dict = create_job( + PYTHON_TYPE, + create_event( + WATCHDOG_TYPE, + file_path, + { + WATCHDOG_BASE: TEST_MONITOR_BASE, + WATCHDOG_RULE: rule, + WATCHDOG_HASH: file_hash + } + ), + { + JOB_PARAMETERS:{ + "extra":"extra", + "infile":file_path, + "outfile":result_path + }, + JOB_HASH: file_hash, + PYTHON_FUNC:job_func, + PYTHON_OUTPUT_DIR:TEST_JOB_OUTPUT, + PYTHON_EXECUTION_BASE:TEST_HANDLER_BASE + } + ) + + job_func(job_dict) + + job_dir = os.path.join(TEST_HANDLER_BASE, job_dict[JOB_ID]) + self.assertFalse(os.path.exists(job_dir)) + + output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) + self.assertTrue(os.path.exists(output_dir)) + self.assertTrue(os.path.exists(os.path.join(output_dir, META_FILE))) + self.assertTrue(os.path.exists(os.path.join(output_dir, BASE_FILE))) + self.assertTrue(os.path.exists(os.path.join(output_dir, PARAMS_FILE))) + self.assertTrue(os.path.exists(os.path.join(output_dir, JOB_FILE))) + self.assertTrue(os.path.exists(os.path.join(output_dir, RESULT_FILE))) + + self.assertTrue(os.path.exists(result_path)) + + def testJobFuncBadArgs(self)->None: + try: + job_func({}) + except Exception: + pass + + self.assertEqual(len(os.listdir(TEST_HANDLER_BASE)), 0) + self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 0) diff --git a/tests/test_rules.py b/tests/test_rules.py index 1edf0a4..d8eac25 100644 --- a/tests/test_rules.py +++ b/tests/test_rules.py @@ -7,10 +7,10 @@ from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe from rules.file_event_jupyter_notebook_rule import FileEventJupyterNotebookRule class CorrectnessTests(unittest.TestCase): - def setUp(self) -> None: + def setUp(self)->None: return super().setUp() - def tearDown(self) -> None: + def tearDown(self)->None: return super().tearDown() def testFileEventJupyterNotebookRuleCreationMinimum(self)->None: diff --git a/tests/test_runner.py b/tests/test_runner.py index 3327ab2..a0f4da7 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -5,30 +5,171 @@ import unittest from time import sleep +from conductors import LocalPythonConductor from core.correctness.vars import TEST_HANDLER_BASE, TEST_JOB_OUTPUT, \ - TEST_MONITOR_BASE, APPENDING_NOTEBOOK + TEST_MONITOR_BASE, APPENDING_NOTEBOOK, RESULT_FILE from core.functionality import make_dir, rmtree, read_notebook -from core.meow import create_rules +from core.meow import BaseMonitor, BaseHandler, BaseConductor from core.runner import MeowRunner from patterns import WatchdogMonitor, FileEventPattern from recipes.jupyter_notebook_recipe import PapermillHandler, \ - JupyterNotebookRecipe, RESULT_FILE + JupyterNotebookRecipe class MeowTests(unittest.TestCase): - def setUp(self) -> None: + def setUp(self)->None: super().setUp() make_dir(TEST_MONITOR_BASE) make_dir(TEST_HANDLER_BASE) make_dir(TEST_JOB_OUTPUT) - def tearDown(self) -> None: + def tearDown(self)->None: super().tearDown() rmtree(TEST_MONITOR_BASE) rmtree(TEST_HANDLER_BASE) rmtree(TEST_JOB_OUTPUT) - def testMeowRunner(self)->None: + def testMeowRunnerSetup(self)->None: + + monitor_one = WatchdogMonitor(TEST_MONITOR_BASE, {}, {}) + monitor_two = WatchdogMonitor(TEST_MONITOR_BASE, {}, {}) + monitors = [ monitor_one, monitor_two ] + + handler_one = PapermillHandler(TEST_HANDLER_BASE, TEST_JOB_OUTPUT) + handler_two = PapermillHandler(TEST_HANDLER_BASE, TEST_JOB_OUTPUT) + handlers = [ handler_one, handler_two ] + + conductor_one = LocalPythonConductor() + conductor_two = LocalPythonConductor() + conductors = [ conductor_one, conductor_two ] + + runner = MeowRunner(monitor_one, handler_one, conductor_one) + + self.assertIsInstance(runner.monitors, list) + for m in runner.monitors: + self.assertIsInstance(m, BaseMonitor) + self.assertEqual(len(runner.monitors), 1) + self.assertEqual(runner.monitors[0], monitor_one) + + self.assertIsInstance(runner.from_monitors, list) + self.assertEqual(len(runner.from_monitors), 1) + runner.monitors[0].to_runner.send("monitor test message") + message = None + if runner.from_monitors[0].poll(3): + message = runner.from_monitors[0].recv() + self.assertIsNotNone(message) + self.assertEqual(message, "monitor test message") + + self.assertIsInstance(runner.handlers, dict) + for handler_list in runner.handlers.values(): + for h in handler_list: + self.assertIsInstance(h, BaseHandler) + self.assertEqual( + len(runner.handlers.keys()), len(handler_one.valid_event_types())) + for event_type in handler_one.valid_event_types(): + self.assertIn(event_type, runner.handlers.keys()) + self.assertEqual(len(runner.handlers[event_type]), 1) + self.assertEqual(runner.handlers[event_type][0], handler_one) + + self.assertIsInstance(runner.from_handlers, list) + self.assertEqual(len(runner.from_handlers), 1) + runner.handlers[handler_one.valid_event_types()[0]][0].to_runner.send( + "handler test message") + message = None + if runner.from_handlers[0].poll(3): + message = runner.from_handlers[0].recv() + self.assertIsNotNone(message) + self.assertEqual(message, "handler test message") + + self.assertIsInstance(runner.conductors, dict) + for conductor_list in runner.conductors.values(): + for c in conductor_list: + self.assertIsInstance(c, BaseConductor) + self.assertEqual( + len(runner.conductors.keys()), len(conductor_one.valid_job_types())) + for job_type in conductor_one.valid_job_types(): + self.assertIn(job_type, runner.conductors.keys()) + self.assertEqual(len(runner.conductors[job_type]), 1) + self.assertEqual(runner.conductors[job_type][0], conductor_one) + + runner = MeowRunner(monitors, handlers, conductors) + + self.assertIsInstance(runner.monitors, list) + for m in runner.monitors: + self.assertIsInstance(m, BaseMonitor) + self.assertEqual(len(runner.monitors), len(monitors)) + self.assertIn(monitor_one, runner.monitors) + self.assertIn(monitor_two, runner.monitors) + + self.assertIsInstance(runner.from_monitors, list) + self.assertEqual(len(runner.from_monitors), len(monitors)) + for rm in runner.monitors: + rm.to_runner.send("monitor test message") + messages = [None] * len(monitors) + for i, rfm in enumerate(runner.from_monitors): + if rfm.poll(3): + messages[i] = rfm.recv() + for m in messages: + self.assertIsNotNone(m) + self.assertEqual(m, "monitor test message") + + self.assertIsInstance(runner.handlers, dict) + for handler_list in runner.handlers.values(): + for h in handler_list: + self.assertIsInstance(h, BaseHandler) + all_events = [] + for h in handlers: + for e in h.valid_event_types(): + if e not in all_events: + all_events.append(e) + self.assertEqual(len(runner.handlers.keys()), len(all_events)) + for handler in handlers: + for event_type in handler.valid_event_types(): + relevent_handlers = [h for h in handlers + if event_type in h.valid_event_types()] + self.assertIn(event_type, runner.handlers.keys()) + self.assertEqual(len(runner.handlers[event_type]), + len(relevent_handlers)) + for rh in relevent_handlers: + self.assertIn(rh, runner.handlers[event_type]) + + self.assertIsInstance(runner.from_handlers, list) + self.assertEqual(len(runner.from_handlers), len(handlers)) + runner_handlers = [] + for handler_list in runner.handlers.values(): + for h in handler_list: + runner_handlers.append(h) + runner_handlers = [h for h in handler_list for + handler_list in runner.handlers.values()] + for rh in handler_list: + rh.to_runner.send("handler test message") + message = None + if runner.from_handlers[0].poll(3): + message = runner.from_handlers[0].recv() + self.assertIsNotNone(message) + self.assertEqual(message, "handler test message") + + self.assertIsInstance(runner.conductors, dict) + for conductor_list in runner.conductors.values(): + for c in conductor_list: + self.assertIsInstance(c, BaseConductor) + all_jobs = [] + for c in conductors: + for j in c.valid_job_types(): + if j not in all_jobs: + all_jobs.append(j) + self.assertEqual(len(runner.conductors.keys()), len(all_jobs)) + for conductor in conductors: + for job_type in conductor.valid_job_types(): + relevent_conductors = [c for c in conductors + if job_type in c.valid_job_types()] + self.assertIn(job_type, runner.conductors.keys()) + self.assertEqual(len(runner.conductors[job_type]), + len(relevent_conductors)) + for rc in relevent_conductors: + self.assertIn(rc, runner.conductors[job_type]) + + def testMeowRunnerExecution(self)->None: pattern_one = FileEventPattern( "pattern_one", "start/A.txt", "recipe_one", "infile", parameters={ @@ -45,24 +186,22 @@ class MeowTests(unittest.TestCase): recipe.name: recipe, } - monitor_debug_stream = io.StringIO("") - handler_debug_stream = io.StringIO("") + runner_debug_stream = io.StringIO("") runner = MeowRunner( WatchdogMonitor( TEST_MONITOR_BASE, patterns, recipes, - print=monitor_debug_stream, - logging=3, settletime=1 ), PapermillHandler( TEST_HANDLER_BASE, TEST_JOB_OUTPUT, - print=handler_debug_stream, - logging=3 - ) + ), + LocalPythonConductor(), + print=runner_debug_stream, + logging=3 ) runner.start() @@ -79,18 +218,22 @@ class MeowTests(unittest.TestCase): job_id = None while loops < 15: sleep(1) - handler_debug_stream.seek(0) - messages = handler_debug_stream.readlines() + runner_debug_stream.seek(0) + messages = runner_debug_stream.readlines() for msg in messages: self.assertNotIn("ERROR", msg) - if "INFO: Completed job " in msg: - job_id = msg.replace("INFO: Completed job ", "") - job_id = job_id[:job_id.index(" with output")] + if "INFO: Completed execution for job: '" in msg: + job_id = msg.replace( + "INFO: Completed execution for job: '", "") + job_id = job_id[:-2] loops = 15 loops += 1 + print("JOB ID:") + print(job_id) + self.assertIsNotNone(job_id) self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1) self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT)) @@ -111,7 +254,7 @@ class MeowTests(unittest.TestCase): self.assertEqual(data, "Initial Data\nA line from a test Pattern") - def testMeowRunnerLinkeExecution(self)->None: + def testMeowRunnerLinkedExecution(self)->None: pattern_one = FileEventPattern( "pattern_one", "start/A.txt", "recipe_one", "infile", parameters={ @@ -134,26 +277,23 @@ class MeowTests(unittest.TestCase): recipes = { recipe.name: recipe, } - rules = create_rules(patterns, recipes) - monitor_debug_stream = io.StringIO("") - handler_debug_stream = io.StringIO("") + runner_debug_stream = io.StringIO("") runner = MeowRunner( WatchdogMonitor( TEST_MONITOR_BASE, patterns, recipes, - print=monitor_debug_stream, - logging=3, settletime=1 ), PapermillHandler( TEST_HANDLER_BASE, - TEST_JOB_OUTPUT, - print=handler_debug_stream, - logging=3 - ) + TEST_JOB_OUTPUT + ), + LocalPythonConductor(), + print=runner_debug_stream, + logging=3 ) runner.start() @@ -170,15 +310,16 @@ class MeowTests(unittest.TestCase): job_ids = [] while len(job_ids) < 2 and loops < 15: sleep(1) - handler_debug_stream.seek(0) - messages = handler_debug_stream.readlines() + runner_debug_stream.seek(0) + messages = runner_debug_stream.readlines() for msg in messages: self.assertNotIn("ERROR", msg) - if "INFO: Completed job " in msg: - job_id = msg.replace("INFO: Completed job ", "") - job_id = job_id[:job_id.index(" with output")] + if "INFO: Completed execution for job: '" in msg: + job_id = msg.replace( + "INFO: Completed execution for job: '", "") + job_id = job_id[:-2] if job_id not in job_ids: job_ids.append(job_id) loops += 1 diff --git a/tests/test_validation.py b/tests/test_validation.py index 5a25b26..aad35e8 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -1,22 +1,26 @@ +import io import unittest import os +from datetime import datetime from typing import Any, Union from core.correctness.validation import check_type, check_implementation, \ valid_string, valid_dict, valid_list, valid_existing_file_path, \ - valid_existing_dir_path, valid_non_existing_path, valid_event + valid_existing_dir_path, valid_non_existing_path, valid_event, valid_job, \ + setup_debugging from core.correctness.vars import VALID_NAME_CHARS, TEST_MONITOR_BASE, \ - SHA256, EVENT_TYPE, EVENT_PATH + SHA256, EVENT_TYPE, EVENT_PATH, JOB_TYPE, JOB_EVENT, JOB_ID, JOB_PATTERN, \ + JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME from core.functionality import rmtree, make_dir class CorrectnessTests(unittest.TestCase): - def setUp(self) -> None: + def setUp(self)->None: super().setUp() make_dir(TEST_MONITOR_BASE, ensure_clean=True) - def tearDown(self) -> None: + def tearDown(self)->None: super().tearDown() rmtree(TEST_MONITOR_BASE) rmtree("first") @@ -219,3 +223,38 @@ class CorrectnessTests(unittest.TestCase): with self.assertRaises(KeyError): valid_event({}) + + def testJobValidation(self)->None: + valid_job({ + JOB_TYPE: "test", + JOB_EVENT: {}, + JOB_ID: "id", + JOB_PATTERN: "pattern", + JOB_RECIPE: "recipe", + JOB_RULE: "rule", + JOB_STATUS: "status", + JOB_CREATE_TIME: datetime.now() + }) + + with self.assertRaises(KeyError): + valid_job({JOB_TYPE: "test"}) + + with self.assertRaises(KeyError): + valid_job({"JOB_TYPE": "test"}) + + with self.assertRaises(KeyError): + valid_job({}) + + def testSetupDebugging(self)->None: + stream = io.StringIO("") + + target, level = setup_debugging(stream, 1) + + self.assertIsInstance(target, io.StringIO) + self.assertIsInstance(level, int) + + with self.assertRaises(TypeError): + setup_debugging("stream", 1) + + with self.assertRaises(TypeError): + setup_debugging(stream, "1")