From d3eb2dbf9fda34fb27995668a78b6197c1e4f9ac Mon Sep 17 00:00:00 2001 From: PatchOfScotland Date: Sat, 22 Apr 2023 21:48:33 +0200 Subject: [PATCH] added standardised job creation --- conductors/local_bash_conductor.py | 96 +----- conductors/local_python_conductor.py | 66 ---- core/base_conductor.py | 105 ++++++- core/base_handler.py | 134 +++++++- core/runner.py | 17 +- core/vars.py | 11 +- functionality/meow.py | 10 +- recipes/bash_recipe.py | 117 +------ recipes/jupyter_notebook_recipe.py | 189 ++---------- recipes/python_recipe.py | 217 ++----------- tests/test_base.py | 10 +- tests/test_conductors.py | 438 ++++++++------------------- tests/test_functionality.py | 10 +- tests/test_recipes.py | 326 +++----------------- tests/test_runner.py | 50 ++- 15 files changed, 515 insertions(+), 1281 deletions(-) diff --git a/conductors/local_bash_conductor.py b/conductors/local_bash_conductor.py index ae0e1b5..16b9d91 100644 --- a/conductors/local_bash_conductor.py +++ b/conductors/local_bash_conductor.py @@ -7,22 +7,16 @@ Author(s): David Marchant """ import os -import shutil -import subprocess -from datetime import datetime from typing import Any, Dict, Tuple from meow_base.core.base_conductor import BaseConductor from meow_base.core.meow import valid_job from meow_base.core.vars import DEFAULT_JOB_QUEUE_DIR, \ - DEFAULT_JOB_OUTPUT_DIR, JOB_TYPE, JOB_TYPE_BASH, META_FILE, JOB_STATUS, \ - BACKUP_JOB_ERROR_FILE, STATUS_DONE, JOB_END_TIME, STATUS_FAILED, \ - JOB_ERROR, JOB_TYPE, DEFAULT_JOB_QUEUE_DIR, STATUS_RUNNING, \ - JOB_START_TIME, DEFAULT_JOB_OUTPUT_DIR, get_job_file + DEFAULT_JOB_OUTPUT_DIR, JOB_TYPE, JOB_TYPE_BASH, JOB_TYPE, \ + DEFAULT_JOB_QUEUE_DIR, DEFAULT_JOB_OUTPUT_DIR from meow_base.functionality.validation import valid_dir_path -from meow_base.functionality.file_io import make_dir, write_file, \ - threadsafe_read_status, threadsafe_update_status +from meow_base.functionality.file_io import make_dir class LocalBashConductor(BaseConductor): @@ -54,90 +48,6 @@ class LocalBashConductor(BaseConductor): except Exception as e: return False, str(e) - def execute(self, job_dir:str)->None: - """Function to actually execute a Bash job. This will read job - defintions from its meta file, update the meta file and attempt to - execute. Some unspecific feedback will be given on execution failure, - but depending on what it is it may be up to the job itself to provide - more detailed feedback.""" - valid_dir_path(job_dir, must_exist=True) - - # Test our job parameters. Even if its gibberish, we still move to - # output - abort = False - try: - meta_file = os.path.join(job_dir, META_FILE) - job = threadsafe_read_status(meta_file) - valid_job(job) - - # update the status file with running status - threadsafe_update_status( - { - JOB_STATUS: STATUS_RUNNING, - JOB_START_TIME: datetime.now() - }, - meta_file - ) - - except Exception as e: - # If something has gone wrong at this stage then its bad, so we - # need to make our own error file - error_file = os.path.join(job_dir, BACKUP_JOB_ERROR_FILE) - write_file(f"Recieved incorrectly setup job.\n\n{e}", error_file) - abort = True - - # execute the job - if not abort: - try: - print(f"PWD: {os.getcwd()}") - print(f"job_dir: {job_dir}") - print(os.path.exists(os.path.join(job_dir, get_job_file(JOB_TYPE_BASH)))) - result = subprocess.call( - os.path.join(job_dir, get_job_file(JOB_TYPE_BASH)), - cwd="." - ) - - if result == 0: - # Update the status file with the finalised status - threadsafe_update_status( - { - JOB_STATUS: STATUS_DONE, - JOB_END_TIME: datetime.now() - }, - meta_file - ) - - else: - # Update the status file with the error status. Don't - # overwrite any more specific error messages already - # created - threadsafe_update_status( - { - JOB_STATUS: STATUS_FAILED, - JOB_END_TIME: datetime.now(), - JOB_ERROR: "Job execution returned non-zero." - }, - meta_file - ) - - except Exception as e: - # Update the status file with the error status. Don't overwrite - # any more specific error messages already created - threadsafe_update_status( - { - JOB_STATUS: STATUS_FAILED, - JOB_END_TIME: datetime.now(), - JOB_ERROR: f"Job execution failed. {e}" - }, - meta_file - ) - - # Move the contents of the execution directory to the final output - # directory. - job_output_dir = \ - os.path.join(self.job_output_dir, os.path.basename(job_dir)) - shutil.move(job_dir, job_output_dir) - def _is_valid_job_queue_dir(self, job_queue_dir)->None: """Validation check for 'job_queue_dir' variable from main constructor.""" diff --git a/conductors/local_python_conductor.py b/conductors/local_python_conductor.py index 13064ac..f248b94 100644 --- a/conductors/local_python_conductor.py +++ b/conductors/local_python_conductor.py @@ -51,72 +51,6 @@ class LocalPythonConductor(BaseConductor): except Exception as e: return False, str(e) - def execute(self, job_dir:str)->None: - """Function to actually execute a Python job. This will read job - defintions from its meta file, update the meta file and attempt to - execute. Some unspecific feedback will be given on execution failure, - but depending on what it is it may be up to the job itself to provide - more detailed feedback.""" - valid_dir_path(job_dir, must_exist=True) - - # Test our job parameters. Even if its gibberish, we still move to - # output - abort = False - try: - meta_file = os.path.join(job_dir, META_FILE) - job = threadsafe_read_status(meta_file) - valid_job(job) - - # update the status file with running status - threadsafe_update_status( - { - JOB_STATUS: STATUS_RUNNING, - JOB_START_TIME: datetime.now() - }, - meta_file - ) - - except Exception as e: - # If something has gone wrong at this stage then its bad, so we - # need to make our own error file - error_file = os.path.join(job_dir, BACKUP_JOB_ERROR_FILE) - write_file(f"Recieved incorrectly setup job.\n\n{e}", error_file) - abort = True - - # execute the job - if not abort: - try: - job_function = job[PYTHON_FUNC] - job_function(job_dir) - - # Update the status file with the finalised status - threadsafe_update_status( - { - JOB_STATUS: STATUS_DONE, - JOB_END_TIME: datetime.now() - }, - meta_file - ) - - except Exception as e: - # Update the status file with the error status. Don't overwrite - # any more specific error messages already created - threadsafe_update_status( - { - JOB_STATUS: STATUS_FAILED, - JOB_END_TIME: datetime.now(), - JOB_ERROR: f"Job execution failed. {e}" - }, - meta_file - ) - - - # Move the contents of the execution directory to the final output - # directory. - job_output_dir = \ - os.path.join(self.job_output_dir, os.path.basename(job_dir)) - shutil.move(job_dir, job_output_dir) - def _is_valid_job_queue_dir(self, job_queue_dir)->None: """Validation check for 'job_queue_dir' variable from main constructor.""" diff --git a/core/base_conductor.py b/core/base_conductor.py index f975387..5a318cb 100644 --- a/core/base_conductor.py +++ b/core/base_conductor.py @@ -5,15 +5,25 @@ from for all conductor instances. Author(s): David Marchant """ +import shutil +import subprocess +import os +from datetime import datetime from threading import Event, Thread from time import sleep from typing import Any, Tuple, Dict, Union + +from meow_base.core.meow import valid_job from meow_base.core.vars import VALID_CONDUCTOR_NAME_CHARS, VALID_CHANNELS, \ + JOB_STATUS, JOB_START_TIME, META_FILE, STATUS_RUNNING, STATUS_DONE , \ + BACKUP_JOB_ERROR_FILE, JOB_END_TIME, STATUS_FAILED, JOB_ERROR, \ get_drt_imp_msg +from meow_base.functionality.file_io import write_file, \ + threadsafe_read_status, threadsafe_update_status from meow_base.functionality.validation import check_implementation, \ - valid_string, valid_existing_dir_path, valid_natural + valid_string, valid_existing_dir_path, valid_natural, valid_dir_path from meow_base.functionality.naming import generate_conductor_id @@ -40,7 +50,6 @@ class BaseConductor: def __init__(self, name:str="", pause_time:int=5)->None: """BaseConductor Constructor. This will check that any class inheriting from it implements its validation functions.""" - check_implementation(type(self).execute, BaseConductor) check_implementation(type(self).valid_execute_criteria, BaseConductor) if not name: name = generate_conductor_id() @@ -129,7 +138,93 @@ class BaseConductor: process it or not. Must be implemented by any child process.""" pass + def run_job(self, job_dir:str)->None: + """Function to actually execute a job. This will read job + defintions from its meta file, update the meta file and attempt to + execute. Some unspecific feedback will be given on execution failure, + but depending on what it is it may be up to the job itself to provide + more detailed feedback. If you simply wish to alter the conditions + under which the job is executed, please instead look at the execute + function.""" + valid_dir_path(job_dir, must_exist=True) + + # Test our job parameters. Even if its gibberish, we still move to + # output + abort = False + try: + meta_file = os.path.join(job_dir, META_FILE) + job = threadsafe_read_status(meta_file) + valid_job(job) + + # update the status file with running status + threadsafe_update_status( + { + JOB_STATUS: STATUS_RUNNING, + JOB_START_TIME: datetime.now() + }, + meta_file + ) + + except Exception as e: + # If something has gone wrong at this stage then its bad, so we + # need to make our own error file + error_file = os.path.join(job_dir, BACKUP_JOB_ERROR_FILE) + write_file(f"Recieved incorrectly setup job.\n\n{e}", error_file) + abort = True + + # execute the job + if not abort: + try: + result = subprocess.call( + os.path.join(job_dir, job["tmp script command"]), + cwd="." + ) + + if result == 0: + # Update the status file with the finalised status + threadsafe_update_status( + { + JOB_STATUS: STATUS_DONE, + JOB_END_TIME: datetime.now() + }, + meta_file + ) + + else: + # Update the status file with the error status. Don't + # overwrite any more specific error messages already + # created + threadsafe_update_status( + { + JOB_STATUS: STATUS_FAILED, + JOB_END_TIME: datetime.now(), + JOB_ERROR: "Job execution returned non-zero." + }, + meta_file + ) + + except Exception as e: + # Update the status file with the error status. Don't overwrite + # any more specific error messages already created + threadsafe_update_status( + { + JOB_STATUS: STATUS_FAILED, + JOB_END_TIME: datetime.now(), + JOB_ERROR: f"Job execution failed. {e}" + }, + meta_file + ) + + # Move the contents of the execution directory to the final output + # directory. + job_output_dir = \ + os.path.join(self.job_output_dir, os.path.basename(job_dir)) + shutil.move(job_dir, job_output_dir) + def execute(self, job_dir:str)->None: - """Function to execute a given job directory. Must be implemented by - any child process.""" - pass + """Function to run job execution. By default this will simply call the + run_job function, to execute the job locally. However, this function + may be overridden to execute the job in some other manner, such as on + another resource. Note that the job itself should be executed using the + run_job func in order to maintain expected logging etc.""" + self.run_job(job_dir) \ No newline at end of file diff --git a/core/base_handler.py b/core/base_handler.py index 0c54d9b..31debef 100644 --- a/core/base_handler.py +++ b/core/base_handler.py @@ -6,16 +6,23 @@ from for all handler instances. Author(s): David Marchant """ +import os +import stat from threading import Event, Thread from typing import Any, Tuple, Dict, Union from time import sleep -from meow_base.core.vars import VALID_CHANNELS, \ - VALID_HANDLER_NAME_CHARS, get_drt_imp_msg +from meow_base.core.vars import VALID_CHANNELS, EVENT_RULE, EVENT_PATH, \ + VALID_HANDLER_NAME_CHARS, META_FILE, JOB_ID, WATCHDOG_BASE, JOB_FILE, \ + JOB_PARAMETERS, get_drt_imp_msg from meow_base.core.meow import valid_event +from meow_base.functionality.file_io import threadsafe_write_status, \ + threadsafe_update_status, make_dir, write_file, lines_to_string from meow_base.functionality.validation import check_implementation, \ valid_string, valid_natural +from meow_base.functionality.meow import create_job_metadata_dict, \ + replace_keywords from meow_base.functionality.naming import generate_handler_id class BaseHandler: @@ -42,8 +49,9 @@ class BaseHandler: def __init__(self, name:str='', pause_time:int=5)->None: """BaseHandler Constructor. This will check that any class inheriting from it implements its validation functions.""" - check_implementation(type(self).handle, BaseHandler) check_implementation(type(self).valid_handle_criteria, BaseHandler) + check_implementation(type(self).get_created_job_type, BaseHandler) + check_implementation(type(self).create_job_recipe_file, BaseHandler) if not name: name = generate_handler_id() self._is_valid_name(name) @@ -137,8 +145,124 @@ class BaseHandler: pass def handle(self, event:Dict[str,Any])->None: - """Function to handle a given event. Must be implemented by any child + """Function to handle a given event. May be overridden by any child process. Note that once any handling has occured, the send_job_to_runner function should be called to inform the runner of any resultant jobs.""" - pass + rule = event[EVENT_RULE] + + # Assemble job parameters dict from pattern variables + yaml_dict = {} + for var, val in rule.pattern.parameters.items(): + yaml_dict[var] = val + for var, val in rule.pattern.outputs.items(): + yaml_dict[var] = val + yaml_dict[rule.pattern.triggering_file] = event[EVENT_PATH] + + # If no parameter sweeps, then one job will suffice + if not rule.pattern.sweep: + self.setup_job(event, yaml_dict) + else: + # If parameter sweeps, then many jobs created + values_list = rule.pattern.expand_sweeps() + for values in values_list: + for value in values: + yaml_dict[value[0]] = value[1] + self.setup_job(event, yaml_dict) + + def setup_job(self, event:Dict[str,Any], params_dict:Dict[str,Any])->None: + """Function to set up new job dict and send it to the runner to be + executed.""" + + # Get base job metadata + meow_job = self.create_job_metadata_dict(event, params_dict) + + # Get updated job parameters + # TODO replace this with generic implementation + params_dict = replace_keywords( + params_dict, + meow_job[JOB_ID], + event[EVENT_PATH], + event[WATCHDOG_BASE] + ) + + # Create a base job directory + job_dir = os.path.join(self.job_queue_dir, meow_job[JOB_ID]) + make_dir(job_dir) + + # Create job metadata file + meta_file = self.create_job_meta_file(job_dir, meow_job) + + # Create job recipe file + recipe_command = self.create_job_recipe_file(job_dir, event, params_dict) + + # Create job script file + script_command = self.create_job_script_file(job_dir, recipe_command) + + threadsafe_update_status( + { + # TODO make me not tmp variables and update job dict validation + "tmp recipe command": recipe_command, + "tmp script command": script_command + }, + meta_file + ) + + # Send job directory, as actual definitons will be read from within it + self.send_job_to_runner(job_dir) + + def get_created_job_type(self)->str: + pass # Must implemented + + def create_job_metadata_dict(self, event:Dict[str,Any], + params_dict:Dict[str,Any])->Dict[str,Any]: + return create_job_metadata_dict( + self.get_created_job_type(), + event, + extras={ + JOB_PARAMETERS:params_dict + } + ) + + def create_job_meta_file(self, job_dir:str, meow_job:Dict[str,Any] + )->Dict[str,Any]: + meta_file = os.path.join(job_dir, META_FILE) + + threadsafe_write_status(meow_job, meta_file) + + return meta_file + + def create_job_recipe_file(self, job_dir:str, event:Dict[str,Any], params_dict:Dict[str,Any] + )->str: + pass # Must implemented + + def create_job_script_file(self, job_dir:str, recipe_command:str)->str: + # TODO Make this more generic, so only checking hashes if that is present + job_script = [ + "#!/bin/bash", + "", + "# Get job params", + "given_hash=$(grep 'file_hash: *' $(dirname $0)/job.yml | tail -n1 | cut -c 14-)", + "event_path=$(grep 'event_path: *' $(dirname $0)/job.yml | tail -n1 | cut -c 15-)", + "", + "echo event_path: $event_path", + "echo given_hash: $given_hash", + "", + "# Check hash of input file to avoid race conditions", + "actual_hash=$(sha256sum $event_path | cut -c -64)", + "echo actual_hash: $actual_hash", + "if [ $given_hash != $actual_hash ]; then", + " echo Job was skipped as triggering file has been modified since scheduling", + " exit 134", + "fi", + "", + "# Call actual job script", + recipe_command, + "", + "exit $?" + ] + job_file = os.path.join(job_dir, JOB_FILE) + write_file(lines_to_string(job_script), job_file) + os.chmod(job_file, stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH ) + + return os.path.join(".", JOB_FILE) \ No newline at end of file diff --git a/core/runner.py b/core/runner.py index 3f301bb..9857823 100644 --- a/core/runner.py +++ b/core/runner.py @@ -17,11 +17,13 @@ from meow_base.core.base_conductor import BaseConductor from meow_base.core.base_handler import BaseHandler from meow_base.core.base_monitor import BaseMonitor from meow_base.core.vars import DEBUG_WARNING, DEBUG_INFO, \ - VALID_CHANNELS, META_FILE, DEFAULT_JOB_OUTPUT_DIR, DEFAULT_JOB_QUEUE_DIR + VALID_CHANNELS, META_FILE, DEFAULT_JOB_OUTPUT_DIR, DEFAULT_JOB_QUEUE_DIR, \ + JOB_STATUS, STATUS_QUEUED from meow_base.functionality.validation import check_type, valid_list, \ valid_dir_path, check_implementation from meow_base.functionality.debug import setup_debugging, print_debug -from meow_base.functionality.file_io import make_dir, threadsafe_read_status +from meow_base.functionality.file_io import make_dir, threadsafe_read_status, \ + threadsafe_update_status from meow_base.functionality.process_io import wait @@ -183,13 +185,20 @@ class MeowRunner: message = connection.recv() - # Recieved an event + # Recieved a job if isinstance(component, BaseHandler): self.job_queue.append(message) + threadsafe_update_status( + { + JOB_STATUS: STATUS_QUEUED + }, + os.path.join(message, META_FILE) + ) continue - # Recieved a request for an event + # Recieved a request for a job if isinstance(component, BaseConductor): valid = False + print(f"Got request for job") for job_dir in self.job_queue: try: metafile = os.path.join(job_dir, META_FILE) diff --git a/core/vars.py b/core/vars.py index ac5cc56..a8fdfa2 100644 --- a/core/vars.py +++ b/core/vars.py @@ -85,6 +85,7 @@ DEFAULT_JOB_QUEUE_DIR = "job_queue" DEFAULT_JOB_OUTPUT_DIR = "job_output" # meow jobs +JOB_FILE = "job.sh" JOB_TYPE = "job_type" JOB_TYPE_BASH = "bash" JOB_TYPE_PYTHON = "python" @@ -125,6 +126,7 @@ JOB_REQUIREMENTS = "requirements" JOB_PARAMETERS = "parameters" # job statuses +STATUS_CREATING = "creating" STATUS_QUEUED = "queued" STATUS_RUNNING = "running" STATUS_SKIPPED = "skipped" @@ -157,12 +159,3 @@ def get_not_imp_msg(parent_class, class_function): return f"Children of the '{parent_class.__name__}' class must implement " \ f"the '{class_function.__name__}({signature(class_function)})' " \ "function" - -def get_base_file(job_type:str): - return JOB_TYPES[job_type][0] - -def get_job_file(job_type:str): - return JOB_TYPES[job_type][1] - -def get_result_file(job_type:str): - return JOB_TYPES[job_type][2] diff --git a/functionality/meow.py b/functionality/meow.py index 556aca5..035d73c 100644 --- a/functionality/meow.py +++ b/functionality/meow.py @@ -16,7 +16,7 @@ from meow_base.functionality.validation import check_type, valid_dict, \ from meow_base.core.vars import EVENT_PATH, EVENT_RULE, \ EVENT_TYPE, EVENT_TYPE_WATCHDOG, JOB_CREATE_TIME, JOB_EVENT, JOB_ID, \ JOB_PATTERN, JOB_RECIPE, JOB_REQUIREMENTS, JOB_RULE, JOB_STATUS, \ - JOB_TYPE, STATUS_QUEUED, WATCHDOG_BASE, WATCHDOG_HASH, SWEEP_JUMP, \ + JOB_TYPE, STATUS_CREATING, WATCHDOG_BASE, WATCHDOG_HASH, SWEEP_JUMP, \ SWEEP_START, SWEEP_STOP from meow_base.functionality.naming import generate_job_id @@ -32,6 +32,8 @@ KEYWORD_EXTENSION = "{EXTENSION}" KEYWORD_JOB = "{JOB}" +# TODO make this generic for all event types, currently very tied to file +# events def replace_keywords(old_dict:Dict[str,str], job_id:str, src_path:str, monitor_base:str)->Dict[str,str]: """Function to replace all MEOW magic words in a dictionary with dynamic @@ -128,8 +130,8 @@ def create_watchdog_event(path:str, rule:Any, base:str, hash:str, } ) -def create_job(job_type:str, event:Dict[str,Any], extras:Dict[Any,Any]={} - )->Dict[Any,Any]: +def create_job_metadata_dict(job_type:str, event:Dict[str,Any], + extras:Dict[Any,Any]={})->Dict[Any,Any]: """Function to create a MEOW job dictionary.""" job_dict = { #TODO compress event? @@ -139,7 +141,7 @@ def create_job(job_type:str, event:Dict[str,Any], extras:Dict[Any,Any]={} JOB_PATTERN: event[EVENT_RULE].pattern.name, JOB_RECIPE: event[EVENT_RULE].recipe.name, JOB_RULE: event[EVENT_RULE].name, - JOB_STATUS: STATUS_QUEUED, + JOB_STATUS: STATUS_CREATING, JOB_CREATE_TIME: datetime.now(), JOB_REQUIREMENTS: event[EVENT_RULE].recipe.requirements } diff --git a/recipes/bash_recipe.py b/recipes/bash_recipe.py index 68bdeba..715f0fd 100644 --- a/recipes/bash_recipe.py +++ b/recipes/bash_recipe.py @@ -11,15 +11,12 @@ from meow_base.core.meow import valid_event from meow_base.functionality.validation import check_type, valid_dict, \ valid_string, valid_dir_path from meow_base.core.vars import DEBUG_INFO, DEFAULT_JOB_QUEUE_DIR, \ - VALID_VARIABLE_NAME_CHARS, EVENT_PATH, EVENT_RULE, EVENT_TYPE, JOB_ID, \ - EVENT_TYPE_WATCHDOG, JOB_TYPE_BASH, JOB_PARAMETERS, WATCHDOG_BASE, \ - META_FILE, STATUS_QUEUED, JOB_STATUS, \ - get_base_file, get_job_file + VALID_VARIABLE_NAME_CHARS, EVENT_RULE, EVENT_TYPE, EVENT_TYPE_WATCHDOG, \ + JOB_TYPE_BASH from meow_base.functionality.debug import setup_debugging, print_debug from meow_base.functionality.file_io import valid_path, make_dir, write_file, \ - lines_to_string, threadsafe_write_status + lines_to_string from meow_base.functionality.parameterisation import parameterize_bash_script -from meow_base.functionality.meow import create_job, replace_keywords class BashRecipe(BaseRecipe): @@ -78,32 +75,6 @@ class BashHandler(BaseHandler): print_debug(self._print_target, self.debug_level, "Created new BashHandler instance", DEBUG_INFO) - def handle(self, event:Dict[str,Any])->None: - """Function called to handle a given event.""" - print_debug(self._print_target, self.debug_level, - f"Handling event {event[EVENT_PATH]}", DEBUG_INFO) - - rule = event[EVENT_RULE] - - # Assemble job parameters dict from pattern variables - yaml_dict = {} - for var, val in rule.pattern.parameters.items(): - yaml_dict[var] = val - for var, val in rule.pattern.outputs.items(): - yaml_dict[var] = val - yaml_dict[rule.pattern.triggering_file] = event[EVENT_PATH] - - # If no parameter sweeps, then one job will suffice - if not rule.pattern.sweep: - self.setup_job(event, yaml_dict) - else: - # If parameter sweeps, then many jobs created - values_list = rule.pattern.expand_sweeps() - for values in values_list: - for value in values: - yaml_dict[value[0]] = value[1] - self.setup_job(event, yaml_dict) - def valid_handle_criteria(self, event:Dict[str,Any])->Tuple[bool,str]: """Function to determine given an event defintion, if this handler can process it or not. This handler accepts events from watchdog with @@ -129,81 +100,17 @@ class BashHandler(BaseHandler): if not os.path.exists(job_queue_dir): make_dir(job_queue_dir) - def setup_job(self, event:Dict[str,Any], yaml_dict:Dict[str,Any])->None: - """Function to set up new job dict and send it to the runner to be - executed.""" - meow_job = create_job( - JOB_TYPE_BASH, - event, - extras={ - JOB_PARAMETERS:yaml_dict - } - ) - print_debug(self._print_target, self.debug_level, - f"Creating job from event at {event[EVENT_PATH]} of type " - f"{JOB_TYPE_BASH}.", DEBUG_INFO) - - # replace MEOW keyworks within variables dict - yaml_dict = replace_keywords( - meow_job[JOB_PARAMETERS], - meow_job[JOB_ID], - event[EVENT_PATH], - event[WATCHDOG_BASE] - ) - - # Create a base job directory - job_dir = os.path.join(self.job_queue_dir, meow_job[JOB_ID]) - make_dir(job_dir) - - # write a status file to the job directory - meta_file = os.path.join(job_dir, META_FILE) - threadsafe_write_status(meow_job, meta_file) - + def get_created_job_type(self)->str: + return JOB_TYPE_BASH + + def create_job_recipe_file(self, job_dir:str, event:Dict[str,Any], + params_dict:Dict[str,Any])->str: # parameterise recipe and write as executeable script base_script = parameterize_bash_script( - event[EVENT_RULE].recipe.recipe, yaml_dict + event[EVENT_RULE].recipe.recipe, params_dict ) - base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_BASH)) + base_file = os.path.join(job_dir, "recipe.sh") write_file(lines_to_string(base_script), base_file) - os.chmod(base_file, stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + os.chmod(base_file, stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH ) - # Write job script, to manage base script lifetime and execution - - job_script = assemble_bash_job_script() - job_file = os.path.join(job_dir, get_job_file(JOB_TYPE_BASH)) - write_file(lines_to_string(job_script), job_file) - os.chmod(job_file, stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) - - meow_job[JOB_STATUS] = STATUS_QUEUED - - # update the status file with queued status - threadsafe_write_status(meow_job, meta_file) - - # Send job directory, as actual definitons will be read from within it - self.send_job_to_runner(job_dir) - - -def assemble_bash_job_script()->List[str]: - return [ - "#!/bin/bash", - "", - "# Get job params", - "given_hash=$(grep 'file_hash: *' $(dirname $0)/job.yml | tail -n1 | cut -c 14-)", - "event_path=$(grep 'event_path: *' $(dirname $0)/job.yml | tail -n1 | cut -c 15-)", - "", - "echo event_path: $event_path", - "echo given_hash: $given_hash", - "", - "# Check hash of input file to avoid race conditions", - "actual_hash=$(sha256sum $event_path | cut -c -64)", - "echo actual_hash: $actual_hash", - "if [ $given_hash != $actual_hash ]; then", - " echo Job was skipped as triggering file has been modified since scheduling", - " exit 134", - "fi", - "", - "# Call actual job script", - "$(dirname $0)/base.sh", - "", - "exit $?" - ] + return os.path.join("$(dirname $0)", "recipe.sh") diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index 5d53f44..42be6ae 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -8,6 +8,7 @@ Author(s): David Marchant import os import nbformat import sys +import stat from typing import Any, Tuple, Dict @@ -17,17 +18,13 @@ from meow_base.core.meow import valid_event from meow_base.functionality.validation import check_type, valid_string, \ valid_dict, valid_path, valid_dir_path, valid_existing_file_path from meow_base.core.vars import VALID_VARIABLE_NAME_CHARS, \ - PYTHON_FUNC, DEBUG_INFO, EVENT_TYPE_WATCHDOG, \ - DEFAULT_JOB_QUEUE_DIR, EVENT_PATH, JOB_TYPE_PAPERMILL, WATCHDOG_HASH, \ - JOB_PARAMETERS, JOB_ID, WATCHDOG_BASE, META_FILE, PARAMS_FILE, \ - JOB_STATUS, STATUS_QUEUED, EVENT_RULE, EVENT_TYPE, EVENT_RULE, \ - get_base_file + DEBUG_INFO, EVENT_TYPE_WATCHDOG, DEFAULT_JOB_QUEUE_DIR, \ + JOB_TYPE_PAPERMILL, EVENT_RULE, EVENT_TYPE, EVENT_RULE from meow_base.functionality.debug import setup_debugging, print_debug from meow_base.functionality.file_io import make_dir, read_notebook, \ - write_notebook, write_yaml, threadsafe_write_status, \ - threadsafe_update_status -from meow_base.functionality.meow import create_job, replace_keywords - + write_notebook +from meow_base.functionality.parameterisation import \ + parameterize_jupyter_notebook class JupyterNotebookRecipe(BaseRecipe): # A path to the jupyter notebook used to create this recipe @@ -84,32 +81,6 @@ class PapermillHandler(BaseHandler): print_debug(self._print_target, self.debug_level, "Created new PapermillHandler instance", DEBUG_INFO) - def handle(self, event:Dict[str,Any])->None: - """Function called to handle a given event.""" - print_debug(self._print_target, self.debug_level, - f"Handling event {event[EVENT_PATH]}", DEBUG_INFO) - - rule = event[EVENT_RULE] - - # Assemble job parameters dict from pattern variables - yaml_dict = {} - for var, val in rule.pattern.parameters.items(): - yaml_dict[var] = val - for var, val in rule.pattern.outputs.items(): - yaml_dict[var] = val - yaml_dict[rule.pattern.triggering_file] = event[EVENT_PATH] - - # If no parameter sweeps, then one job will suffice - if not rule.pattern.sweep: - self.setup_job(event, yaml_dict) - else: - # If parameter sweeps, then many jobs created - values_list = rule.pattern.expand_sweeps() - for values in values_list: - for value in values: - yaml_dict[value[0]] = value[1] - self.setup_job(event, yaml_dict) - def valid_handle_criteria(self, event:Dict[str,Any])->Tuple[bool,str]: """Function to determine given an event defintion, if this handler can process it or not. This handler accepts events from watchdog with @@ -135,57 +106,22 @@ class PapermillHandler(BaseHandler): if not os.path.exists(job_queue_dir): make_dir(job_queue_dir) - def setup_job(self, event:Dict[str,Any], yaml_dict:Dict[str,Any])->None: - """Function to set up new job dict and send it to the runner to be - executed.""" - meow_job = create_job( - JOB_TYPE_PAPERMILL, - event, - extras={ - JOB_PARAMETERS:yaml_dict, - PYTHON_FUNC:papermill_job_func, - } + def get_created_job_type(self)->str: + return JOB_TYPE_PAPERMILL + + def create_job_recipe_file(self, job_dir:str, event:Dict[str,Any], + params_dict:Dict[str,Any])->str: + # parameterise recipe and write as executeable script + base_script = parameterize_jupyter_notebook( + event[EVENT_RULE].recipe.recipe, params_dict ) - print_debug(self._print_target, self.debug_level, - f"Creating job from event at {event[EVENT_PATH]} of type " - f"{JOB_TYPE_PAPERMILL}.", DEBUG_INFO) + base_file = os.path.join(job_dir, "recipe.ipynb") - # replace MEOW keyworks within variables dict - yaml_dict = replace_keywords( - meow_job[JOB_PARAMETERS], - meow_job[JOB_ID], - event[EVENT_PATH], - event[WATCHDOG_BASE] - ) + write_notebook(base_script, base_file) - # Create a base job directory - job_dir = os.path.join(self.job_queue_dir, meow_job[JOB_ID]) - make_dir(job_dir) + os.chmod(base_file, stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH ) - # write a status file to the job directory - meta_file = os.path.join(job_dir, META_FILE) - threadsafe_write_status(meow_job, meta_file) - - # write an executable notebook to the job directory - base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PAPERMILL)) - write_notebook(event[EVENT_RULE].recipe.recipe, base_file) - - # write a parameter file to the job directory - param_file = os.path.join(job_dir, PARAMS_FILE) - write_yaml(yaml_dict, param_file) - - meow_job[JOB_STATUS] = STATUS_QUEUED - - # update the status file with queued status - threadsafe_update_status( - { - JOB_STATUS: STATUS_QUEUED - }, - meta_file - ) - - # Send job directory, as actual definitons will be read from within it - self.send_job_to_runner(job_dir) + return f"papermill {base_file} {os.path.join(job_dir, 'result.ipynb')}" def get_recipe_from_notebook(name:str, notebook_filename:str, parameters:Dict[str,Any]={}, requirements:Dict[str,Any]={} @@ -202,92 +138,3 @@ def get_recipe_from_notebook(name:str, notebook_filename:str, requirements=requirements, source=notebook_filename ) - -# Papermill job execution code, to be run within the conductor -def papermill_job_func(job_dir): - # Requires own imports as will be run in its own execution environment - import os - import papermill - from datetime import datetime - from meow_base.core.vars import JOB_EVENT, JOB_ID, \ - EVENT_PATH, META_FILE, PARAMS_FILE, \ - JOB_STATUS, SHA256, STATUS_SKIPPED, JOB_END_TIME, \ - JOB_ERROR, STATUS_FAILED, get_job_file, \ - get_result_file - from meow_base.functionality.file_io import read_yaml, write_notebook, \ - threadsafe_read_status, threadsafe_update_status - from meow_base.functionality.hashing import get_hash - from meow_base.functionality.parameterisation import parameterize_jupyter_notebook - - - # Identify job files - meta_file = os.path.join(job_dir, META_FILE) - base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PAPERMILL)) - job_file = os.path.join(job_dir, get_job_file(JOB_TYPE_PAPERMILL)) - result_file = os.path.join(job_dir, get_result_file(JOB_TYPE_PAPERMILL)) - param_file = os.path.join(job_dir, PARAMS_FILE) - - # Get job defintions - job = threadsafe_read_status(meta_file) - yaml_dict = read_yaml(param_file) - - # Check the hash of the triggering file, if present. This addresses - # potential race condition as file could have been modified since - # triggering event - if JOB_EVENT in job and WATCHDOG_HASH in job[JOB_EVENT]: - # get current hash - triggerfile_hash = get_hash(job[JOB_EVENT][EVENT_PATH], SHA256) - # If hash doesn't match, then abort the job. If its been modified, then - # another job will have been scheduled anyway. - if not triggerfile_hash \ - or triggerfile_hash != job[JOB_EVENT][WATCHDOG_HASH]: - msg = "Job was skipped as triggering file " + \ - f"'{job[JOB_EVENT][EVENT_PATH]}' has been modified since " + \ - "scheduling. Was expected to have hash " + \ - f"'{job[JOB_EVENT][WATCHDOG_HASH]}' but has '" + \ - f"{triggerfile_hash}'." - threadsafe_update_status( - { - JOB_STATUS: STATUS_SKIPPED, - JOB_END_TIME: datetime.now(), - JOB_ERROR: msg - }, - meta_file - ) - - return - - # Create a parameterised version of the executable notebook - try: - base_notebook = read_notebook(base_file) - job_notebook = parameterize_jupyter_notebook( - base_notebook, yaml_dict - ) - write_notebook(job_notebook, job_file) - except Exception as e: - msg = f"Job file {job[JOB_ID]} was not created successfully. {e}" - threadsafe_update_status( - { - JOB_STATUS: STATUS_FAILED, - JOB_END_TIME: datetime.now(), - JOB_ERROR: msg - }, - meta_file - ) - return - - # Execute the parameterised notebook - try: - papermill.execute_notebook(job_file, result_file, {}) - except Exception as e: - msg = f"Result file {result_file} was not created successfully. {e}" - threadsafe_update_status( - { - JOB_STATUS: STATUS_FAILED, - JOB_END_TIME: datetime.now(), - JOB_ERROR: msg - }, - meta_file - ) - - return diff --git a/recipes/python_recipe.py b/recipes/python_recipe.py index 4db1dbf..b31ecc8 100644 --- a/recipes/python_recipe.py +++ b/recipes/python_recipe.py @@ -6,6 +6,7 @@ along with an appropriate handler for said events. Author(s): David Marchant """ import os +import stat import sys from typing import Any, Tuple, Dict, List @@ -16,16 +17,12 @@ from meow_base.core.meow import valid_event from meow_base.functionality.validation import check_script, valid_string, \ valid_dict, valid_dir_path from meow_base.core.vars import VALID_VARIABLE_NAME_CHARS, \ - PYTHON_FUNC, DEBUG_INFO, EVENT_TYPE_WATCHDOG, \ - DEFAULT_JOB_QUEUE_DIR, EVENT_RULE, EVENT_PATH, JOB_TYPE_PYTHON, \ - WATCHDOG_HASH, JOB_PARAMETERS, JOB_ID, WATCHDOG_BASE, META_FILE, \ - PARAMS_FILE, JOB_STATUS, STATUS_QUEUED, EVENT_TYPE, EVENT_RULE, \ - get_base_file + DEBUG_INFO, EVENT_TYPE_WATCHDOG, DEFAULT_JOB_QUEUE_DIR, EVENT_RULE, \ + JOB_TYPE_PYTHON, EVENT_TYPE, EVENT_RULE from meow_base.functionality.debug import setup_debugging, print_debug -from meow_base.functionality.file_io import make_dir, read_file_lines, \ - write_file, write_yaml, lines_to_string, threadsafe_write_status, \ - threadsafe_update_status -from meow_base.functionality.meow import create_job, replace_keywords +from meow_base.functionality.file_io import make_dir, write_file, \ + lines_to_string +from meow_base.functionality.parameterisation import parameterize_python_script class PythonRecipe(BaseRecipe): @@ -74,31 +71,12 @@ class PythonHandler(BaseHandler): print_debug(self._print_target, self.debug_level, "Created new PythonHandler instance", DEBUG_INFO) - def handle(self, event:Dict[str,Any])->None: - """Function called to handle a given event.""" - print_debug(self._print_target, self.debug_level, - f"Handling event {event[EVENT_PATH]}", DEBUG_INFO) - - rule = event[EVENT_RULE] - - # Assemble job parameters dict from pattern variables - yaml_dict = {} - for var, val in rule.pattern.parameters.items(): - yaml_dict[var] = val - for var, val in rule.pattern.outputs.items(): - yaml_dict[var] = val - yaml_dict[rule.pattern.triggering_file] = event[EVENT_PATH] - - # If no parameter sweeps, then one job will suffice - if not rule.pattern.sweep: - self.setup_job(event, yaml_dict) - else: - # If parameter sweeps, then many jobs created - values_list = rule.pattern.expand_sweeps() - for values in values_list: - for value in values: - yaml_dict[value[0]] = value[1] - self.setup_job(event, yaml_dict) + def _is_valid_job_queue_dir(self, job_queue_dir)->None: + """Validation check for 'job_queue_dir' variable from main + constructor.""" + valid_dir_path(job_queue_dir, must_exist=False) + if not os.path.exists(job_queue_dir): + make_dir(job_queue_dir) def valid_handle_criteria(self, event:Dict[str,Any])->Tuple[bool,str]: """Function to determine given an event defintion, if this handler can @@ -118,168 +96,19 @@ class PythonHandler(BaseHandler): except Exception as e: return False, str(e) - def _is_valid_job_queue_dir(self, job_queue_dir)->None: - """Validation check for 'job_queue_dir' variable from main - constructor.""" - valid_dir_path(job_queue_dir, must_exist=False) - if not os.path.exists(job_queue_dir): - make_dir(job_queue_dir) + def get_created_job_type(self)->str: + return JOB_TYPE_PYTHON - def setup_job(self, event:Dict[str,Any], yaml_dict:Dict[str,Any])->None: - """Function to set up new job dict and send it to the runner to be - executed.""" - meow_job = create_job( - JOB_TYPE_PYTHON, - event, - extras={ - JOB_PARAMETERS:yaml_dict, - PYTHON_FUNC:python_job_func - } + def create_job_recipe_file(self, job_dir:str, event:Dict[str,Any], + params_dict: Dict[str,Any])->str: + # parameterise recipe and write as executeable script + base_script = parameterize_python_script( + event[EVENT_RULE].recipe.recipe, params_dict ) - print_debug(self._print_target, self.debug_level, - f"Creating job from event at {event[EVENT_PATH]} of type " - f"{JOB_TYPE_PYTHON}.", DEBUG_INFO) + base_file = os.path.join(job_dir, "recipe.py") - # replace MEOW keyworks within variables dict - yaml_dict = replace_keywords( - meow_job[JOB_PARAMETERS], - meow_job[JOB_ID], - event[EVENT_PATH], - event[WATCHDOG_BASE] - ) + write_file(lines_to_string(base_script), base_file) + os.chmod(base_file, stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH ) - # Create a base job directory - job_dir = os.path.join(self.job_queue_dir, meow_job[JOB_ID]) - make_dir(job_dir) + return f"python3 {base_file} >>{os.path.join(job_dir, 'output.log')} 2>&1" - # write a status file to the job directory - meta_file = os.path.join(job_dir, META_FILE) - threadsafe_write_status(meow_job, meta_file) - - # write an executable script to the job directory - base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PYTHON)) - write_file(lines_to_string(event[EVENT_RULE].recipe.recipe), base_file) - - # write a parameter file to the job directory - param_file = os.path.join(job_dir, PARAMS_FILE) - write_yaml(yaml_dict, param_file) - - # update the status file with queued status - threadsafe_update_status( - { - JOB_STATUS: STATUS_QUEUED - }, - meta_file - ) - - # Send job directory, as actual definitons will be read from within it - self.send_job_to_runner(job_dir) - - -# Papermill job execution code, to be run within the conductor -def python_job_func(job_dir): - # Requires own imports as will be run in its own execution environment - import sys - import os - from datetime import datetime - from io import StringIO - from meow_base.core.vars import JOB_EVENT, JOB_ID, \ - EVENT_PATH, META_FILE, PARAMS_FILE, \ - JOB_STATUS, SHA256, STATUS_SKIPPED, JOB_END_TIME, \ - JOB_ERROR, STATUS_FAILED, get_base_file, \ - get_job_file, get_result_file - from meow_base.functionality.file_io import read_yaml, \ - threadsafe_read_status, threadsafe_update_status - from meow_base.functionality.hashing import get_hash - from meow_base.functionality.parameterisation import parameterize_python_script - - # Identify job files - meta_file = os.path.join(job_dir, META_FILE) - base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PYTHON)) - job_file = os.path.join(job_dir, get_job_file(JOB_TYPE_PYTHON)) - result_file = os.path.join(job_dir, get_result_file(JOB_TYPE_PYTHON)) - param_file = os.path.join(job_dir, PARAMS_FILE) - - # Get job defintions - job = threadsafe_read_status(meta_file) - yaml_dict = read_yaml(param_file) - - # Check the hash of the triggering file, if present. This addresses - # potential race condition as file could have been modified since - # triggering event - if JOB_EVENT in job and WATCHDOG_HASH in job[JOB_EVENT]: - # get current hash - triggerfile_hash = get_hash(job[JOB_EVENT][EVENT_PATH], SHA256) - # If hash doesn't match, then abort the job. If its been modified, then - # another job will have been scheduled anyway. - if not triggerfile_hash \ - or triggerfile_hash != job[JOB_EVENT][WATCHDOG_HASH]: - msg = "Job was skipped as triggering file " + \ - f"'{job[JOB_EVENT][EVENT_PATH]}' has been modified since " + \ - "scheduling. Was expected to have hash " + \ - f"'{job[JOB_EVENT][WATCHDOG_HASH]}' but has '" + \ - f"{triggerfile_hash}'." - threadsafe_update_status( - { - JOB_STATUS: STATUS_SKIPPED, - JOB_END_TIME: datetime.now(), - JOB_ERROR: msg - }, - meta_file - ) - return - - # Create a parameterised version of the executable script - try: - base_script = read_file_lines(base_file) - job_script = parameterize_python_script( - base_script, yaml_dict - ) - write_file(lines_to_string(job_script), job_file) - except Exception as e: - msg = f"Job file {job[JOB_ID]} was not created successfully. {e}" - threadsafe_update_status( - { - JOB_STATUS: STATUS_FAILED, - JOB_END_TIME: datetime.now(), - JOB_ERROR: msg - }, - meta_file - ) - return - - # Execute the parameterised script - std_stdout = sys.stdout - std_stderr = sys.stderr - try: - redirected_output = sys.stdout = StringIO() - redirected_error = sys.stderr = StringIO() - - exec(open(job_file).read()) - - write_file(("--STDOUT--\n" - f"{redirected_output.getvalue()}\n" - "\n" - "--STDERR--\n" - f"{redirected_error.getvalue()}\n" - ""), - result_file) - - except Exception as e: - sys.stdout = std_stdout - sys.stderr = std_stderr - - msg = f"Result file {result_file} was not created successfully. {e}" - threadsafe_update_status( - { - JOB_STATUS: STATUS_FAILED, - JOB_END_TIME: datetime.now(), - JOB_ERROR: msg - }, - meta_file - ) - - return - - sys.stdout = std_stdout - sys.stderr = std_stderr diff --git a/tests/test_base.py b/tests/test_base.py index e2304cd..19a46da 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -202,11 +202,14 @@ class BaseHandleTests(unittest.TestCase): TestHandler() class FullTestHandler(BaseHandler): - def handle(self, event): - pass def valid_handle_criteria(self, event:Dict[str,Any] )->Tuple[bool,str]: pass + def get_created_job_type(self)->str: + pass + def create_job_recipe_file(self, job_dir:str, event:Dict[str,Any], + params_dict:Dict[str,Any])->str: + pass FullTestHandler() @@ -233,9 +236,6 @@ class BaseConductorTests(unittest.TestCase): TestConductor() class FullTestConductor(BaseConductor): - def execute(self, job_dir:str)->None: - pass - def valid_execute_criteria(self, job:Dict[str,Any] )->Tuple[bool,str]: pass diff --git a/tests/test_conductors.py b/tests/test_conductors.py index d45d6f5..b956f0e 100644 --- a/tests/test_conductors.py +++ b/tests/test_conductors.py @@ -4,31 +4,31 @@ import stat import unittest from datetime import datetime +from multiprocessing import Pipe from typing import Dict from meow_base.core.vars import JOB_TYPE_PYTHON, SHA256, \ JOB_PARAMETERS, PYTHON_FUNC, JOB_ID, BACKUP_JOB_ERROR_FILE, \ - JOB_EVENT, META_FILE, PARAMS_FILE, JOB_STATUS, JOB_ERROR, JOB_TYPE, \ + JOB_EVENT, META_FILE, JOB_STATUS, JOB_ERROR, JOB_TYPE, \ JOB_PATTERN, STATUS_DONE, JOB_TYPE_PAPERMILL, JOB_RECIPE, JOB_RULE, \ JOB_CREATE_TIME, JOB_REQUIREMENTS, EVENT_PATH, EVENT_RULE, EVENT_TYPE, \ - EVENT_TYPE_WATCHDOG, JOB_TYPE_BASH, \ - get_base_file, get_result_file, get_job_file + EVENT_TYPE_WATCHDOG, JOB_TYPE_BASH, JOB_FILE from meow_base.conductors import LocalPythonConductor, LocalBashConductor from meow_base.functionality.file_io import read_file, read_yaml, write_file, \ - write_notebook, write_yaml, lines_to_string, make_dir + write_yaml, lines_to_string, make_dir, threadsafe_read_status from meow_base.functionality.hashing import get_hash -from meow_base.functionality.meow import create_watchdog_event, create_job, \ +from meow_base.functionality.meow import create_watchdog_event, create_job_metadata_dict, \ create_rule from meow_base.functionality.parameterisation import parameterize_bash_script from meow_base.patterns.file_event_pattern import FileEventPattern from meow_base.recipes.jupyter_notebook_recipe import JupyterNotebookRecipe, \ - papermill_job_func -from meow_base.recipes.python_recipe import PythonRecipe, python_job_func -from meow_base.recipes.bash_recipe import BashRecipe, assemble_bash_job_script + PapermillHandler +from meow_base.recipes.python_recipe import PythonRecipe, PythonHandler +from meow_base.recipes.bash_recipe import BashRecipe, BashHandler from shared import TEST_MONITOR_BASE, APPENDING_NOTEBOOK, TEST_JOB_OUTPUT, \ TEST_JOB_QUEUE, COMPLETE_PYTHON_SCRIPT, BAREBONES_PYTHON_SCRIPT, \ BAREBONES_NOTEBOOK, COMPLETE_BASH_SCRIPT, BAREBONES_BASH_SCRIPT, \ - setup, teardown + setup, teardown, count_non_locks def failing_func(): raise Exception("bad function") @@ -58,10 +58,16 @@ class PythonTests(unittest.TestCase): # Test LocalPythonConductor executes valid python jobs def testLocalPythonConductorValidPythonJob(self)->None: + from_handler_to_runner_reader, from_handler_to_runner_writer = Pipe() + bh = PythonHandler(job_queue_dir=TEST_JOB_QUEUE) + bh.to_runner_job = from_handler_to_runner_writer + + conductor_to_test_conductor, conductor_to_test_test = Pipe(duplex=True) lpc = LocalPythonConductor( job_queue_dir=TEST_JOB_QUEUE, job_output_dir=TEST_JOB_OUTPUT ) + lpc.to_runner_job = conductor_to_test_conductor file_path = os.path.join(TEST_MONITOR_BASE, "test") result_path = os.path.join(TEST_MONITOR_BASE, "output") @@ -91,37 +97,34 @@ class PythonTests(unittest.TestCase): "outfile":result_path } - job_dict = create_job( - JOB_TYPE_PYTHON, - create_watchdog_event( - file_path, - rule, - TEST_MONITOR_BASE, - file_hash - ), - extras={ - JOB_PARAMETERS:params_dict, - PYTHON_FUNC:python_job_func - } + event = create_watchdog_event( + file_path, + rule, + TEST_MONITOR_BASE, + file_hash ) - job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) - make_dir(job_dir) + bh.setup_job(event, params_dict) - param_file = os.path.join(job_dir, PARAMS_FILE) - write_yaml(params_dict, param_file) + lpc.start() - meta_path = os.path.join(job_dir, META_FILE) - write_yaml(job_dict, meta_path) + # Get valid job + if from_handler_to_runner_reader.poll(3): + job_queue_dir = from_handler_to_runner_reader.recv() - base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PYTHON)) - write_file(lines_to_string(COMPLETE_PYTHON_SCRIPT), base_file) + # Send it to conductor + if conductor_to_test_test.poll(3): + _ = conductor_to_test_test.recv() + conductor_to_test_test.send(job_queue_dir) - lpc.execute(job_dir) + # Wait for job to complete + if conductor_to_test_test.poll(3): + _ = conductor_to_test_test.recv() + conductor_to_test_test.send(1) - self.assertFalse(os.path.exists(job_dir)) - - job_output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) + job_output_dir = job_queue_dir.replace(TEST_JOB_QUEUE, TEST_JOB_OUTPUT) + + self.assertFalse(os.path.exists(job_queue_dir)) self.assertTrue(os.path.exists(job_output_dir)) meta_path = os.path.join(job_output_dir, META_FILE) @@ -129,26 +132,33 @@ class PythonTests(unittest.TestCase): status = read_yaml(meta_path) self.assertIsInstance(status, Dict) self.assertIn(JOB_STATUS, status) - self.assertEqual(status[JOB_STATUS], STATUS_DONE) + + print(status) + self.assertEqual(status[JOB_STATUS], STATUS_DONE) self.assertNotIn(JOB_ERROR, status) - self.assertTrue(os.path.exists( - os.path.join(job_output_dir, get_base_file(JOB_TYPE_PYTHON)))) - self.assertTrue(os.path.exists( - os.path.join(job_output_dir, PARAMS_FILE))) - self.assertTrue(os.path.exists( - os.path.join(job_output_dir, get_job_file(JOB_TYPE_PYTHON)))) - self.assertTrue(os.path.exists( - os.path.join(job_output_dir, get_result_file(JOB_TYPE_PYTHON)))) + + print(os.listdir(job_output_dir)) + self.assertEqual(count_non_locks(job_output_dir), 4) + for f in [META_FILE, "recipe.py", "output.log", "job.sh"]: + self.assertTrue(os.path.exists(os.path.join(job_output_dir, f))) self.assertTrue(os.path.exists(result_path)) + result = read_file(result_path) + self.assertEqual(result, "25293.75") # Test LocalPythonConductor executes valid papermill jobs def testLocalPythonConductorValidPapermillJob(self)->None: + from_handler_to_runner_reader, from_handler_to_runner_writer = Pipe() + bh = PapermillHandler(job_queue_dir=TEST_JOB_QUEUE) + bh.to_runner_job = from_handler_to_runner_writer + + conductor_to_test_conductor, conductor_to_test_test = Pipe(duplex=True) lpc = LocalPythonConductor( job_queue_dir=TEST_JOB_QUEUE, job_output_dir=TEST_JOB_OUTPUT ) + lpc.to_runner_job = conductor_to_test_conductor file_path = os.path.join(TEST_MONITOR_BASE, "test") result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") @@ -178,38 +188,34 @@ class PythonTests(unittest.TestCase): "outfile":result_path } - job_dict = create_job( - JOB_TYPE_PAPERMILL, - create_watchdog_event( - file_path, - rule, - TEST_MONITOR_BASE, - file_hash - ), - extras={ - JOB_PARAMETERS:params_dict, - PYTHON_FUNC:papermill_job_func - } + event = create_watchdog_event( + file_path, + rule, + TEST_MONITOR_BASE, + file_hash ) - job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) - make_dir(job_dir) + bh.setup_job(event, params_dict) - param_file = os.path.join(job_dir, PARAMS_FILE) - write_yaml(params_dict, param_file) + lpc.start() - meta_path = os.path.join(job_dir, META_FILE) - write_yaml(job_dict, meta_path) + # Get valid job + if from_handler_to_runner_reader.poll(3): + job_queue_dir = from_handler_to_runner_reader.recv() - base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PAPERMILL)) - write_notebook(APPENDING_NOTEBOOK, base_file) + # Send it to conductor + if conductor_to_test_test.poll(3): + _ = conductor_to_test_test.recv() + conductor_to_test_test.send(job_queue_dir) - lpc.execute(job_dir) + # Wait for job to complete + if conductor_to_test_test.poll(3): + _ = conductor_to_test_test.recv() + conductor_to_test_test.send(1) - job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) - self.assertFalse(os.path.exists(job_dir)) - - job_output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) + job_output_dir = job_queue_dir.replace(TEST_JOB_QUEUE, TEST_JOB_OUTPUT) + + self.assertFalse(os.path.exists(job_queue_dir)) self.assertTrue(os.path.exists(job_output_dir)) meta_path = os.path.join(job_output_dir, META_FILE) @@ -219,139 +225,13 @@ class PythonTests(unittest.TestCase): self.assertIn(JOB_STATUS, status) self.assertEqual(status[JOB_STATUS], STATUS_DONE) - self.assertTrue(os.path.exists( - os.path.join(job_output_dir, get_base_file(JOB_TYPE_PAPERMILL)))) - self.assertTrue(os.path.exists( - os.path.join(job_output_dir, PARAMS_FILE))) - self.assertTrue(os.path.exists( - os.path.join(job_output_dir, get_job_file(JOB_TYPE_PAPERMILL)))) - self.assertTrue(os.path.exists( - os.path.join(job_output_dir, get_result_file(JOB_TYPE_PAPERMILL)))) - - self.assertTrue(os.path.exists(result_path)) - - # Test LocalPythonConductor does not execute jobs with bad arguments - def testLocalPythonConductorBadArgs(self)->None: - lpc = LocalPythonConductor( - job_queue_dir=TEST_JOB_QUEUE, - job_output_dir=TEST_JOB_OUTPUT - ) - - file_path = os.path.join(TEST_MONITOR_BASE, "test") - result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") - - with open(file_path, "w") as f: - f.write("Data") - - file_hash = get_hash(file_path, SHA256) - - pattern = FileEventPattern( - "pattern", - file_path, - "recipe_one", - "infile", - parameters={ - "extra":"A line from a test Pattern", - "outfile":result_path - }) - recipe = JupyterNotebookRecipe( - "recipe_one", APPENDING_NOTEBOOK) - - rule = create_rule(pattern, recipe) - - params_dict = { - "extra":"extra", - "infile":file_path, - "outfile":result_path - } - - bad_job_dict = create_job( - JOB_TYPE_PAPERMILL, - create_watchdog_event( - file_path, - rule, - TEST_MONITOR_BASE, - file_hash - ), - extras={ - JOB_PARAMETERS:params_dict - } - ) - - bad_job_dir = os.path.join(TEST_JOB_QUEUE, bad_job_dict[JOB_ID]) - make_dir(bad_job_dir) - - bad_param_file = os.path.join(bad_job_dir, PARAMS_FILE) - write_yaml(params_dict, bad_param_file) - - bad_meta_path = os.path.join(bad_job_dir, META_FILE) - write_yaml(bad_job_dict, bad_meta_path) - - bad_base_file = os.path.join(bad_job_dir, - get_base_file(JOB_TYPE_PAPERMILL)) - write_notebook(APPENDING_NOTEBOOK, bad_base_file) - - lpc.execute(bad_job_dir) - - bad_output_dir = os.path.join(TEST_JOB_OUTPUT, bad_job_dict[JOB_ID]) - self.assertFalse(os.path.exists(bad_job_dir)) - self.assertTrue(os.path.exists(bad_output_dir)) - - bad_meta_path = os.path.join(bad_output_dir, META_FILE) - self.assertTrue(os.path.exists(bad_meta_path)) - - bad_job = read_yaml(bad_meta_path) - self.assertIsInstance(bad_job, dict) - self.assertIn(JOB_ERROR, bad_job) - - # Ensure execution can continue after one failed job - good_job_dict = create_job( - JOB_TYPE_PAPERMILL, - create_watchdog_event( - file_path, - rule, - TEST_MONITOR_BASE, - file_hash - ), - extras={ - JOB_PARAMETERS:params_dict, - PYTHON_FUNC:papermill_job_func - } - ) - - good_job_dir = os.path.join(TEST_JOB_QUEUE, good_job_dict[JOB_ID]) - make_dir(good_job_dir) - - good_param_file = os.path.join(good_job_dir, PARAMS_FILE) - write_yaml(params_dict, good_param_file) - - good_meta_path = os.path.join(good_job_dir, META_FILE) - write_yaml(good_job_dict, good_meta_path) - - good_base_file = os.path.join(good_job_dir, - get_base_file(JOB_TYPE_PAPERMILL)) - write_notebook(APPENDING_NOTEBOOK, good_base_file) - - lpc.execute(good_job_dir) - - good_job_dir = os.path.join(TEST_JOB_QUEUE, good_job_dict[JOB_ID]) - self.assertFalse(os.path.exists(good_job_dir)) - - good_job_output_dir = os.path.join(TEST_JOB_OUTPUT, good_job_dict[JOB_ID]) - self.assertTrue(os.path.exists(good_job_output_dir)) - self.assertTrue(os.path.exists( - os.path.join(good_job_output_dir, META_FILE))) - - self.assertTrue(os.path.exists( - os.path.join(good_job_output_dir, get_base_file(JOB_TYPE_PAPERMILL)))) - self.assertTrue(os.path.exists( - os.path.join(good_job_output_dir, PARAMS_FILE))) - self.assertTrue(os.path.exists( - os.path.join(good_job_output_dir, get_job_file(JOB_TYPE_PAPERMILL)))) - self.assertTrue(os.path.exists( - os.path.join(good_job_output_dir, get_result_file(JOB_TYPE_PAPERMILL)))) + self.assertEqual(count_non_locks(job_output_dir), 4) + for f in [META_FILE, JOB_FILE, "result.ipynb", "recipe.ipynb"]: + self.assertTrue(os.path.exists(os.path.join(job_output_dir, f))) self.assertTrue(os.path.exists(result_path)) + result = read_file(result_path) + self.assertEqual(result, "Data\nextra") # Test LocalPythonConductor does not execute jobs with missing metafile def testLocalPythonConductorMissingMetafile(self)->None: @@ -382,7 +262,7 @@ class PythonTests(unittest.TestCase): rule = create_rule(pattern, recipe) - job_dict = create_job( + job_dict = create_job_metadata_dict( JOB_TYPE_PAPERMILL, create_watchdog_event( file_path, @@ -418,77 +298,6 @@ class PythonTests(unittest.TestCase): "Recieved incorrectly setup job.\n\n[Errno 2] No such file or " f"directory: 'test_job_queue_dir{os.path.sep}{job_dict[JOB_ID]}{os.path.sep}job.yml'") - # Test LocalPythonConductor does not execute jobs with bad functions - def testLocalPythonConductorBadFunc(self)->None: - lpc = LocalPythonConductor( - job_queue_dir=TEST_JOB_QUEUE, - job_output_dir=TEST_JOB_OUTPUT - ) - - file_path = os.path.join(TEST_MONITOR_BASE, "test") - result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") - - with open(file_path, "w") as f: - f.write("Data") - - file_hash = get_hash(file_path, SHA256) - - pattern = FileEventPattern( - "pattern", - file_path, - "recipe_one", - "infile", - parameters={ - "extra":"A line from a test Pattern", - "outfile":result_path - }) - recipe = JupyterNotebookRecipe( - "recipe_one", APPENDING_NOTEBOOK) - - rule = create_rule(pattern, recipe) - - params = { - "extra":"extra", - "infile":file_path, - "outfile":result_path - } - - job_dict = create_job( - JOB_TYPE_PAPERMILL, - create_watchdog_event( - file_path, - rule, - TEST_MONITOR_BASE, - file_hash - ), - extras={ - JOB_PARAMETERS:params, - PYTHON_FUNC:failing_func, - } - ) - - job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) - make_dir(job_dir) - - param_file = os.path.join(job_dir, PARAMS_FILE) - write_yaml(params, param_file) - - meta_path = os.path.join(job_dir, META_FILE) - write_yaml(job_dict, meta_path) - - lpc.execute(job_dir) - - output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) - self.assertFalse(os.path.exists(job_dir)) - self.assertTrue(os.path.exists(output_dir)) - - meta_path = os.path.join(output_dir, META_FILE) - self.assertTrue(os.path.exists(meta_path)) - - job = read_yaml(meta_path) - self.assertIsInstance(job, dict) - self.assertIn(JOB_ERROR, job) - # Test LocalPythonConductor does not execute jobs with invalid metafile def testLocalPythonConductorInvalidMetafile(self)->None: lpc = LocalPythonConductor( @@ -518,7 +327,7 @@ class PythonTests(unittest.TestCase): rule = create_rule(pattern, recipe) - job_dict = create_job( + job_dict = create_job_metadata_dict( JOB_TYPE_PAPERMILL, create_watchdog_event( file_path, @@ -586,7 +395,7 @@ class PythonTests(unittest.TestCase): rule = create_rule(pattern, recipe) - job_dict = create_job( + job_dict = create_job_metadata_dict( JOB_TYPE_PAPERMILL, create_watchdog_event( file_path, @@ -746,10 +555,16 @@ class BashTests(unittest.TestCase): # Test LocalBashConductor executes valid bash jobs def testLocalBashConductorValidBashJob(self)->None: + from_handler_to_runner_reader, from_handler_to_runner_writer = Pipe() + bh = BashHandler(job_queue_dir=TEST_JOB_QUEUE) + bh.to_runner_job = from_handler_to_runner_writer + + conductor_to_test_conductor, conductor_to_test_test = Pipe(duplex=True) lpc = LocalBashConductor( job_queue_dir=TEST_JOB_QUEUE, job_output_dir=TEST_JOB_OUTPUT ) + lpc.to_runner_job = conductor_to_test_conductor file_path = os.path.join(TEST_MONITOR_BASE, "test") result_path = os.path.join(TEST_MONITOR_BASE, "output") @@ -779,44 +594,34 @@ class BashTests(unittest.TestCase): "outfile":result_path } - job_dict = create_job( - JOB_TYPE_BASH, - create_watchdog_event( - file_path, - rule, - TEST_MONITOR_BASE, - file_hash - ), - extras={ - JOB_PARAMETERS:params_dict, - } + event = create_watchdog_event( + file_path, + rule, + TEST_MONITOR_BASE, + file_hash ) - job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) - make_dir(job_dir) + bh.setup_job(event, params_dict) - meta_path = os.path.join(job_dir, META_FILE) - write_yaml(job_dict, meta_path) + lpc.start() - base_script = parameterize_bash_script( - COMPLETE_BASH_SCRIPT, params_dict - ) - base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_BASH)) - write_file(lines_to_string(base_script), base_file) - st = os.stat(base_file) - os.chmod(base_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + # Get valid job + if from_handler_to_runner_reader.poll(3): + job_queue_dir = from_handler_to_runner_reader.recv() - job_script = assemble_bash_job_script() - job_file = os.path.join(job_dir, get_job_file(JOB_TYPE_BASH)) - write_file(lines_to_string(job_script), job_file) - st = os.stat(job_file) - os.chmod(job_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + # Send it to conductor + if conductor_to_test_test.poll(3): + _ = conductor_to_test_test.recv() + conductor_to_test_test.send(job_queue_dir) - lpc.execute(job_dir) + # Wait for job to complete + if conductor_to_test_test.poll(3): + _ = conductor_to_test_test.recv() + conductor_to_test_test.send(1) - self.assertFalse(os.path.exists(job_dir)) - - job_output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) + job_output_dir = job_queue_dir.replace(TEST_JOB_QUEUE, TEST_JOB_OUTPUT) + + self.assertFalse(os.path.exists(job_queue_dir)) self.assertTrue(os.path.exists(job_output_dir)) meta_path = os.path.join(job_output_dir, META_FILE) @@ -824,15 +629,24 @@ class BashTests(unittest.TestCase): status = read_yaml(meta_path) self.assertIsInstance(status, Dict) self.assertIn(JOB_STATUS, status) - self.assertEqual(status[JOB_STATUS], STATUS_DONE) + self.assertEqual(status[JOB_STATUS], STATUS_DONE) self.assertNotIn(JOB_ERROR, status) + + self.assertEqual(count_non_locks(job_output_dir), 3) + for f in [META_FILE, JOB_FILE]: + self.assertTrue(os.path.exists(os.path.join(job_output_dir, f))) + job = threadsafe_read_status(os.path.join(job_output_dir, META_FILE)) + self.assertTrue(os.path.exists(os.path.join(job_output_dir, job["tmp script command"]))) + self.assertTrue(os.path.exists( - os.path.join(job_output_dir, get_base_file(JOB_TYPE_BASH)))) + os.path.join(job_output_dir, ))) self.assertTrue(os.path.exists( - os.path.join(job_output_dir, get_job_file(JOB_TYPE_BASH)))) + os.path.join(job_output_dir, JOB_FILE))) self.assertTrue(os.path.exists(result_path)) + result = read_file(result_path) + self.assertEqual(result, "25293\n") # Test LocalBashConductor does not execute jobs with missing metafile def testLocalBashConductorMissingMetafile(self)->None: @@ -869,7 +683,7 @@ class BashTests(unittest.TestCase): "outfile":result_path } - job_dict = create_job( + job_dict = create_job_metadata_dict( JOB_TYPE_BASH, create_watchdog_event( file_path, @@ -935,7 +749,7 @@ class BashTests(unittest.TestCase): "outfile":result_path } - job_dict = create_job( + job_dict = create_job_metadata_dict( JOB_TYPE_PAPERMILL, create_watchdog_event( file_path, @@ -958,7 +772,7 @@ class BashTests(unittest.TestCase): base_script = parameterize_bash_script( COMPLETE_BASH_SCRIPT, params_dict ) - base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_BASH)) + base_file = os.path.join(job_dir, JOB_FILE) write_file(lines_to_string(base_script), base_file) st = os.stat(base_file) os.chmod(base_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) @@ -967,7 +781,7 @@ class BashTests(unittest.TestCase): "#!/bin/bash", "echo Does Nothing" ] - job_file = os.path.join(job_dir, get_job_file(JOB_TYPE_BASH)) + job_file = os.path.join(job_dir, JOB_FILE) write_file(lines_to_string(job_script), job_file) st = os.stat(job_file) os.chmod(job_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) @@ -1019,7 +833,7 @@ class BashTests(unittest.TestCase): "outfile":result_path } - job_dict = create_job( + job_dict = create_job_metadata_dict( JOB_TYPE_PAPERMILL, create_watchdog_event( file_path, @@ -1088,7 +902,7 @@ class BashTests(unittest.TestCase): "outfile":result_path } - job_dict = create_job( + job_dict = create_job_metadata_dict( JOB_TYPE_PAPERMILL, create_watchdog_event( file_path, diff --git a/tests/test_functionality.py b/tests/test_functionality.py index aeac077..2f9450d 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -17,7 +17,7 @@ from meow_base.core.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \ WATCHDOG_BASE, WATCHDOG_HASH, EVENT_RULE, JOB_PARAMETERS, \ PYTHON_FUNC, JOB_ID, JOB_EVENT, JOB_ERROR, STATUS_DONE, \ JOB_TYPE, JOB_PATTERN, JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME, \ - JOB_REQUIREMENTS, STATUS_QUEUED, JOB_TYPE_PAPERMILL + JOB_REQUIREMENTS, STATUS_QUEUED, JOB_TYPE_PAPERMILL, STATUS_CREATING from meow_base.functionality.debug import setup_debugging from meow_base.functionality.file_io import lines_to_string, make_dir, \ read_file, read_file_lines, read_notebook, read_yaml, rmtree, write_file, \ @@ -27,7 +27,7 @@ from meow_base.functionality.hashing import get_hash from meow_base.functionality.meow import KEYWORD_BASE, KEYWORD_DIR, \ KEYWORD_EXTENSION, KEYWORD_FILENAME, KEYWORD_JOB, KEYWORD_PATH, \ KEYWORD_PREFIX, KEYWORD_REL_DIR, KEYWORD_REL_PATH, \ - create_event, create_job, create_rule, create_rules, \ + create_event, create_job_metadata_dict, create_rule, create_rules, \ create_watchdog_event, replace_keywords, create_parameter_sweep from meow_base.functionality.naming import _generate_id from meow_base.functionality.parameterisation import \ @@ -645,7 +645,7 @@ class MeowTests(unittest.TestCase): self.assertEqual(event2[EVENT_RULE], rule) self.assertEqual(event2["a"], 1) - # Test that create_job produces valid job dictionary + # Test that create_job_metadata_dict produces valid job dictionary def testCreateJob(self)->None: pattern = FileEventPattern( "pattern", @@ -672,7 +672,7 @@ class MeowTests(unittest.TestCase): } ) - job_dict = create_job( + job_dict = create_job_metadata_dict( JOB_TYPE_PAPERMILL, event, extras={ @@ -699,7 +699,7 @@ class MeowTests(unittest.TestCase): self.assertIn(JOB_RULE, job_dict) self.assertEqual(job_dict[JOB_RULE], rule.name) self.assertIn(JOB_STATUS, job_dict) - self.assertEqual(job_dict[JOB_STATUS], STATUS_QUEUED) + self.assertEqual(job_dict[JOB_STATUS], STATUS_CREATING) self.assertIn(JOB_CREATE_TIME, job_dict) self.assertIsInstance(job_dict[JOB_CREATE_TIME], datetime) self.assertIn(JOB_REQUIREMENTS, job_dict) diff --git a/tests/test_recipes.py b/tests/test_recipes.py index 141d924..680942c 100644 --- a/tests/test_recipes.py +++ b/tests/test_recipes.py @@ -1,38 +1,28 @@ import jsonschema import os -import stat -import subprocess import unittest from multiprocessing import Pipe -from typing import Dict from meow_base.core.meow import valid_job from meow_base.core.vars import EVENT_TYPE, WATCHDOG_BASE, \ EVENT_RULE, EVENT_TYPE_WATCHDOG, EVENT_PATH, SHA256, WATCHDOG_HASH, \ - JOB_ID, JOB_TYPE_PYTHON, JOB_PARAMETERS, PYTHON_FUNC, \ - JOB_STATUS, META_FILE, JOB_ERROR, PARAMS_FILE, SWEEP_STOP, SWEEP_JUMP, \ - SWEEP_START, JOB_TYPE_PAPERMILL, JOB_TYPE_BASH, \ - get_base_file, get_job_file, get_result_file + JOB_PARAMETERS, JOB_FILE, META_FILE, SWEEP_STOP, SWEEP_JUMP, \ + SWEEP_START from meow_base.core.rule import Rule -from meow_base.functionality.file_io import lines_to_string, make_dir, \ - read_yaml, write_file, write_notebook, write_yaml +from meow_base.functionality.file_io import read_yaml, write_notebook, \ + threadsafe_read_status from meow_base.functionality.hashing import get_hash -from meow_base.functionality.meow import create_job, create_rules, \ - create_rule, create_watchdog_event -from meow_base.functionality.parameterisation import parameterize_bash_script +from meow_base.functionality.meow import create_rules, create_rule from meow_base.patterns.file_event_pattern import FileEventPattern -from meow_base.recipes.bash_recipe import BashRecipe, BashHandler, \ - assemble_bash_job_script +from meow_base.recipes.bash_recipe import BashRecipe, BashHandler from meow_base.recipes.jupyter_notebook_recipe import JupyterNotebookRecipe, \ - PapermillHandler, papermill_job_func, get_recipe_from_notebook -from meow_base.recipes.python_recipe import PythonRecipe, PythonHandler, \ - python_job_func + PapermillHandler, get_recipe_from_notebook +from meow_base.recipes.python_recipe import PythonRecipe, PythonHandler from shared import BAREBONES_PYTHON_SCRIPT, COMPLETE_PYTHON_SCRIPT, \ TEST_JOB_QUEUE, TEST_MONITOR_BASE, TEST_JOB_OUTPUT, BAREBONES_NOTEBOOK, \ - APPENDING_NOTEBOOK, COMPLETE_NOTEBOOK, BAREBONES_BASH_SCRIPT, \ - COMPLETE_BASH_SCRIPT, \ + COMPLETE_NOTEBOOK, BAREBONES_BASH_SCRIPT, COMPLETE_BASH_SCRIPT, \ setup, teardown class JupyterNotebookTests(unittest.TestCase): @@ -338,94 +328,6 @@ class PapermillHandlerTests(unittest.TestCase): values.remove(val) self.assertEqual(len(values), 0) - # Test jobFunc performs as expected - def testJobFunc(self)->None: - file_path = os.path.join(TEST_MONITOR_BASE, "test") - result_path = os.path.join(TEST_MONITOR_BASE, "output", "test") - - with open(file_path, "w") as f: - f.write("Data") - - file_hash = get_hash(file_path, SHA256) - - pattern = FileEventPattern( - "pattern", - file_path, - "recipe_one", - "infile", - parameters={ - "extra":"A line from a test Pattern", - "outfile":result_path - }) - recipe = JupyterNotebookRecipe( - "recipe_one", APPENDING_NOTEBOOK) - - rule = create_rule(pattern, recipe) - - params_dict = { - "extra":"extra", - "infile":file_path, - "outfile":result_path - } - - job_dict = create_job( - JOB_TYPE_PAPERMILL, - create_watchdog_event( - file_path, - rule, - TEST_MONITOR_BASE, - file_hash - ), - extras={ - JOB_PARAMETERS:params_dict, - PYTHON_FUNC:papermill_job_func - } - ) - - job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) - make_dir(job_dir) - - meta_file = os.path.join(job_dir, META_FILE) - write_yaml(job_dict, meta_file) - - param_file = os.path.join(job_dir, PARAMS_FILE) - write_yaml(params_dict, param_file) - - base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PAPERMILL)) - write_notebook(APPENDING_NOTEBOOK, base_file) - - papermill_job_func(job_dir) - - job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) - self.assertTrue(os.path.exists(job_dir)) - - meta_path = os.path.join(job_dir, META_FILE) - self.assertTrue(os.path.exists(meta_path)) - status = read_yaml(meta_path) - self.assertIsInstance(status, Dict) - self.assertIn(JOB_STATUS, status) - self.assertEqual(status[JOB_STATUS], job_dict[JOB_STATUS]) - - self.assertTrue(os.path.exists( - os.path.join(job_dir, get_base_file(JOB_TYPE_PAPERMILL)))) - self.assertTrue(os.path.exists(os.path.join(job_dir, PARAMS_FILE))) - self.assertTrue(os.path.exists( - os.path.join(job_dir, get_job_file(JOB_TYPE_PAPERMILL)))) - self.assertTrue(os.path.exists( - os.path.join(job_dir, get_result_file(JOB_TYPE_PAPERMILL)))) - - self.assertTrue(os.path.exists(result_path)) - - # Test jobFunc doesn't execute with no args - def testJobFuncBadArgs(self)->None: - try: - papermill_job_func({}) - except Exception: - pass - - self.assertEqual(len(os.listdir(TEST_JOB_QUEUE)), 0) - self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 0) - # Test handling criteria function def testValidHandleCriteria(self)->None: ph = PapermillHandler() @@ -840,100 +742,6 @@ class PythonHandlerTests(unittest.TestCase): values.remove(val) self.assertEqual(len(values), 0) - # Test jobFunc performs as expected - def testJobFunc(self)->None: - file_path = os.path.join(TEST_MONITOR_BASE, "test") - result_path = os.path.join(TEST_MONITOR_BASE, "output") - - with open(file_path, "w") as f: - f.write("250") - - file_hash = get_hash(file_path, SHA256) - - pattern = FileEventPattern( - "pattern", - file_path, - "recipe_one", - "infile", - parameters={ - "extra":"A line from a test Pattern", - "outfile": result_path - }) - recipe = PythonRecipe( - "recipe_one", COMPLETE_PYTHON_SCRIPT) - - rule = create_rule(pattern, recipe) - - params_dict = { - "extra":"extra", - "infile":file_path, - "outfile": result_path - } - - job_dict = create_job( - JOB_TYPE_PYTHON, - create_watchdog_event( - file_path, - rule, - TEST_MONITOR_BASE, - file_hash - ), - extras={ - JOB_PARAMETERS:params_dict, - PYTHON_FUNC:python_job_func - } - ) - - job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) - make_dir(job_dir) - - meta_file = os.path.join(job_dir, META_FILE) - write_yaml(job_dict, meta_file) - - param_file = os.path.join(job_dir, PARAMS_FILE) - write_yaml(params_dict, param_file) - - base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PYTHON)) - write_notebook(APPENDING_NOTEBOOK, base_file) - write_file(lines_to_string(COMPLETE_PYTHON_SCRIPT), base_file) - - python_job_func(job_dir) - - self.assertTrue(os.path.exists(job_dir)) - meta_path = os.path.join(job_dir, META_FILE) - self.assertTrue(os.path.exists(meta_path)) - - status = read_yaml(meta_path) - self.assertIsInstance(status, Dict) - self.assertIn(JOB_STATUS, status) - self.assertEqual(status[JOB_STATUS], job_dict[JOB_STATUS]) - self.assertNotIn(JOB_ERROR, status) - - self.assertTrue(os.path.exists( - os.path.join(job_dir, get_base_file(JOB_TYPE_PYTHON)))) - self.assertTrue(os.path.exists(os.path.join(job_dir, PARAMS_FILE))) - self.assertTrue(os.path.exists( - os.path.join(job_dir, get_job_file(JOB_TYPE_PYTHON)))) - self.assertTrue(os.path.exists( - os.path.join(job_dir, get_result_file(JOB_TYPE_PYTHON)))) - - self.assertTrue(os.path.exists(result_path)) - - with open(result_path, "r") as f: - result = f.read() - - self.assertEqual(result, "124937.5") - - # Test jobFunc doesn't execute with no args - def testJobFuncBadArgs(self)->None: - try: - python_job_func({}) - except Exception: - pass - - self.assertEqual(len(os.listdir(TEST_JOB_QUEUE)), 0) - self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 0) - # Test handling criteria function def testValidHandleCriteria(self)->None: ph = PythonHandler() @@ -1336,97 +1144,61 @@ class BashHandlerTests(unittest.TestCase): values.remove(val) self.assertEqual(len(values), 0) - # Test jobFunc performs as expected - def testJobFunc(self)->None: - file_path = os.path.join(TEST_MONITOR_BASE, "test") - result_path = os.path.join(TEST_MONITOR_BASE, "output") + def testJobSetup(self)->None: + from_handler_to_runner_reader, from_handler_to_runner_writer = Pipe() + bh = BashHandler(job_queue_dir=TEST_JOB_QUEUE) + bh.to_runner_job = from_handler_to_runner_writer + + with open(os.path.join(TEST_MONITOR_BASE, "A"), "w") as f: + f.write("Data") - with open(file_path, "w") as f: - f.write("250") - - file_hash = get_hash(file_path, SHA256) - - pattern = FileEventPattern( - "pattern", - file_path, - "recipe_one", - "infile", - parameters={ - "extra":"A line from a test Pattern", - "outfile": result_path - }) + pattern_one = FileEventPattern( + "pattern_one", "A", "recipe_one", "file_one") recipe = BashRecipe( "recipe_one", COMPLETE_BASH_SCRIPT) - rule = create_rule(pattern, recipe) - - params_dict = { - "extra":"extra", - "infile":file_path, - "outfile": result_path + patterns = { + pattern_one.name: pattern_one, + } + recipes = { + recipe.name: recipe, } - job_dict = create_job( - JOB_TYPE_BASH, - create_watchdog_event( - file_path, - rule, - TEST_MONITOR_BASE, - file_hash - ), - extras={ - JOB_PARAMETERS:params_dict - } - ) + rules = create_rules(patterns, recipes) + self.assertEqual(len(rules), 1) + _, rule = rules.popitem() + self.assertIsInstance(rule, Rule) - job_dir = os.path.join(TEST_JOB_QUEUE, job_dict[JOB_ID]) - make_dir(job_dir) + self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 0) - meta_file = os.path.join(job_dir, META_FILE) - write_yaml(job_dict, meta_file) + event = { + EVENT_TYPE: EVENT_TYPE_WATCHDOG, + EVENT_PATH: os.path.join(TEST_MONITOR_BASE, "A"), + WATCHDOG_BASE: TEST_MONITOR_BASE, + EVENT_RULE: rule, + WATCHDOG_HASH: get_hash( + os.path.join(TEST_MONITOR_BASE, "A"), SHA256 + ) + } - base_script = parameterize_bash_script( - COMPLETE_BASH_SCRIPT, params_dict - ) - base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_BASH)) - write_file(lines_to_string(base_script), base_file) - st = os.stat(base_file) - os.chmod(base_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + params_dict = { + "file_one": os.path.join(TEST_MONITOR_BASE, "A") + } - job_script = assemble_bash_job_script() - job_file = os.path.join(job_dir, get_job_file(JOB_TYPE_BASH)) - write_file(lines_to_string(job_script), job_file) - st = os.stat(job_file) - os.chmod(job_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + bh.setup_job(event, params_dict) - print(os.listdir(job_dir)) - print(os.getcwd()) - - result = subprocess.call(job_file, cwd=".") - - self.assertEqual(result, 0) + if from_handler_to_runner_reader.poll(3): + job_dir = from_handler_to_runner_reader.recv() + self.assertIsInstance(job_dir, str) self.assertTrue(os.path.exists(job_dir)) - meta_path = os.path.join(job_dir, META_FILE) - self.assertTrue(os.path.exists(meta_path)) - status = read_yaml(meta_path) - self.assertIsInstance(status, Dict) - self.assertIn(JOB_STATUS, status) - self.assertEqual(status[JOB_STATUS], job_dict[JOB_STATUS]) - self.assertNotIn(JOB_ERROR, status) + self.assertTrue(len(os.listdir(job_dir)), 3) + for f in [META_FILE, "recipe.sh", JOB_FILE]: + self.assertTrue(os.path.exists(os.path.join(job_dir, f))) - self.assertTrue(os.path.exists( - os.path.join(job_dir, get_base_file(JOB_TYPE_BASH)))) - self.assertTrue(os.path.exists( - os.path.join(job_dir, get_job_file(JOB_TYPE_BASH)))) - - self.assertTrue(os.path.exists(result_path)) - - with open(result_path, "r") as f: - result = f.read() - - self.assertEqual(result, "124937\n") + job = threadsafe_read_status(os.path.join(job_dir, META_FILE)) + valid_job(job) # Test handling criteria function def testValidHandleCriteria(self)->None: diff --git a/tests/test_runner.py b/tests/test_runner.py index 3ca3a25..a3461c0 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -1,5 +1,4 @@ -import io import importlib import os import unittest @@ -7,7 +6,6 @@ import unittest from multiprocessing import Pipe from random import shuffle from shutil import copy -from time import sleep from warnings import warn from meow_base.core.base_conductor import BaseConductor @@ -15,7 +13,7 @@ from meow_base.core.base_handler import BaseHandler from meow_base.core.base_monitor import BaseMonitor from meow_base.conductors import LocalPythonConductor from meow_base.core.vars import JOB_TYPE_PAPERMILL, JOB_ERROR, \ - META_FILE, JOB_TYPE_PYTHON, JOB_CREATE_TIME, get_result_file + META_FILE, JOB_TYPE_PYTHON, JOB_CREATE_TIME from meow_base.core.runner import MeowRunner from meow_base.functionality.file_io import make_dir, read_file, \ read_notebook, read_yaml, write_file, lines_to_string @@ -372,7 +370,7 @@ class MeowTests(unittest.TestCase): f.write("Initial Data") loops = 0 - while loops < 5: + while loops < 10: # Initial prompt if conductor_to_test_test.poll(5): msg = conductor_to_test_test.recv() @@ -396,7 +394,7 @@ class MeowTests(unittest.TestCase): else: raise Exception("Timed out") self.assertEqual(msg, 1) - loops = 5 + loops = 10 loops += 1 @@ -409,10 +407,10 @@ class MeowTests(unittest.TestCase): runner.stop() print(os.listdir(job_dir)) - self.assertEqual(count_non_locks(job_dir), 5) + self.assertEqual(count_non_locks(job_dir), 4) result = read_notebook( - os.path.join(job_dir, get_result_file(JOB_TYPE_PAPERMILL))) + os.path.join(job_dir, "result.ipynb")) self.assertIsNotNone(result) output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") @@ -522,10 +520,10 @@ class MeowTests(unittest.TestCase): self.assertIn(job_ids[1], os.listdir(TEST_JOB_OUTPUT)) mid_job_dir = os.path.join(TEST_JOB_OUTPUT, job_ids[0]) - self.assertEqual(count_non_locks(mid_job_dir), 5) + self.assertEqual(count_non_locks(mid_job_dir), 4) result = read_notebook( - os.path.join(mid_job_dir, get_result_file(JOB_TYPE_PAPERMILL))) + os.path.join(mid_job_dir, "result.ipynb")) self.assertIsNotNone(result) mid_output_path = os.path.join(TEST_MONITOR_BASE, "middle", "A.txt") @@ -537,10 +535,10 @@ class MeowTests(unittest.TestCase): self.assertEqual(data, "Initial Data\nA line from Pattern 1") final_job_dir = os.path.join(TEST_JOB_OUTPUT, job_ids[1]) - self.assertEqual(count_non_locks(final_job_dir), 5) + self.assertEqual(count_non_locks(final_job_dir), 4) result = read_notebook(os.path.join(final_job_dir, - get_result_file(JOB_TYPE_PAPERMILL))) + "result.ipynb")) self.assertIsNotNone(result) final_output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") @@ -651,11 +649,11 @@ class MeowTests(unittest.TestCase): self.assertNotIn(JOB_ERROR, status) - result_path = os.path.join(job_dir, get_result_file(JOB_TYPE_PYTHON)) + result_path = os.path.join(job_dir, "output.log") self.assertTrue(os.path.exists(result_path)) result = read_file(os.path.join(result_path)) self.assertEqual( - result, "--STDOUT--\n12505000.0\ndone\n\n\n--STDERR--\n\n") + result, "12505000.0\ndone\n") output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") self.assertTrue(os.path.exists(output_path)) @@ -779,18 +777,18 @@ class MeowTests(unittest.TestCase): final_job_id = job_ids[0] mid_job_dir = os.path.join(TEST_JOB_OUTPUT, mid_job_id) - self.assertEqual(count_non_locks(mid_job_dir), 5) + self.assertEqual(count_non_locks(mid_job_dir), 4) mid_metafile = os.path.join(mid_job_dir, META_FILE) mid_status = read_yaml(mid_metafile) self.assertNotIn(JOB_ERROR, mid_status) mid_result_path = os.path.join( - mid_job_dir, get_result_file(JOB_TYPE_PYTHON)) + mid_job_dir, "output.log") self.assertTrue(os.path.exists(mid_result_path)) mid_result = read_file(os.path.join(mid_result_path)) self.assertEqual( - mid_result, "--STDOUT--\n7806.25\ndone\n\n\n--STDERR--\n\n") + mid_result, "7806.25\ndone\n") mid_output_path = os.path.join(TEST_MONITOR_BASE, "middle", "A.txt") self.assertTrue(os.path.exists(mid_output_path)) @@ -798,17 +796,17 @@ class MeowTests(unittest.TestCase): self.assertEqual(mid_output, "7806.25") final_job_dir = os.path.join(TEST_JOB_OUTPUT, final_job_id) - self.assertEqual(count_non_locks(final_job_dir), 5) + self.assertEqual(count_non_locks(final_job_dir), 4) final_metafile = os.path.join(final_job_dir, META_FILE) final_status = read_yaml(final_metafile) self.assertNotIn(JOB_ERROR, final_status) - final_result_path = os.path.join(final_job_dir, get_result_file(JOB_TYPE_PYTHON)) + final_result_path = os.path.join(final_job_dir, "output.log") self.assertTrue(os.path.exists(final_result_path)) final_result = read_file(os.path.join(final_result_path)) self.assertEqual( - final_result, "--STDOUT--\n2146.5625\ndone\n\n\n--STDERR--\n\n") + final_result, "2146.5625\ndone\n") final_output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") self.assertTrue(os.path.exists(final_output_path)) @@ -916,7 +914,7 @@ class MeowTests(unittest.TestCase): self.assertNotIn(JOB_ERROR, status) - result_path = os.path.join(job_dir, get_result_file(JOB_TYPE_PYTHON)) + result_path = os.path.join(job_dir, "output.log") self.assertTrue(os.path.exists(result_path)) output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") @@ -1091,11 +1089,11 @@ class MeowTests(unittest.TestCase): self.assertNotIn(JOB_ERROR, status) - result_path = os.path.join(job_dir, get_result_file(JOB_TYPE_PYTHON)) + result_path = os.path.join(job_dir, "output.log") self.assertTrue(os.path.exists(result_path)) result = read_file(os.path.join(result_path)) self.assertEqual( - result, "--STDOUT--\n12505000.0\ndone\n\n\n--STDERR--\n\n") + result, "12505000.0\ndone\n") output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") self.assertTrue(os.path.exists(output_path)) @@ -1281,7 +1279,7 @@ class MeowTests(unittest.TestCase): self.assertNotIn(JOB_ERROR, status) result_path = os.path.join( - TEST_JOB_OUTPUT, job_dir, get_result_file(JOB_TYPE_PAPERMILL)) + TEST_JOB_OUTPUT, job_dir, "result.ipynb") self.assertTrue(os.path.exists(result_path)) # Test some actual scientific analysis, in a predicatable loop @@ -1426,7 +1424,7 @@ class MeowTests(unittest.TestCase): self.assertNotIn(JOB_ERROR, status) result_path = os.path.join( - TEST_JOB_OUTPUT, job_dir, get_result_file(JOB_TYPE_PAPERMILL)) + TEST_JOB_OUTPUT, job_dir, "result.ipynb") self.assertTrue(os.path.exists(result_path)) results = len(os.listdir( @@ -1591,7 +1589,7 @@ class MeowTests(unittest.TestCase): self.assertNotIn(JOB_ERROR, status) result_path = os.path.join( - TEST_JOB_OUTPUT, job_dir, get_result_file(JOB_TYPE_PAPERMILL)) + TEST_JOB_OUTPUT, job_dir, "result.ipynb") self.assertTrue(os.path.exists(result_path)) outputs = len(os.listdir(TEST_JOB_OUTPUT)) @@ -1761,7 +1759,7 @@ class MeowTests(unittest.TestCase): self.assertNotIn(JOB_ERROR, status) result_path = os.path.join( - TEST_JOB_OUTPUT, job_dir, get_result_file(JOB_TYPE_PAPERMILL)) + TEST_JOB_OUTPUT, job_dir, "result.ipynb") self.assertTrue(os.path.exists(result_path)) outputs = len(os.listdir(TEST_JOB_OUTPUT))