diff --git a/core/correctness/validation.py b/core/correctness/validation.py index 21143cc..ab3ccc3 100644 --- a/core/correctness/validation.py +++ b/core/correctness/validation.py @@ -1,7 +1,7 @@ from inspect import signature -from os.path import sep -from typing import Any, _SpecialForm, Union, get_origin, get_args +from os.path import sep, exists, isfile, isdir, dirname +from typing import Any, _SpecialForm, Union, Tuple, get_origin, get_args from core.correctness.vars import VALID_PATH_CHARS, get_not_imp_msg @@ -129,10 +129,52 @@ def valid_list(variable:list[Any], entry_type:type, for entry in variable: check_type(entry, entry_type, alt_types=alt_types) -def valid_path(variable:str, allow_base=False, extension:str="", min_length=1): +def valid_path(variable:str, allow_base:bool=False, extension:str="", + min_length:int=1): valid_string(variable, VALID_PATH_CHARS, min_length=min_length) if not allow_base and variable.startswith(sep): raise ValueError(f"Cannot accept path '{variable}'. Must be relative.") if extension and not variable.endswith(extension): raise ValueError(f"Path '{variable}' does not have required " f"extension '{extension}'.") + +def valid_existing_file_path(variable:str, allow_base:bool=False, + extension:str=""): + valid_path(variable, allow_base=allow_base, extension=extension) + if not exists(variable): + raise FileNotFoundError( + f"Requested file path '{variable}' does not exist.") + if not isfile(variable): + raise ValueError( + f"Requested file '{variable}' is not a file.") + +def valid_existing_dir_path(variable:str, allow_base:bool=False): + valid_path(variable, allow_base=allow_base, extension="") + if not exists(variable): + raise FileNotFoundError( + f"Requested dir path '{variable}' does not exist.") + if not isdir(variable): + raise ValueError( + f"Requested dir '{variable}' is not a directory.") + +def valid_non_existing_path(variable:str, allow_base:bool=False): + valid_path(variable, allow_base=allow_base, extension="") + if exists(variable): + raise ValueError(f"Requested path '{variable}' already exists.") + if dirname(variable) and not exists(dirname(variable)): + raise ValueError( + f"Route to requested path '{variable}' does not exist.") + +def setup_debugging(print:Any=None, logging:int=0)->Tuple[Any,int]: + check_type(logging, int) + if print is None: + return None, 0 + else: + if not isinstance(print, object): + raise TypeError(f"Invalid print location provided") + writeable = getattr(print, "write", None) + if not writeable or not callable(writeable): + raise TypeError(f"Print object does not implement required " + "'write' function") + + return print, logging diff --git a/core/correctness/vars.py b/core/correctness/vars.py index 9df02d3..b344977 100644 --- a/core/correctness/vars.py +++ b/core/correctness/vars.py @@ -7,6 +7,7 @@ from inspect import signature from typing import Union +# validation CHAR_LOWERCASE = 'abcdefghijklmnopqrstuvwxyz' CHAR_UPPERCASE = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' CHAR_NUMERIC = '0123456789' @@ -26,13 +27,167 @@ VALID_TRIGGERING_PATH_CHARS = VALID_NAME_CHARS + ".*" + os.path.sep VALID_CHANNELS = Union[Connection,Queue] +# hashing +HASH_BUFFER_SIZE = 65536 +SHA256 = "sha256" + +# testing BAREBONES_NOTEBOOK = { "cells": [], "metadata": {}, "nbformat": 4, "nbformat_minor": 4 } +TEST_MONITOR_BASE = "test_monitor_base" +TEST_HANDLER_BASE = "test_handler_base" +TEST_JOB_OUTPUT = "test_job_output" +COMPLETE_NOTEBOOK = { + "cells": [ + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": "# The first cell\n\ns = 0\nnum = 1000" + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": "for i in range(num):\n s += i" + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": "div_by = 4" + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": "result = s / div_by" + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": "print(result)" + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} +APPENDING_NOTEBOOK = { + "cells": [ + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Default parameters values\n", + "# The line to append\n", + "extra = 'This line comes from a default pattern'\n", + "# Data input file location\n", + "infile = 'start/alpha.txt'\n", + "# Output file location\n", + "outfile = 'first/alpha.txt'" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# load in dataset. This should be a text file\n", + "with open(infile) as input_file:\n", + " data = input_file.read()" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Append the line\n", + "appended = data + '\\n' + extra" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "# Create output directory if it doesn't exist\n", + "output_dir_path = os.path.dirname(outfile)\n", + "\n", + "if output_dir_path:\n", + " os.makedirs(output_dir_path, exist_ok=True)\n", + "\n", + "# Save added array as new dataset\n", + "with open(outfile, 'w') as output_file:\n", + " output_file.write(appended)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.6 (main, Nov 14 2022, 16:10:14) [GCC 11.3.0]" + }, + "vscode": { + "interpreter": { + "hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1" + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} +# events FILE_CREATE_EVENT = "file_created" FILE_MODIFY_EVENT = "file_modified" FILE_MOVED_EVENT = "file_moved" @@ -57,9 +212,12 @@ DIR_EVENTS = [ DIR_DELETED_EVENT ] -PIPE_READ = 0 -PIPE_WRITE = 1 +# debug printing levels +DEBUG_ERROR = 1 +DEBUG_WARNING = 2 +DEBUG_INFO = 3 +# debug message functions def get_drt_imp_msg(base_class): return f"{base_class.__name__} may not be instantiated directly. " \ f"Implement a child class." diff --git a/core/functionality.py b/core/functionality.py index 6aaacc9..de10686 100644 --- a/core/functionality.py +++ b/core/functionality.py @@ -1,16 +1,25 @@ -import sys +import copy +import hashlib import inspect +import json +import nbformat +import os +import sys +import yaml from multiprocessing.connection import Connection, wait as multi_wait from multiprocessing.queues import Queue -from typing import Union +from papermill.translators import papermill_translators +from typing import Any, Union from random import SystemRandom from core.meow import BasePattern, BaseRecipe, BaseRule -from core.correctness.validation import check_type, valid_dict, valid_list +from core.correctness.validation import check_type, valid_dict, valid_list, \ + valid_existing_file_path, valid_path from core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \ - VALID_CHANNELS + VALID_CHANNELS, HASH_BUFFER_SIZE, SHA256, DEBUG_WARNING, DEBUG_ERROR, \ + DEBUG_INFO def check_pattern_dict(patterns, min_length=1): valid_dict(patterns, str, BasePattern, strict=False, min_length=min_length) @@ -87,3 +96,203 @@ def wait(inputs:list[VALID_CHANNELS])->list[VALID_CHANNELS]: (type(i) is Connection and i in ready) \ or (type(i) is Queue and i._reader in ready)] return ready_inputs + +def _get_file_sha256(file_path): + sha256_hash = hashlib.sha256() + + with open(file_path, 'rb') as file_to_hash: + while True: + buffer = file_to_hash.read(HASH_BUFFER_SIZE) + if not buffer: + break + sha256_hash.update(buffer) + + return sha256_hash.hexdigest() + +def get_file_hash(file_path:str, hash:str): + check_type(hash, str) + + import os + valid_existing_file_path(file_path) + + valid_hashes = { + SHA256: _get_file_sha256 + } + if hash not in valid_hashes: + raise KeyError(f"Cannot use hash '{hash}'. Valid are " + "'{list(valid_hashes.keys())}") + + return valid_hashes[hash](file_path) + +def rmtree(directory:str): + """ + Remove a directory and all its contents. + Should be faster than shutil.rmtree + + :param: (str) The firectory to empty and remove + + :return: No return + """ + for root, dirs, files in os.walk(directory, topdown=False): + for file in files: + os.remove(os.path.join(root, file)) + for dir in dirs: + rmtree(os.path.join(root, dir)) + os.rmdir(directory) + +def make_dir(path:str, can_exist:bool=True, ensure_clean:bool=False): + """ + Creates a new directory at the given path. + + :param path: (str) The directory path. + + :param can_exist: (boolean) [optional] A toggle for if a previously + existing directory at the path will throw an error or not. Default is + true (e.g. no error is thrown if the path already exists) + + :param ensure_clean: (boolean) [optional] A toggle for if a previously + existing directory at the path will be replaced with a new emtpy directory. + Default is False. + + :return: No return + """ + if not os.path.exists(path): + os.mkdir(path) + elif os.path.isfile(path): + raise ValueError('Cannot make directory in %s as it already ' + 'exists and is a file' % path) + else: + if not can_exist: + if ensure_clean: + rmtree(path) + os.mkdir(path) + else: + raise ValueError("Directory %s already exists. " % path) + +def read_yaml(filepath:str): + """ + Reads a file path as a yaml object. + + :param filepath: (str) The file to read. + + :return: (object) An object read from the file. + """ + with open(filepath, 'r') as yaml_file: + return yaml.load(yaml_file, Loader=yaml.Loader) + +def write_yaml(source:Any, filename:str, mode:str='w'): + """ + Writes a given objcet to a yaml file. + + :param source: (any) A python object to be written. + + :param filename: (str) The filename to be written to. + + :return: No return + """ + with open(filename, mode) as param_file: + yaml.dump(source, param_file, default_flow_style=False) + +def read_notebook(filepath:str): + valid_path(filepath, extension="ipynb") + with open(filepath, 'r') as read_file: + return json.load(read_file) + +def write_notebook(source:dict[str,Any], filename:str): + """ + Writes the given notebook source code to a given filename. + + :param source: (dict) The notebook source dictionary. + + :param filename: (str) The filename to write to. + + :return: No return + """ + with open(filename, 'w') as job_file: + json.dump(source, job_file) + +# Adapted from: https://github.com/rasmunk/notebook_parameterizer +def parameterize_jupyter_notebook( jupyter_notebook:dict[str,Any], + parameters:dict[str,Any], expand_env_values:bool=False)->dict[str,Any]: + nbformat.validate(jupyter_notebook) + check_type(parameters, dict) + + if jupyter_notebook["nbformat"] != 4: + raise Warning( + "Parameterization designed to work with nbformat version 4. " + f"Differing version of '{jupyter_notebook['nbformat']}' may " + "produce unexpeted results.") + + # Load input notebook + if "kernelspec" in jupyter_notebook["metadata"]: + kernel_name = jupyter_notebook["metadata"]["kernelspec"]["name"] + language = jupyter_notebook["metadata"]["kernelspec"]["language"] + if "language_info" in jupyter_notebook["metadata"]: + kernel_name = jupyter_notebook["metadata"]["language_info"]["name"] + language = jupyter_notebook["metadata"]["language_info"]["name"] + else: + raise AttributeError( + f"Notebook lacks key language and/or kernel_name attributes " + "within metadata") + + translator = papermill_translators.find_translator(kernel_name, language) + + output_notebook = copy.deepcopy(jupyter_notebook) + + # Find each + cells = output_notebook["cells"] + code_cells = [ + (idx, cell) for idx, cell in enumerate(cells) \ + if cell["cell_type"] == "code" + ] + for idx, cell in code_cells: + cell_updated = False + source = cell["source"] + # Either single string or a list of strings + if isinstance(source, str): + lines = source.split("\n") + else: + lines = source + + for idy, line in enumerate(lines): + if "=" in line: + d_line = list(map(lambda x: x.replace(" ", ""), + line.split("="))) + # Matching parameter name + if len(d_line) == 2 and d_line[0] in parameters: + value = parameters[d_line[0]] + # Whether to expand value from os env + if ( + expand_env_values + and isinstance(value, str) + and value.startswith("ENV_") + ): + env_var = value.replace("ENV_", "") + value = os.getenv( + env_var, + "MISSING ENVIRONMENT VARIABLE: {}".format(env_var) + ) + lines[idy] = translator.assign( + d_line[0], translator.translate(value) + ) + + cell_updated = True + if cell_updated: + cells[idx]["source"] = "\n".join(lines) + + # Validate that the parameterized notebook is still valid + nbformat.validate(output_notebook, version=4) + + return output_notebook + +def print_debug(print_target, debug_level, msg, level)->None: + if print_target is None: + return + else: + if level <= debug_level: + status = "ERROR" + if level == DEBUG_INFO: + status = "INFO" + elif level == DEBUG_WARNING: + status = "WARNING" + print(f"{status}: {msg}", file=print_target) \ No newline at end of file diff --git a/core/meow.py b/core/meow.py index a5769d2..1224546 100644 --- a/core/meow.py +++ b/core/meow.py @@ -187,7 +187,6 @@ class BaseHandler: pass -# TODO test me class MeowRunner: monitor:BaseMonitor handler:BaseHandler @@ -199,9 +198,13 @@ class MeowRunner: def start(self)->None: self.monitor.start() + if hasattr(self.handler, "start"): + self.handler.start() def stop(self)->None: self.monitor.stop() + if hasattr(self.handler, "stop"): + self.handler.stop() def _is_valid_monitor(self, monitor:BaseMonitor)->None: check_type(monitor, BaseMonitor) diff --git a/patterns/file_event_pattern.py b/patterns/file_event_pattern.py index bbc4982..1520223 100644 --- a/patterns/file_event_pattern.py +++ b/patterns/file_event_pattern.py @@ -1,5 +1,6 @@ import threading +import sys import os from fnmatch import translate @@ -7,39 +8,38 @@ from re import match from time import time, sleep from typing import Any from watchdog.observers import Observer -from watchdog.events import PatternMatchingEventHandler, FileCreatedEvent, \ - FileModifiedEvent, FileMovedEvent, FileClosedEvent, FileDeletedEvent, \ - DirCreatedEvent, DirDeletedEvent, DirModifiedEvent, DirMovedEvent +from watchdog.events import PatternMatchingEventHandler from core.correctness.validation import check_type, valid_string, \ - valid_dict, valid_list, valid_path + valid_dict, valid_list, valid_path, valid_existing_dir_path, \ + setup_debugging from core.correctness.vars import VALID_RECIPE_NAME_CHARS, \ VALID_VARIABLE_NAME_CHARS, FILE_EVENTS, FILE_CREATE_EVENT, \ - FILE_MODIFY_EVENT, FILE_MOVED_EVENT, FILE_CLOSED_EVENT, \ - FILE_DELETED_EVENT, DIR_CREATE_EVENT, DIR_DELETED_EVENT, \ - DIR_MODIFY_EVENT, DIR_MOVED_EVENT, VALID_CHANNELS + FILE_MODIFY_EVENT, FILE_MOVED_EVENT, VALID_CHANNELS, DEBUG_INFO, \ + DEBUG_ERROR, DEBUG_WARNING +from core.functionality import print_debug from core.meow import BasePattern, BaseMonitor, BaseRule -_EVENT_TRANSLATIONS = { - FileCreatedEvent: FILE_CREATE_EVENT, - FileModifiedEvent: FILE_MODIFY_EVENT, - FileMovedEvent: FILE_MOVED_EVENT, - FileClosedEvent: FILE_CLOSED_EVENT, - FileDeletedEvent: FILE_DELETED_EVENT, - DirCreatedEvent: DIR_CREATE_EVENT, - DirDeletedEvent: DIR_DELETED_EVENT, - DirModifiedEvent: DIR_MODIFY_EVENT, - DirMovedEvent: DIR_MOVED_EVENT -} +_DEFAULT_MASK = [ + FILE_CREATE_EVENT, + FILE_MODIFY_EVENT, + FILE_MOVED_EVENT +] + +SWEEP_START = "start" +SWEEP_STOP = "stop" +SWEEP_JUMP = "jump" class FileEventPattern(BasePattern): triggering_path:str triggering_file:str event_mask:list[str] + sweep:dict[str,Any] def __init__(self, name:str, triggering_path:str, recipe:str, - triggering_file:str, event_mask:list[str]=FILE_EVENTS, - parameters:dict[str,Any]={}, outputs:dict[str,Any]={}): + triggering_file:str, event_mask:list[str]=_DEFAULT_MASK, + parameters:dict[str,Any]={}, outputs:dict[str,Any]={}, + sweep:dict[str,Any]={}): super().__init__(name, recipe, parameters, outputs) self._is_valid_triggering_path(triggering_path) self.triggering_path = triggering_path @@ -47,6 +47,8 @@ class FileEventPattern(BasePattern): self.triggering_file = triggering_file self._is_valid_event_mask(event_mask) self.event_mask = event_mask + self._is_valid_sweep(sweep) + self.sweep = sweep def _is_valid_recipe(self, recipe:str)->None: valid_string(recipe, VALID_RECIPE_NAME_CHARS) @@ -79,20 +81,57 @@ class FileEventPattern(BasePattern): raise ValueError(f"Invalid event mask '{mask}'. Valid are: " f"{FILE_EVENTS}") + def _is_valid_sweep(self, sweep)->None: + check_type(sweep, dict) + if not sweep: + return + for k, v in sweep.items(): + valid_dict( + v, str, Any, [ + SWEEP_START, SWEEP_STOP, SWEEP_JUMP + ], strict=True) + + check_type( + v[SWEEP_START], expected_type=int, alt_types=[float, complex]) + check_type( + v[SWEEP_STOP], expected_type=int, alt_types=[float, complex]) + check_type( + v[SWEEP_JUMP], expected_type=int, alt_types=[float, complex]) + # Try to check that this loop is not infinite + if v[SWEEP_JUMP] == 0: + raise ValueError( + f"Cannot create sweep with a '{SWEEP_JUMP}' value of zero" + ) + elif v[SWEEP_JUMP] > 0: + if not v[SWEEP_STOP] > v[SWEEP_START]: + raise ValueError( + "Cannot create sweep with a positive '{SWEEP_JUMP}' " + "value where the end point is smaller than the start." + ) + elif v[SWEEP_JUMP] < 0: + if not v[SWEEP_STOP] < v[SWEEP_START]: + raise ValueError( + "Cannot create sweep with a negative '{SWEEP_JUMP}' " + "value where the end point is smaller than the start." + ) + class WatchdogMonitor(BaseMonitor): event_handler:PatternMatchingEventHandler monitor:Observer base_dir:str + debug_level:int + _print_target:Any _rules_lock:threading.Lock def __init__(self, base_dir:str, rules:dict[str, BaseRule], report:VALID_CHANNELS, autostart=False, - settletime:int=1)->None: + settletime:int=1, print:Any=sys.stdout, logging:int=0)->None: super().__init__(rules, report) self._is_valid_base_dir(base_dir) self.base_dir = base_dir check_type(settletime, int) + self._print_target, self.debug_level = setup_debugging(print, logging) self._rules_lock = threading.Lock() self.event_handler = WatchdogEventHandler(self, settletime=settletime) self.monitor = Observer() @@ -101,14 +140,20 @@ class WatchdogMonitor(BaseMonitor): self.base_dir, recursive=True ) + print_debug(self._print_target, self.debug_level, + "Created new WatchdogMonitor instance", DEBUG_INFO) if autostart: self.start() def start(self)->None: + print_debug(self._print_target, self.debug_level, + "Starting WatchdogMonitor", DEBUG_INFO) self.monitor.start() def stop(self)->None: + print_debug(self._print_target, self.debug_level, + "Stopping WatchdogMonitor", DEBUG_INFO) self.monitor.stop() def match(self, event)->None: @@ -134,6 +179,10 @@ class WatchdogMonitor(BaseMonitor): direct_hit = match(direct_regexp, handle_path) if direct_hit or recursive_hit: + print_debug(self._print_target, self.debug_level, + f"Event at {src_path} of type {event_type} hit rule " + f"{rule.name}", DEBUG_INFO) + event.monitor_base = self.base_dir self.report.send((event, rule)) except Exception as e: @@ -144,7 +193,7 @@ class WatchdogMonitor(BaseMonitor): def _is_valid_base_dir(self, base_dir:str)->None: - valid_path(base_dir) + valid_existing_dir_path(base_dir) def _is_valid_report(self, report:VALID_CHANNELS)->None: check_type(report, VALID_CHANNELS) @@ -205,7 +254,7 @@ class WatchdogEventHandler(PatternMatchingEventHandler): def on_created(self, event): self.handle_event(event) - + def on_modified(self, event): self.handle_event(event) diff --git a/recipes/__init__.py b/recipes/__init__.py index e456f1f..abae3c0 100644 --- a/recipes/__init__.py +++ b/recipes/__init__.py @@ -1,2 +1,3 @@ -from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe +from recipes.jupyter_notebook_recipe import JupyterNotebookRecipe, \ + PapermillHandler diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index f3a22ff..54c21e4 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -1,15 +1,67 @@ +import copy import nbformat +import os +import papermill +import shutil +import sys import threading +from datetime import datetime from multiprocessing import Pipe +from time import sleep from typing import Any +from watchdog.events import FileSystemEvent from core.correctness.validation import check_type, valid_string, \ - valid_dict, valid_path, valid_list -from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, VALID_CHANNELS -from core.functionality import wait -from core.meow import BaseRecipe, BaseHandler + valid_dict, valid_path, valid_list, valid_existing_dir_path, \ + setup_debugging +from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, VALID_CHANNELS, \ + SHA256, DEBUG_ERROR, DEBUG_WARNING, DEBUG_INFO +from core.functionality import wait, get_file_hash, generate_id, make_dir, \ + write_yaml, write_notebook, get_file_hash, parameterize_jupyter_notebook, \ + print_debug +from core.meow import BaseRecipe, BaseHandler, BaseRule +from patterns.file_event_pattern import SWEEP_START, SWEEP_STOP, SWEEP_JUMP + +# mig trigger keyword replacements +KEYWORD_PATH = "{PATH}" +KEYWORD_REL_PATH = "{REL_PATH}" +KEYWORD_DIR = "{DIR}" +KEYWORD_REL_DIR = "{REL_DIR}" +KEYWORD_FILENAME = "{FILENAME}" +KEYWORD_PREFIX = "{PREFIX}" +KEYWORD_BASE = "{VGRID}" +KEYWORD_EXTENSION = "{EXTENSION}" +KEYWORD_JOB = "{JOB}" + +# job definitions +JOB_ID = 'id' +JOB_PATTERN = 'pattern' +JOB_RECIPE = 'recipe' +JOB_RULE = 'rule' +JOB_PATH = 'path' +JOB_HASH = 'hash' +JOB_STATUS = 'status' +JOB_CREATE_TIME = 'create' +JOB_START_TIME = 'start' +JOB_END_TIME = 'end' +JOB_ERROR = 'error' +JOB_REQUIREMENTS = 'requirements' + +# job statuses +STATUS_QUEUED = 'queued' +STATUS_RUNNING = 'running' +STATUS_SKIPPED = 'skipped' +STATUS_FAILED = 'failed' +STATUS_DONE = 'done' + +# job definition files +META_FILE = 'job.yml' +BASE_FILE = 'base.ipynb' +PARAMS_FILE = 'params.yml' +JOB_FILE = 'job.ipynb' +RESULT_FILE = 'result.ipynb' class JupyterNotebookRecipe(BaseRecipe): source:str @@ -38,12 +90,28 @@ class JupyterNotebookRecipe(BaseRecipe): valid_string(k, VALID_VARIABLE_NAME_CHARS) class PapermillHandler(BaseHandler): + handler_base:str + output_dir:str + debug_level:int _worker:threading.Thread _stop_pipe:Pipe - def __init__(self, inputs:list[VALID_CHANNELS])->None: + _jobs:list[str] + _jobs_lock:threading.Lock + _print_target:Any + def __init__(self, inputs:list[VALID_CHANNELS], handler_base:str, + output_dir:str, print:Any=sys.stdout, logging:int=0)->None: super().__init__(inputs) + self._is_valid_handler_base(handler_base) + self.handler_base = handler_base + self._is_valid_output_dir(output_dir) + self.output_dir = output_dir + self._print_target, self.debug_level = setup_debugging(print, logging) self._worker = None self._stop_pipe = Pipe() + self._jobs = [] + self._jobs_lock = threading.Lock() + print_debug(self._print_target, self.debug_level, + "Created new PapermillHandler instance", DEBUG_INFO) def run(self)->None: all_inputs = self.inputs + [self._stop_pipe[0]] @@ -66,19 +134,233 @@ class PapermillHandler(BaseHandler): args=[]) self._worker.daemon = True self._worker.start() + print_debug(self._print_target, self.debug_level, + "Starting PapermillHandler run...", DEBUG_INFO) else: - raise RuntimeWarning("Repeated calls to start have no effect.") + msg = "Repeated calls to start have no effect." + print_debug(self._print_target, self.debug_level, + msg, DEBUG_WARNING) + raise RuntimeWarning(msg) def stop(self)->None: if self._worker is None: - raise RuntimeWarning("Cannot stop thread that is not started.") + msg = "Cannot stop thread that is not started." + print_debug(self._print_target, self.debug_level, + msg, DEBUG_WARNING) + raise RuntimeWarning(msg) else: self._stop_pipe[1].send(1) self._worker.join() + print_debug(self._print_target, self.debug_level, + "Worker thread stopped", DEBUG_INFO) - def handle(self, event, rule)->None: + def handle(self, event:FileSystemEvent, rule:BaseRule)->None: # TODO finish implementation and test - pass + + print_debug(self._print_target, self.debug_level, + f"Handling event {event.src_path}", DEBUG_INFO) + + file_hash = get_file_hash(event.src_path, SHA256) + + yaml_dict = {} + for var, val in rule.pattern.parameters.items(): + yaml_dict[var] = val + for var, val in rule.pattern.outputs.items(): + yaml_dict[var] = val + yaml_dict[rule.pattern.triggering_file] = event.src_path + + if not rule.pattern.sweep: + waiting_for_threaded_resources = True + while waiting_for_threaded_resources: + try: + worker = threading.Thread( + target=self.execute_job, + args=[event, rule, yaml_dict, file_hash]) + worker.daemon = True + worker.start() + waiting_for_threaded_resources = False + except threading.ThreadError: + sleep(1) + else: + for var, val in rule.pattern.sweep.items(): + values = [] + + par_val = rule.pattern.sweep[SWEEP_START] + while par_val <= rule.pattern.sweep[SWEEP_STOP]: + values.append(par_val) + par_val += rule.pattern.sweep[SWEEP_JUMP] + + for value in values: + yaml_dict[var] = value + waiting_for_threaded_resources = True + while waiting_for_threaded_resources: + try: + worker = threading.Thread( + target=self.execute_job, + args=[event, rule, yaml_dict, file_hash]) + worker.daemon = True + worker.start() + waiting_for_threaded_resources = False + except threading.ThreadError: + sleep(1) + + def add_job(self, job): + self._jobs_lock.acquire() + try: + self._jobs.append(job) + except Exception as e: + self._jobs_lock.release() + raise e + self._jobs_lock.release() + + def get_jobs(self): + self._jobs_lock.acquire() + try: + jobs_deepcopy = copy.deepcopy(self._jobs) + except Exception as e: + self._jobs_lock.release() + raise e + self._jobs_lock.release() + return jobs_deepcopy def _is_valid_inputs(self, inputs:list[VALID_CHANNELS])->None: valid_list(inputs, VALID_CHANNELS) + + def _is_valid_handler_base(self, handler_base)->None: + valid_existing_dir_path(handler_base) + + def _is_valid_output_dir(self, output_dir)->None: + valid_existing_dir_path(output_dir, allow_base=True) + + def execute_job(self, event:FileSystemEvent, rule:BaseRule, + yaml_dict:dict[str,Any], triggerfile_hash:str)->None: + + job_dict = { + JOB_ID: generate_id(prefix="job_", existing_ids=self.get_jobs()), + JOB_PATTERN: rule.pattern, + JOB_RECIPE: rule.recipe, + JOB_RULE: rule.name, + JOB_PATH: event.src_path, + JOB_HASH: triggerfile_hash, + JOB_STATUS: STATUS_QUEUED, + JOB_CREATE_TIME: datetime.now(), + JOB_REQUIREMENTS: rule.recipe.requirements + } + + print_debug(self._print_target, self.debug_level, + f"Creating job for event at {event.src_path} with ID " + f"{job_dict[JOB_ID]}", DEBUG_INFO) + + self.add_job(job_dict[JOB_ID]) + + yaml_dict = self.replace_keywords( + yaml_dict, + job_dict[JOB_ID], + event.src_path, + event.monitor_base + ) + + job_dir = os.path.join(self.handler_base, job_dict[JOB_ID]) + make_dir(job_dir) + + meta_file = os.path.join(job_dir, META_FILE) + write_yaml(job_dict, meta_file) + + base_file = os.path.join(job_dir, BASE_FILE) + write_notebook(rule.recipe.recipe, base_file) + + param_file = os.path.join(job_dir, PARAMS_FILE) + write_yaml(yaml_dict, param_file) + + job_file = os.path.join(job_dir, JOB_FILE) + result_file = os.path.join(job_dir, RESULT_FILE) + + job_dict[JOB_STATUS] = STATUS_RUNNING + job_dict[JOB_START_TIME] = datetime.now() + + write_yaml(job_dict, meta_file) + + if JOB_HASH in job_dict: + triggerfile_hash = get_file_hash(job_dict[JOB_PATH], SHA256) + if not triggerfile_hash \ + or triggerfile_hash != job_dict[JOB_HASH]: + job_dict[JOB_STATUS] = STATUS_SKIPPED + job_dict[JOB_END_TIME] = datetime.now() + msg = "Job was skipped as triggering file " + \ + f"'{job_dict[JOB_PATH]}' has been modified since " + \ + "scheduling. Was expected to have hash " + \ + f"'{job_dict[JOB_HASH]}' but has '{triggerfile_hash}'." + job_dict[JOB_ERROR] = msg + write_yaml(job_dict, meta_file) + print_debug(self._print_target, self.debug_level, + msg, DEBUG_ERROR) + return + + try: + job_notebook = parameterize_jupyter_notebook( + rule.recipe.recipe, yaml_dict + ) + write_notebook(job_notebook, job_file) + except Exception: + job_dict[JOB_STATUS] = STATUS_FAILED + job_dict[JOB_END_TIME] = datetime.now() + msg = f"Job file {job_dict[JOB_ID]} was not created successfully" + job_dict[JOB_ERROR] = msg + write_yaml(job_dict, meta_file) + print_debug(self._print_target, self.debug_level, + msg, DEBUG_ERROR) + return + + try: + papermill.execute_notebook(job_file, result_file, {}) + except Exception: + job_dict[JOB_STATUS] = STATUS_FAILED + job_dict[JOB_END_TIME] = datetime.now() + msg = 'Result file %s was not created successfully' + job_dict[JOB_ERROR] = msg + write_yaml(job_dict, meta_file) + print_debug(self._print_target, self.debug_level, + msg, DEBUG_ERROR) + return + + job_dict[JOB_STATUS] = STATUS_DONE + job_dict[JOB_END_TIME] = datetime.now() + write_yaml(job_dict, meta_file) + + job_output_dir = os.path.join(self.output_dir, job_dict[JOB_ID]) + + shutil.move(job_dir, job_output_dir) + + print_debug(self._print_target, self.debug_level, + f"Completed job {job_dict[JOB_ID]} with output at " + f"{job_output_dir}", DEBUG_INFO) + + return + + def replace_keywords(self, old_dict:dict[str,str], job_id:str, + src_path:str, monitor_base:str)->dict[str,str]: + new_dict = {} + + filename = os.path.basename(src_path) + dirname = os.path.dirname(src_path) + relpath = os.path.relpath(src_path, monitor_base) + reldirname = os.path.dirname(relpath) + (prefix, extension) = os.path.splitext(filename) + + for var, val in old_dict.items(): + if isinstance(val, str): + val = val.replace(KEYWORD_PATH, src_path) + val = val.replace(KEYWORD_REL_PATH, relpath) + val = val.replace(KEYWORD_DIR, dirname) + val = val.replace(KEYWORD_REL_DIR, reldirname) + val = val.replace(KEYWORD_FILENAME, filename) + val = val.replace(KEYWORD_PREFIX, prefix) + val = val.replace(KEYWORD_BASE, monitor_base) + val = val.replace(KEYWORD_EXTENSION, extension) + val = val.replace(KEYWORD_JOB, job_id) + + new_dict[var] = val + else: + new_dict[var] = val + + return new_dict diff --git a/tests/test_meow.py b/tests/test_meow.py index 662f8ee..60cbf70 100644 --- a/tests/test_meow.py +++ b/tests/test_meow.py @@ -271,7 +271,7 @@ class MeowTests(unittest.TestCase): loops = 0 job_ids = [] - while len(job_ids) < 2 or loops < 30: + while len(job_ids) < 2 and loops < 15: sleep(1) handler_debug_stream.seek(0) messages = handler_debug_stream.readlines() diff --git a/tests/test_patterns.py b/tests/test_patterns.py index 5cb39d3..850605f 100644 --- a/tests/test_patterns.py +++ b/tests/test_patterns.py @@ -8,7 +8,7 @@ from core.correctness.vars import FILE_EVENTS, FILE_CREATE_EVENT, \ BAREBONES_NOTEBOOK, TEST_MONITOR_BASE from core.functionality import create_rules, rmtree, make_dir from patterns.file_event_pattern import FileEventPattern, WatchdogMonitor, \ - _DEFAULT_MASK + _DEFAULT_MASK, SWEEP_START, SWEEP_STOP, SWEEP_JUMP from recipes import JupyterNotebookRecipe class CorrectnessTests(unittest.TestCase): @@ -105,6 +105,45 @@ class CorrectnessTests(unittest.TestCase): fep = FileEventPattern("name", "path", "recipe", "file", event_mask=[FILE_CREATE_EVENT, "nope"]) + def testFileEventPatternSweep(self)->None: + sweeps = { + 'first':{ + SWEEP_START: 0, + SWEEP_STOP: 3, + SWEEP_JUMP: 1 + }, + 'second':{ + SWEEP_START: 10, + SWEEP_STOP: 0, + SWEEP_JUMP: -2 + } + } + fep = FileEventPattern("name", "path", "recipe", "file", sweep=sweeps) + self.assertEqual(fep.sweep, sweeps) + + bad_sweep = { + 'first':{ + SWEEP_START: 0, + SWEEP_STOP: 3, + SWEEP_JUMP: -1 + }, + } + with self.assertRaises(ValueError): + fep = FileEventPattern("name", "path", "recipe", "file", + sweep=bad_sweep) + + bad_sweep = { + 'second':{ + SWEEP_START: 10, + SWEEP_STOP: 0, + SWEEP_JUMP: 1 + } + } + with self.assertRaises(ValueError): + fep = FileEventPattern("name", "path", "recipe", "file", + sweep=bad_sweep) + + def testWatchdogMonitorMinimum(self)->None: from_monitor = Pipe() WatchdogMonitor(TEST_MONITOR_BASE, {}, from_monitor[1])