updated conductor so it updates job meta files with statues

This commit is contained in:
PatchOfScotland
2023-02-01 11:25:03 +01:00
parent d08b8ce8c4
commit deb24f33ff
3 changed files with 54 additions and 35 deletions

View File

@ -5,10 +5,17 @@ execute Python jobs on the local resource.
Author(s): David Marchant Author(s): David Marchant
""" """
import os
import shutil
from datetime import datetime
from typing import Any from typing import Any
from core.correctness.vars import PYTHON_TYPE, PYTHON_FUNC from core.correctness.vars import PYTHON_TYPE, PYTHON_FUNC, JOB_STATUS, \
STATUS_RUNNING, JOB_START_TIME, PYTHON_EXECUTION_BASE, JOB_ID, META_FILE, \
STATUS_DONE, JOB_END_TIME, STATUS_FAILED, JOB_ERROR, PYTHON_OUTPUT_DIR
from core.correctness.validation import valid_job from core.correctness.validation import valid_job
from core.functionality import read_yaml, write_yaml
from core.meow import BaseConductor from core.meow import BaseConductor
@ -19,11 +26,42 @@ class LocalPythonConductor(BaseConductor):
def valid_job_types(self)->list[str]: def valid_job_types(self)->list[str]:
return [PYTHON_TYPE] return [PYTHON_TYPE]
# TODO expand with more feedback
def execute(self, job:dict[str,Any])->None: def execute(self, job:dict[str,Any])->None:
valid_job(job) valid_job(job)
job_dir = os.path.join(job[PYTHON_EXECUTION_BASE], job[JOB_ID])
meta_file = os.path.join(job_dir, META_FILE)
# update the status file with running status
job[JOB_STATUS] = STATUS_RUNNING
job[JOB_START_TIME] = datetime.now()
write_yaml(job, meta_file)
# execute the job
try:
job_function = job[PYTHON_FUNC] job_function = job[PYTHON_FUNC]
job_function(job) job_function(job)
return # get up to date job data
job = read_yaml(meta_file)
# Update the status file with the finalised status
job[JOB_STATUS] = STATUS_DONE
job[JOB_END_TIME] = datetime.now()
write_yaml(job, meta_file)
except Exception as e:
# get up to date job data
job = read_yaml(meta_file)
# Update the status file with the error status
job[JOB_STATUS] = STATUS_FAILED
job[JOB_END_TIME] = datetime.now()
msg = f"Job execution failed. {e}"
job[JOB_ERROR] = msg
write_yaml(job, meta_file)
# Move the contents of the execution directory to the final output
# directory.
job_output_dir = os.path.join(job[PYTHON_OUTPUT_DIR], job[JOB_ID])
shutil.move(job_dir, job_output_dir)

View File

@ -140,7 +140,6 @@ class PapermillHandler(BaseHandler):
def setup_job(self, event:dict[str,Any], yaml_dict:dict[str,Any])->None: def setup_job(self, event:dict[str,Any], yaml_dict:dict[str,Any])->None:
"""Function to set up new job dict and send it to the runner to be """Function to set up new job dict and send it to the runner to be
executed.""" executed."""
# TODO finish me so execution completed in conductor
meow_job = create_job(PYTHON_TYPE, event, { meow_job = create_job(PYTHON_TYPE, event, {
JOB_PARAMETERS:yaml_dict, JOB_PARAMETERS:yaml_dict,
JOB_HASH: event[WATCHDOG_HASH], JOB_HASH: event[WATCHDOG_HASH],
@ -187,16 +186,14 @@ class PapermillHandler(BaseHandler):
def job_func(job): def job_func(job):
# Requires own imports as will be run in its own execution environment # Requires own imports as will be run in its own execution environment
import os import os
import shutil
import papermill import papermill
from datetime import datetime from datetime import datetime
from core.functionality import write_yaml, read_yaml, write_notebook, \ from core.functionality import write_yaml, read_yaml, write_notebook, \
get_file_hash, parameterize_jupyter_notebook get_file_hash, parameterize_jupyter_notebook
from core.correctness.vars import JOB_EVENT, WATCHDOG_RULE, JOB_ID, \ from core.correctness.vars import JOB_EVENT, WATCHDOG_RULE, JOB_ID, \
EVENT_PATH, META_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE, \ EVENT_PATH, META_FILE, PARAMS_FILE, JOB_FILE, RESULT_FILE, \
JOB_STATUS, JOB_START_TIME, STATUS_RUNNING, JOB_HASH, SHA256, \ JOB_STATUS, JOB_HASH, SHA256, STATUS_SKIPPED, JOB_END_TIME, \
STATUS_SKIPPED, STATUS_DONE, JOB_END_TIME, JOB_ERROR, STATUS_FAILED, \ JOB_ERROR, STATUS_FAILED, PYTHON_EXECUTION_BASE
PYTHON_EXECUTION_BASE, PYTHON_OUTPUT_DIR
event = job[JOB_EVENT] event = job[JOB_EVENT]
@ -207,12 +204,6 @@ def job_func(job):
result_file = os.path.join(job_dir, RESULT_FILE) result_file = os.path.join(job_dir, RESULT_FILE)
param_file = os.path.join(job_dir, PARAMS_FILE) param_file = os.path.join(job_dir, PARAMS_FILE)
job[JOB_STATUS] = STATUS_RUNNING
job[JOB_START_TIME] = datetime.now()
# update the status file with running status
write_yaml(job, meta_file)
yaml_dict = read_yaml(param_file) yaml_dict = read_yaml(param_file)
# Check the hash of the triggering file, if present. This addresses # Check the hash of the triggering file, if present. This addresses
@ -259,13 +250,3 @@ def job_func(job):
job[JOB_ERROR] = msg job[JOB_ERROR] = msg
write_yaml(job, meta_file) write_yaml(job, meta_file)
return return
# Update the status file with the finalised status
job[JOB_STATUS] = STATUS_DONE
job[JOB_END_TIME] = datetime.now()
write_yaml(job, meta_file)
# Move the contents of the execution directory to the final output
# directory.
job_output_dir = os.path.join(job[PYTHON_OUTPUT_DIR], job[JOB_ID])
shutil.move(job_dir, job_output_dir)

View File

@ -363,6 +363,9 @@ class CorrectnessTests(unittest.TestCase):
job_dict[PYTHON_EXECUTION_BASE], job_dict[JOB_ID]) job_dict[PYTHON_EXECUTION_BASE], job_dict[JOB_ID])
make_dir(job_dir) make_dir(job_dir)
meta_file = os.path.join(job_dir, META_FILE)
write_yaml(job_dict, meta_file)
param_file = os.path.join(job_dir, PARAMS_FILE) param_file = os.path.join(job_dir, PARAMS_FILE)
write_yaml(params_dict, param_file) write_yaml(params_dict, param_file)
@ -372,15 +375,12 @@ class CorrectnessTests(unittest.TestCase):
job_func(job_dict) job_func(job_dict)
job_dir = os.path.join(TEST_HANDLER_BASE, job_dict[JOB_ID]) job_dir = os.path.join(TEST_HANDLER_BASE, job_dict[JOB_ID])
self.assertFalse(os.path.exists(job_dir)) self.assertTrue(os.path.exists(job_dir))
self.assertTrue(os.path.exists(os.path.join(job_dir, META_FILE)))
output_dir = os.path.join(TEST_JOB_OUTPUT, job_dict[JOB_ID]) self.assertTrue(os.path.exists(os.path.join(job_dir, BASE_FILE)))
self.assertTrue(os.path.exists(output_dir)) self.assertTrue(os.path.exists(os.path.join(job_dir, PARAMS_FILE)))
self.assertTrue(os.path.exists(os.path.join(output_dir, META_FILE))) self.assertTrue(os.path.exists(os.path.join(job_dir, JOB_FILE)))
self.assertTrue(os.path.exists(os.path.join(output_dir, BASE_FILE))) self.assertTrue(os.path.exists(os.path.join(job_dir, RESULT_FILE)))
self.assertTrue(os.path.exists(os.path.join(output_dir, PARAMS_FILE)))
self.assertTrue(os.path.exists(os.path.join(output_dir, JOB_FILE)))
self.assertTrue(os.path.exists(os.path.join(output_dir, RESULT_FILE)))
self.assertTrue(os.path.exists(result_path)) self.assertTrue(os.path.exists(result_path))