From 5acb8c230e8b608bf4b6a69895cf28c5190c5a7a Mon Sep 17 00:00:00 2001 From: PatchOfScotland Date: Wed, 1 Feb 2023 15:04:06 +0100 Subject: [PATCH] tidied up job creation a bit more --- core/functionality.py | 6 +++--- core/runner.py | 23 ++++++++++++++++------- recipes/jupyter_notebook_recipe.py | 20 +++++++++++++------- tests/test_conductors.py | 4 ++++ tests/test_functionality.py | 4 ++-- tests/test_recipes.py | 24 +++++++++++++++++------- tests/test_runner.py | 2 ++ 7 files changed, 57 insertions(+), 26 deletions(-) diff --git a/core/functionality.py b/core/functionality.py index f3627f4..19963d4 100644 --- a/core/functionality.py +++ b/core/functionality.py @@ -290,12 +290,12 @@ def create_event(event_type:str, path:str, source:dict[Any,Any]={} def create_job(job_type:str, event:dict[str,Any], source:dict[Any,Any]={} )->dict[Any,Any]: job_dict = { - #TODO compress pattern, recipe, rule? + #TODO compress event? JOB_ID: generate_id(prefix="job_"), JOB_EVENT: event, JOB_TYPE: job_type, - JOB_PATTERN: event[WATCHDOG_RULE].pattern, - JOB_RECIPE: event[WATCHDOG_RULE].recipe, + JOB_PATTERN: event[WATCHDOG_RULE].pattern.name, + JOB_RECIPE: event[WATCHDOG_RULE].recipe.name, JOB_RULE: event[WATCHDOG_RULE].name, JOB_STATUS: STATUS_QUEUED, JOB_CREATE_TIME: datetime.now(), diff --git a/core/runner.py b/core/runner.py index 8579ccb..c464843 100644 --- a/core/runner.py +++ b/core/runner.py @@ -6,6 +6,7 @@ with monitors, handlers, and conductors being swappable at initialisation. Author(s): David Marchant """ +import os import sys import threading @@ -15,10 +16,10 @@ from random import randrange from typing import Any, Union from core.correctness.vars import DEBUG_WARNING, DEBUG_INFO, EVENT_TYPE, \ - VALID_CHANNELS, JOB_TYPE, JOB_ID + VALID_CHANNELS, JOB_TYPE, JOB_ID, META_FILE from core.correctness.validation import setup_debugging, check_type, \ valid_list -from core.functionality import print_debug, wait +from core.functionality import print_debug, wait, read_yaml from core.meow import BaseHandler, BaseMonitor, BaseConductor @@ -134,7 +135,7 @@ class MeowRunner: "Could not process event as no relevent " f"handler for '{event[EVENT_TYPE]}'", DEBUG_INFO) - return + continue # If we've only one handler, use that if len(self.handlers[event[EVENT_TYPE]]) == 1: handler = self.handlers[event[EVENT_TYPE]][0] @@ -160,15 +161,23 @@ class MeowRunner: else: for from_handler in self.from_handlers: if from_handler in ready: - # Read event from the handler channel - message = from_handler.recv() - job = message + # Read job directory from the handler channel + job_dir = from_handler.recv() + try: + metafile = os.path.join(job_dir, META_FILE) + job = read_yaml(metafile) + except Exception as e: + print_debug(self._print_target, self.debug_level, + "Could not load necessary job definitions for " + f"job at '{job_dir}'. {e}", DEBUG_INFO) + continue + # Abort if we don't have a relevent conductor. if not self.conductors[job[JOB_TYPE]]: print_debug(self._print_target, self.debug_level, "Could not process job as no relevent " f"conductor for '{job[JOB_TYPE]}'", DEBUG_INFO) - return + continue # If we've only one conductor, use that if len(self.conductors[job[JOB_TYPE]]) == 1: conductor = self.conductors[job[JOB_TYPE]][0] diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index 7821061..4cf8c5e 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -140,12 +140,17 @@ class PapermillHandler(BaseHandler): def setup_job(self, event:dict[str,Any], yaml_dict:dict[str,Any])->None: """Function to set up new job dict and send it to the runner to be executed.""" - meow_job = create_job(PYTHON_TYPE, event, { - JOB_PARAMETERS:yaml_dict, - JOB_HASH: event[WATCHDOG_HASH], - PYTHON_FUNC:job_func, - PYTHON_OUTPUT_DIR:self.output_dir, - PYTHON_EXECUTION_BASE:self.handler_base,}) + meow_job = create_job( + PYTHON_TYPE, + event, + { + JOB_PARAMETERS:yaml_dict, + JOB_HASH: event[WATCHDOG_HASH], + PYTHON_FUNC:job_func, + PYTHON_OUTPUT_DIR:self.output_dir, + PYTHON_EXECUTION_BASE:self.handler_base + } + ) print_debug(self._print_target, self.debug_level, f"Creating job from event at {event[EVENT_PATH]} of type " f"{PYTHON_TYPE}.", DEBUG_INFO) @@ -180,7 +185,8 @@ class PapermillHandler(BaseHandler): # update the status file with queued status write_yaml(meow_job, meta_file) - self.to_runner.send(meow_job) + # Send job directory, as actual definitons will be read from within it + self.to_runner.send(job_dir) # Papermill job execution code, to be run within the conductor def job_func(job): diff --git a/tests/test_conductors.py b/tests/test_conductors.py index c0f1132..55c5989 100644 --- a/tests/test_conductors.py +++ b/tests/test_conductors.py @@ -270,3 +270,7 @@ class MeowTests(unittest.TestCase): with self.assertRaises(Exception): lpc.execute(job_dict) + + # TODO test job status funcs + # TODO test mangled status file reads + # TODO test missing input files diff --git a/tests/test_functionality.py b/tests/test_functionality.py index 57c50ed..0e133d6 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -308,9 +308,9 @@ class CorrectnessTests(unittest.TestCase): self.assertIn(JOB_TYPE, job_dict) self.assertEqual(job_dict[JOB_TYPE], PYTHON_TYPE) self.assertIn(JOB_PATTERN, job_dict) - self.assertEqual(job_dict[JOB_PATTERN], pattern) + self.assertEqual(job_dict[JOB_PATTERN], pattern.name) self.assertIn(JOB_RECIPE, job_dict) - self.assertEqual(job_dict[JOB_RECIPE], recipe) + self.assertEqual(job_dict[JOB_RECIPE], recipe.name) self.assertIn(JOB_RULE, job_dict) self.assertEqual(job_dict[JOB_RULE], rule.name) self.assertIn(JOB_STATUS, job_dict) diff --git a/tests/test_recipes.py b/tests/test_recipes.py index 6111eef..454bdfc 100644 --- a/tests/test_recipes.py +++ b/tests/test_recipes.py @@ -12,7 +12,7 @@ from core.correctness.vars import EVENT_TYPE, WATCHDOG_BASE, WATCHDOG_RULE, \ RESULT_FILE from core.correctness.validation import valid_job from core.functionality import get_file_hash, create_job, create_event, \ - make_dir, write_yaml, write_notebook + make_dir, write_yaml, write_notebook, read_yaml from core.meow import create_rules, create_rule from patterns.file_event_pattern import FileEventPattern, SWEEP_START, \ SWEEP_STOP, SWEEP_JUMP @@ -156,10 +156,12 @@ class CorrectnessTests(unittest.TestCase): ph.handle(event) if from_handler_reader.poll(3): - job = from_handler_reader.recv() + job_dir = from_handler_reader.recv() - self.assertIsNotNone(job[JOB_ID]) + self.assertIsInstance(job_dir, str) + self.assertTrue(os.path.exists(job_dir)) + job = read_yaml(os.path.join(job_dir, META_FILE)) valid_job(job) # Test PapermillHandler will create enough jobs from single sweep @@ -217,8 +219,13 @@ class CorrectnessTests(unittest.TestCase): values = [0, 1, 2] self.assertEqual(len(jobs), 3) - for job in jobs: + for job_dir in jobs: + self.assertIsInstance(job_dir, str) + self.assertTrue(os.path.exists(job_dir)) + + job = read_yaml(os.path.join(job_dir, META_FILE)) valid_job(job) + self.assertIn(JOB_PARAMETERS, job) self.assertIn("s", job[JOB_PARAMETERS]) if job[JOB_PARAMETERS]["s"] in values: @@ -291,8 +298,13 @@ class CorrectnessTests(unittest.TestCase): "s1-0/s2-80", "s1-1/s2-80", "s1-2/s2-80", ] self.assertEqual(len(jobs), 15) - for job in jobs: + for job_dir in jobs: + self.assertIsInstance(job_dir, str) + self.assertTrue(os.path.exists(job_dir)) + + job = read_yaml(os.path.join(job_dir, META_FILE)) valid_job(job) + self.assertIn(JOB_PARAMETERS, job) val1 = None val2 = None @@ -305,8 +317,6 @@ class CorrectnessTests(unittest.TestCase): val = f"{val1}/{val2}" if val and val in values: values.remove(val) - print([j[JOB_PARAMETERS] for j in jobs]) - print(values) self.assertEqual(len(values), 0) # Test jobFunc performs as expected diff --git a/tests/test_runner.py b/tests/test_runner.py index d3f0025..29b809e 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -359,3 +359,5 @@ class MeowTests(unittest.TestCase): # TODO sweep tests # TODO adding tests with numpy + # TODO test getting job cannot handle + # TODO test getting event cannot handle