integrated threadsafe status updates

PatchOfScotland
2023-04-18 13:50:20 +02:00
parent 3f28b11be9
commit ddca1f6aa4
11 changed files with 196 additions and 120 deletions

View File

@@ -21,8 +21,8 @@ from meow_base.core.vars import DEFAULT_JOB_QUEUE_DIR, \
     JOB_ERROR, JOB_TYPE, DEFAULT_JOB_QUEUE_DIR, STATUS_RUNNING, \
     JOB_START_TIME, DEFAULT_JOB_OUTPUT_DIR, get_job_file
 from meow_base.functionality.validation import valid_dir_path
-from meow_base.functionality.file_io import make_dir, read_yaml, write_file, \
-    write_yaml
+from meow_base.functionality.file_io import make_dir, write_file, \
+    threadsafe_read_status, threadsafe_update_status
 
 class LocalBashConductor(BaseConductor):
@@ -66,13 +66,17 @@ class LocalBashConductor(BaseConductor):
         abort = False
         try:
             meta_file = os.path.join(job_dir, META_FILE)
-            job = read_yaml(meta_file)
+            job = threadsafe_read_status(meta_file)
             valid_job(job)
 
             # update the status file with running status
-            job[JOB_STATUS] = STATUS_RUNNING
-            job[JOB_START_TIME] = datetime.now()
-            write_yaml(job, meta_file)
+            threadsafe_update_status(
+                {
+                    JOB_STATUS: STATUS_RUNNING,
+                    JOB_START_TIME: datetime.now()
+                },
+                meta_file
+            )
         except Exception as e:
             # If something has gone wrong at this stage then its bad, so we
@@ -92,40 +96,40 @@ class LocalBashConductor(BaseConductor):
                 cwd="."
             )
 
-            # get up to date job data
-            job = read_yaml(meta_file)
-
             if result == 0:
                 # Update the status file with the finalised status
-                job[JOB_STATUS] = STATUS_DONE
-                job[JOB_END_TIME] = datetime.now()
-                write_yaml(job, meta_file)
+                threadsafe_update_status(
+                    {
+                        JOB_STATUS: STATUS_DONE,
+                        JOB_END_TIME: datetime.now()
+                    },
+                    meta_file
+                )
             else:
                 # Update the status file with the error status. Don't
                 # overwrite any more specific error messages already
                 # created
-                if JOB_STATUS not in job:
-                    job[JOB_STATUS] = STATUS_FAILED
-                if JOB_END_TIME not in job:
-                    job[JOB_END_TIME] = datetime.now()
-                if JOB_ERROR not in job:
-                    job[JOB_ERROR] = f"Job execution returned non-zero."
-                write_yaml(job, meta_file)
+                threadsafe_update_status(
+                    {
+                        JOB_STATUS: STATUS_FAILED,
+                        JOB_END_TIME: datetime.now(),
+                        JOB_ERROR: "Job execution returned non-zero."
+                    },
+                    meta_file
+                )
 
         except Exception as e:
-            # get up to date job data
-            job = read_yaml(meta_file)
-
             # Update the status file with the error status. Don't overwrite
             # any more specific error messages already created
-            if JOB_STATUS not in job:
-                job[JOB_STATUS] = STATUS_FAILED
-            if JOB_END_TIME not in job:
-                job[JOB_END_TIME] = datetime.now()
-            if JOB_ERROR not in job:
-                job[JOB_ERROR] = f"Job execution failed. {e}"
-            write_yaml(job, meta_file)
+            threadsafe_update_status(
+                {
+                    JOB_STATUS: STATUS_FAILED,
+                    JOB_END_TIME: datetime.now(),
+                    JOB_ERROR: f"Job execution failed. {e}"
+                },
+                meta_file
+            )
 
         # Move the contents of the execution directory to the final output
         # directory.
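An aside, not part of the commit: the hunk above replaces the conductor's unlocked read-modify-write of the job's YAML meta file with calls into lock-guarded helpers. The sketch below illustrates why. Two workers doing the unlocked cycle can each read the same snapshot and then overwrite the other's fields, while an exclusive flock on a sidecar ".lock" file (the approach the new threadsafe_* helpers take, per the file_io.py hunks further down) serialises the whole cycle. The function names and file layout here are hypothetical and only meant as a minimal demonstration.

# race_sketch.py -- illustrative only; racy_update / locked_update are hypothetical names
import fcntl

import yaml

def racy_update(path: str, updates: dict) -> None:
    # read ...
    with open(path) as handle:
        status = yaml.safe_load(handle) or {}
    # ... modify ...
    status.update(updates)
    # ... write: another worker can interleave anywhere in this window
    with open(path, "w") as handle:
        yaml.dump(status, handle, default_flow_style=False)

def locked_update(path: str, updates: dict) -> None:
    # hold an exclusive lock on a sidecar ".lock" file for the whole cycle
    with open(path + ".lock", "a") as lock_handle:
        fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX)
        try:
            with open(path) as handle:
                status = yaml.safe_load(handle) or {}
            status.update(updates)
            with open(path, "w") as handle:
                yaml.dump(status, handle, default_flow_style=False)
        finally:
            fcntl.flock(lock_handle.fileno(), fcntl.LOCK_UN)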

View File

@@ -19,8 +19,8 @@ from meow_base.core.vars import JOB_TYPE_PYTHON, PYTHON_FUNC, \
     JOB_ERROR, JOB_TYPE, JOB_TYPE_PAPERMILL, DEFAULT_JOB_QUEUE_DIR, \
     DEFAULT_JOB_OUTPUT_DIR
 from meow_base.functionality.validation import valid_dir_path
-from meow_base.functionality.file_io import make_dir, read_yaml, write_file, \
-    write_yaml
+from meow_base.functionality.file_io import make_dir, write_file, \
+    threadsafe_read_status, threadsafe_update_status
 
 class LocalPythonConductor(BaseConductor):
     def __init__(self, job_queue_dir:str=DEFAULT_JOB_QUEUE_DIR,
@@ -63,13 +63,17 @@ class LocalPythonConductor(BaseConductor):
         abort = False
         try:
             meta_file = os.path.join(job_dir, META_FILE)
-            job = read_yaml(meta_file)
+            job = threadsafe_read_status(meta_file)
             valid_job(job)
 
             # update the status file with running status
-            job[JOB_STATUS] = STATUS_RUNNING
-            job[JOB_START_TIME] = datetime.now()
-            write_yaml(job, meta_file)
+            threadsafe_update_status(
+                {
+                    JOB_STATUS: STATUS_RUNNING,
+                    JOB_START_TIME: datetime.now()
+                },
+                meta_file
+            )
         except Exception as e:
             # If something has gone wrong at this stage then its bad, so we
@@ -84,27 +88,27 @@ class LocalPythonConductor(BaseConductor):
             job_function = job[PYTHON_FUNC]
             job_function(job_dir)
 
-            # get up to date job data
-            job = read_yaml(meta_file)
-
             # Update the status file with the finalised status
-            job[JOB_STATUS] = STATUS_DONE
-            job[JOB_END_TIME] = datetime.now()
-            write_yaml(job, meta_file)
+            threadsafe_update_status(
+                {
+                    JOB_STATUS: STATUS_DONE,
+                    JOB_END_TIME: datetime.now()
+                },
+                meta_file
+            )
 
         except Exception as e:
-            # get up to date job data
-            job = read_yaml(meta_file)
-
             # Update the status file with the error status. Don't overwrite
             # any more specific error messages already created
-            if JOB_STATUS not in job:
-                job[JOB_STATUS] = STATUS_FAILED
-            if JOB_END_TIME not in job:
-                job[JOB_END_TIME] = datetime.now()
-            if JOB_ERROR not in job:
-                job[JOB_ERROR] = f"Job execution failed. {e}"
-            write_yaml(job, meta_file)
+            threadsafe_update_status(
+                {
+                    JOB_STATUS: STATUS_FAILED,
+                    JOB_END_TIME: datetime.now(),
+                    JOB_ERROR: f"Job execution failed. {e}"
+                },
+                meta_file
+            )
 
         # Move the contents of the execution directory to the final output
         # directory.

View File

@@ -23,7 +23,7 @@ from meow_base.core.vars import DEBUG_WARNING, DEBUG_INFO, \
 from meow_base.functionality.validation import check_type, valid_list, \
     valid_dir_path
 from meow_base.functionality.debug import setup_debugging, print_debug
-from meow_base.functionality.file_io import make_dir, read_yaml
+from meow_base.functionality.file_io import make_dir, threadsafe_read_status
 from meow_base.functionality.process_io import wait
@@ -177,7 +177,7 @@ class MeowRunner:
                 job_dir = from_handler.recv()
                 try:
                     metafile = os.path.join(job_dir, META_FILE)
-                    job = read_yaml(metafile)
+                    job = threadsafe_read_status(metafile)
                 except Exception as e:
                     print_debug(self._print_target, self.debug_level,
                         "Could not load necessary job definitions for "

View File

@@ -145,6 +145,9 @@ DEBUG_ERROR = 1
 DEBUG_WARNING = 2
 DEBUG_INFO = 3
 
+# Locking
+LOCK_EXT = ".lock"
+
 # debug message functions
 def get_drt_imp_msg(base_class):
     return f"{base_class.__name__} may not be instantiated directly. " \

View File

@@ -13,7 +13,8 @@ from os.path import exists, isfile, join
 from typing import Any, Dict, List
 
 from meow_base.core.vars import JOB_END_TIME, JOB_ERROR, JOB_STATUS, \
-    STATUS_FAILED, STATUS_DONE, JOB_CREATE_TIME, JOB_START_TIME
+    STATUS_FAILED, STATUS_DONE, JOB_CREATE_TIME, JOB_START_TIME, \
+    STATUS_SKIPPED, LOCK_EXT
 from meow_base.functionality.validation import valid_path
@@ -98,7 +99,7 @@ def write_yaml(source:Any, filename:str):
         yaml.dump(source, param_file, default_flow_style=False)
 
 def threadsafe_read_status(filepath:str):
-    lock_path = f"{filepath}.lock"
+    lock_path = filepath + LOCK_EXT
     lock_handle = open(lock_path, 'a')
     fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX)
@@ -113,7 +114,7 @@ def threadsafe_read_status(filepath:str):
     return status
 
 def threadsafe_write_status(source:dict[str,Any], filepath:str):
-    lock_path = f"{filepath}.lock"
+    lock_path = filepath + LOCK_EXT
     lock_handle = open(lock_path, 'a')
     fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX)
@@ -126,7 +127,7 @@ def threadsafe_write_status(source:dict[str,Any], filepath:str):
     lock_handle.close()
 
 def threadsafe_update_status(updates:dict[str,Any], filepath:str):
-    lock_path = f"{filepath}.lock"
+    lock_path = filepath + LOCK_EXT
     lock_handle = open(lock_path, 'a')
     fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX)
@@ -137,7 +138,7 @@ def threadsafe_update_status(updates:dict[str,Any], filepath:str):
             if k in updates:
                 # Do not overwrite final job status
                 if k == JOB_STATUS \
-                        and v in [STATUS_DONE, STATUS_FAILED]:
+                        and v in [STATUS_DONE, STATUS_FAILED, STATUS_SKIPPED]:
                     continue
                 # Do not overwrite an existing time
                 elif k in [JOB_START_TIME, JOB_CREATE_TIME, JOB_END_TIME]:
@@ -148,6 +149,10 @@ def threadsafe_update_status(updates:dict[str,Any], filepath:str):
                     status[k] = updates[k]
 
+        for k, v in updates.items():
+            if k not in status:
+                status[k] = v
+
         write_yaml(status, filepath)
     except Exception as e:
         lock_handle.close()
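Illustrative note, not part of the commit: the file_io.py hunks above, together with the test expectations later in this diff, imply a particular merge policy for threadsafe_update_status. A final status (done, failed or skipped) is never downgraded, timestamps are write-once, error text accumulates rather than being replaced, and keys not yet present are added. A minimal stand-alone sketch of that policy, using generic key names rather than the repository's constants and with the exact error-append behaviour inferred from the tests, could look like this:

# merge_policy_sketch.py -- hypothetical illustration of the implied update rules
FINAL_STATES = {"done", "failed", "skipped"}            # stand-ins for STATUS_* constants
TIME_KEYS = {"create_time", "start_time", "end_time"}   # stand-ins for JOB_*_TIME keys

def merge_status(existing: dict, updates: dict) -> dict:
    merged = dict(existing)
    for key, new_value in updates.items():
        old_value = merged.get(key)
        # a job that already reached a final state keeps it
        if key == "status" and old_value in FINAL_STATES:
            continue
        # timestamps are write-once
        if key in TIME_KEYS and old_value is not None:
            continue
        # error messages accumulate rather than overwrite
        if key == "error" and old_value:
            merged[key] = f"{old_value} {new_value}"
            continue
        # everything else, including previously unseen keys, is taken as-is
        merged[key] = new_value
    return merged

first = {"create_time": "now", "status": "Wham", "error": "first error.", "id": "id"}
second = {"create_time": "changed", "status": "done", "error": "changed.", "id": "changed"}
print(merge_status(first, second))
# -> {'create_time': 'now', 'status': 'done', 'error': 'first error. changed.', 'id': 'changed'}

The usage example mirrors the first and second dictionaries exercised by the updated test_functionality tests near the end of this diff.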

View File

@@ -12,12 +12,12 @@ from meow_base.functionality.validation import check_type, valid_dict, \
     valid_string, valid_dir_path
 
 from meow_base.core.vars import DEBUG_INFO, DEFAULT_JOB_QUEUE_DIR, \
     VALID_VARIABLE_NAME_CHARS, EVENT_PATH, EVENT_RULE, EVENT_TYPE, JOB_ID, \
-    EVENT_TYPE_WATCHDOG, JOB_TYPE_BASH, JOB_PARAMETERS, WATCHDOG_HASH, \
-    WATCHDOG_BASE, META_FILE, STATUS_QUEUED, JOB_STATUS, \
+    EVENT_TYPE_WATCHDOG, JOB_TYPE_BASH, JOB_PARAMETERS, WATCHDOG_BASE, \
+    META_FILE, STATUS_QUEUED, JOB_STATUS, \
     get_base_file, get_job_file
 from meow_base.functionality.debug import setup_debugging, print_debug
-from meow_base.functionality.file_io import valid_path, make_dir, write_yaml, \
-    write_file, lines_to_string
+from meow_base.functionality.file_io import valid_path, make_dir, write_file, \
+    lines_to_string, threadsafe_write_status
 from meow_base.functionality.parameterisation import parameterize_bash_script
 from meow_base.functionality.meow import create_job, replace_keywords
@@ -157,7 +157,7 @@ class BashHandler(BaseHandler):
         # write a status file to the job directory
         meta_file = os.path.join(job_dir, META_FILE)
-        write_yaml(meow_job, meta_file)
+        threadsafe_write_status(meow_job, meta_file)
 
         # parameterise recipe and write as executeable script
         base_script = parameterize_bash_script(
@@ -177,7 +177,7 @@ class BashHandler(BaseHandler):
         meow_job[JOB_STATUS] = STATUS_QUEUED
 
         # update the status file with queued status
-        write_yaml(meow_job, meta_file)
+        threadsafe_write_status(meow_job, meta_file)
 
         # Send job directory, as actual definitons will be read from within it
         self.to_runner.send(job_dir)

View File

@@ -24,7 +24,8 @@ from meow_base.core.vars import VALID_VARIABLE_NAME_CHARS, \
     get_base_file
 from meow_base.functionality.debug import setup_debugging, print_debug
 from meow_base.functionality.file_io import make_dir, read_notebook, \
-    write_notebook, write_yaml
+    write_notebook, write_yaml, threadsafe_write_status, \
+    threadsafe_update_status
 from meow_base.functionality.meow import create_job, replace_keywords
@@ -163,7 +164,7 @@ class PapermillHandler(BaseHandler):
         # write a status file to the job directory
         meta_file = os.path.join(job_dir, META_FILE)
-        write_yaml(meow_job, meta_file)
+        threadsafe_write_status(meow_job, meta_file)
 
         # write an executable notebook to the job directory
         base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PAPERMILL))
@@ -176,7 +177,12 @@ class PapermillHandler(BaseHandler):
         meow_job[JOB_STATUS] = STATUS_QUEUED
 
         # update the status file with queued status
-        write_yaml(meow_job, meta_file)
+        threadsafe_update_status(
+            {
+                JOB_STATUS: STATUS_QUEUED
+            },
+            meta_file
+        )
 
         # Send job directory, as actual definitons will be read from within it
         self.to_runner.send(job_dir)
@@ -208,7 +214,8 @@ def papermill_job_func(job_dir):
         JOB_STATUS, SHA256, STATUS_SKIPPED, JOB_END_TIME, \
         JOB_ERROR, STATUS_FAILED, get_job_file, \
         get_result_file
-    from meow_base.functionality.file_io import read_yaml, write_notebook, write_yaml
+    from meow_base.functionality.file_io import read_yaml, write_notebook, \
+        threadsafe_read_status, threadsafe_update_status
     from meow_base.functionality.hashing import get_hash
     from meow_base.functionality.parameterisation import parameterize_jupyter_notebook
@@ -221,7 +228,7 @@ def papermill_job_func(job_dir):
     param_file = os.path.join(job_dir, PARAMS_FILE)
 
     # Get job defintions
-    job = read_yaml(meta_file)
+    job = threadsafe_read_status(meta_file)
    yaml_dict = read_yaml(param_file)
 
     # Check the hash of the triggering file, if present. This addresses
@@ -234,15 +241,20 @@ def papermill_job_func(job_dir):
     # another job will have been scheduled anyway.
     if not triggerfile_hash \
             or triggerfile_hash != job[JOB_EVENT][WATCHDOG_HASH]:
-        job[JOB_STATUS] = STATUS_SKIPPED
-        job[JOB_END_TIME] = datetime.now()
         msg = "Job was skipped as triggering file " + \
             f"'{job[JOB_EVENT][EVENT_PATH]}' has been modified since " + \
             "scheduling. Was expected to have hash " + \
             f"'{job[JOB_EVENT][WATCHDOG_HASH]}' but has '" + \
             f"{triggerfile_hash}'."
-        job[JOB_ERROR] = msg
-        write_yaml(job, meta_file)
+        threadsafe_update_status(
+            {
+                JOB_STATUS: STATUS_SKIPPED,
+                JOB_END_TIME: datetime.now(),
+                JOB_ERROR: msg
+            },
+            meta_file
+        )
         return
 
     # Create a parameterised version of the executable notebook
@@ -253,20 +265,29 @@ def papermill_job_func(job_dir):
         )
         write_notebook(job_notebook, job_file)
     except Exception as e:
-        job[JOB_STATUS] = STATUS_FAILED
-        job[JOB_END_TIME] = datetime.now()
         msg = f"Job file {job[JOB_ID]} was not created successfully. {e}"
-        job[JOB_ERROR] = msg
-        write_yaml(job, meta_file)
+        threadsafe_update_status(
+            {
+                JOB_STATUS: STATUS_FAILED,
+                JOB_END_TIME: datetime.now(),
+                JOB_ERROR: msg
+            },
+            meta_file
+        )
         return
 
     # Execute the parameterised notebook
     try:
         papermill.execute_notebook(job_file, result_file, {})
     except Exception as e:
-        job[JOB_STATUS] = STATUS_FAILED
-        job[JOB_END_TIME] = datetime.now()
         msg = f"Result file {result_file} was not created successfully. {e}"
-        job[JOB_ERROR] = msg
-        write_yaml(job, meta_file)
+        threadsafe_update_status(
+            {
+                JOB_STATUS: STATUS_FAILED,
+                JOB_END_TIME: datetime.now(),
+                JOB_ERROR: msg
+            },
+            meta_file
+        )
         return

View File

@@ -23,7 +23,8 @@ from meow_base.core.vars import VALID_VARIABLE_NAME_CHARS, \
     get_base_file
 from meow_base.functionality.debug import setup_debugging, print_debug
 from meow_base.functionality.file_io import make_dir, read_file_lines, \
-    write_file, write_yaml, lines_to_string
+    write_file, write_yaml, lines_to_string, threadsafe_write_status, \
+    threadsafe_update_status
 from meow_base.functionality.meow import create_job, replace_keywords
@@ -153,7 +154,7 @@ class PythonHandler(BaseHandler):
         # write a status file to the job directory
         meta_file = os.path.join(job_dir, META_FILE)
-        write_yaml(meow_job, meta_file)
+        threadsafe_write_status(meow_job, meta_file)
 
         # write an executable script to the job directory
         base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PYTHON))
@@ -163,10 +164,13 @@ class PythonHandler(BaseHandler):
         param_file = os.path.join(job_dir, PARAMS_FILE)
         write_yaml(yaml_dict, param_file)
 
-        meow_job[JOB_STATUS] = STATUS_QUEUED
-
         # update the status file with queued status
-        write_yaml(meow_job, meta_file)
+        threadsafe_update_status(
+            {
+                JOB_STATUS: STATUS_QUEUED
+            },
+            meta_file
+        )
 
         # Send job directory, as actual definitons will be read from within it
         self.to_runner.send(job_dir)
@@ -184,7 +188,8 @@ def python_job_func(job_dir):
         JOB_STATUS, SHA256, STATUS_SKIPPED, JOB_END_TIME, \
         JOB_ERROR, STATUS_FAILED, get_base_file, \
         get_job_file, get_result_file
-    from meow_base.functionality.file_io import read_yaml, write_yaml
+    from meow_base.functionality.file_io import read_yaml, \
+        threadsafe_read_status, threadsafe_update_status
     from meow_base.functionality.hashing import get_hash
     from meow_base.functionality.parameterisation import parameterize_python_script
@@ -196,7 +201,7 @@ def python_job_func(job_dir):
     param_file = os.path.join(job_dir, PARAMS_FILE)
 
     # Get job defintions
-    job = read_yaml(meta_file)
+    job = threadsafe_read_status(meta_file)
     yaml_dict = read_yaml(param_file)
 
     # Check the hash of the triggering file, if present. This addresses
@@ -209,15 +214,19 @@ def python_job_func(job_dir):
     # another job will have been scheduled anyway.
     if not triggerfile_hash \
             or triggerfile_hash != job[JOB_EVENT][WATCHDOG_HASH]:
-        job[JOB_STATUS] = STATUS_SKIPPED
-        job[JOB_END_TIME] = datetime.now()
         msg = "Job was skipped as triggering file " + \
             f"'{job[JOB_EVENT][EVENT_PATH]}' has been modified since " + \
            "scheduling. Was expected to have hash " + \
            f"'{job[JOB_EVENT][WATCHDOG_HASH]}' but has '" + \
            f"{triggerfile_hash}'."
-        job[JOB_ERROR] = msg
-        write_yaml(job, meta_file)
+        threadsafe_update_status(
+            {
+                JOB_STATUS: STATUS_SKIPPED,
+                JOB_END_TIME: datetime.now(),
+                JOB_ERROR: msg
+            },
+            meta_file
+        )
         return
 
     # Create a parameterised version of the executable script
@@ -228,11 +237,15 @@ def python_job_func(job_dir):
         )
         write_file(lines_to_string(job_script), job_file)
     except Exception as e:
-        job[JOB_STATUS] = STATUS_FAILED
-        job[JOB_END_TIME] = datetime.now()
         msg = f"Job file {job[JOB_ID]} was not created successfully. {e}"
-        job[JOB_ERROR] = msg
-        write_yaml(job, meta_file)
+        threadsafe_update_status(
+            {
+                JOB_STATUS: STATUS_FAILED,
+                JOB_END_TIME: datetime.now(),
+                JOB_ERROR: msg
+            },
+            meta_file
+        )
         return
 
     # Execute the parameterised script
@@ -256,11 +269,16 @@ def python_job_func(job_dir):
         sys.stdout = std_stdout
         sys.stderr = std_stderr
 
-        job[JOB_STATUS] = STATUS_FAILED
-        job[JOB_END_TIME] = datetime.now()
         msg = f"Result file {result_file} was not created successfully. {e}"
-        job[JOB_ERROR] = msg
-        write_yaml(job, meta_file)
+        threadsafe_update_status(
+            {
+                JOB_STATUS: STATUS_FAILED,
+                JOB_END_TIME: datetime.now(),
+                JOB_ERROR: msg
+            },
+            meta_file
+        )
         return
 
     sys.stdout = std_stdout

View File

@@ -6,9 +6,10 @@ Author(s): David Marchant
 import os
 
 from distutils.dir_util import copy_tree
+from typing import List
 
 from meow_base.core.vars import DEFAULT_JOB_OUTPUT_DIR, \
-    DEFAULT_JOB_QUEUE_DIR
+    DEFAULT_JOB_QUEUE_DIR, LOCK_EXT
 from meow_base.functionality.file_io import make_dir, rmtree
 from meow_base.patterns.file_event_pattern import FileEventPattern
 from meow_base.recipes.jupyter_notebook_recipe import JupyterNotebookRecipe
@@ -37,7 +38,9 @@ def teardown():
     rmtree(DEFAULT_JOB_QUEUE_DIR)
     rmtree("first")
     for f in [
-        "temp_phantom_info.h5", "temp_phantom.h5", "doesNotExist.lock"
+        "temp_phantom_info.h5",
+        "temp_phantom.h5",
+        f"doesNotExist{LOCK_EXT}"
     ]:
         if os.path.exists(f):
             os.remove(f)
@@ -46,6 +49,14 @@ def backup_before_teardown(backup_source:str, backup_dest:str):
     make_dir(backup_dest, ensure_clean=True)
     copy_tree(backup_source, backup_dest)
 
+# Necessary, as the creation of locks is not deterministic
+def list_non_locks(dir:str)->List[str]:
+    return [f for f in os.listdir(dir) if not f.endswith(LOCK_EXT)]
+
+# Necessary, as the creation of locks is not deterministic
+def count_non_locks(dir:str)->int:
+    return len(list_non_locks(dir))
+
 # Bash scripts
 BAREBONES_BASH_SCRIPT = [

View File

@@ -13,7 +13,7 @@ from typing import Dict
 from meow_base.core.rule import Rule
 from meow_base.core.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \
-    SHA256, EVENT_TYPE, EVENT_PATH, EVENT_TYPE_WATCHDOG, \
+    SHA256, EVENT_TYPE, EVENT_PATH, EVENT_TYPE_WATCHDOG, LOCK_EXT, \
     WATCHDOG_BASE, WATCHDOG_HASH, EVENT_RULE, JOB_PARAMETERS, \
     PYTHON_FUNC, JOB_ID, JOB_EVENT, JOB_ERROR, STATUS_DONE, \
     JOB_TYPE, JOB_PATTERN, JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME, \
@@ -348,7 +348,7 @@ data"""
         self.assertFalse(os.path.exists(filepath))
         threadsafe_write_status(first_yaml_dict, filepath)
         self.assertTrue(os.path.exists(filepath))
-        self.assertTrue(os.path.exists(f"{filepath}.lock"))
+        self.assertTrue(os.path.exists(filepath + LOCK_EXT))
 
         with open(filepath, 'r') as f:
             data = f.readlines()
@@ -381,7 +381,7 @@ data"""
         threadsafe_write_status(second_yaml_dict, filepath)
         self.assertTrue(os.path.exists(filepath))
-        self.assertTrue(os.path.exists(f"{filepath}.lock"))
+        self.assertTrue(os.path.exists(filepath + LOCK_EXT))
 
         with open(filepath, 'r') as f:
             data = f.readlines()
@@ -444,7 +444,7 @@ data"""
         self.assertFalse(os.path.exists(filepath))
         threadsafe_write_status(first_yaml_dict, filepath)
         self.assertTrue(os.path.exists(filepath))
-        self.assertTrue(os.path.exists(f"{filepath}.lock"))
+        self.assertTrue(os.path.exists(filepath + LOCK_EXT))
 
         with open(filepath, 'r') as f:
             data = f.readlines()
@@ -475,7 +475,7 @@ data"""
         threadsafe_update_status(second_yaml_dict, filepath)
         self.assertTrue(os.path.exists(filepath))
-        self.assertTrue(os.path.exists(f"{filepath}.lock"))
+        self.assertTrue(os.path.exists(filepath + LOCK_EXT))
 
         with open(filepath, 'r') as f:
             data = f.readlines()
@@ -498,7 +498,8 @@ data"""
             JOB_CREATE_TIME: "now",
             JOB_STATUS: "Wham",
             JOB_ERROR: "first error.",
-            JOB_ID: "id"
+            JOB_ID: "id",
+            JOB_TYPE: "type"
         }
 
         filepath = os.path.join(TEST_MONITOR_BASE, "file.yaml")
@@ -506,7 +507,7 @@ data"""
         self.assertFalse(os.path.exists(filepath))
         threadsafe_write_status(first_yaml_dict, filepath)
         self.assertTrue(os.path.exists(filepath))
-        self.assertTrue(os.path.exists(f"{filepath}.lock"))
+        self.assertTrue(os.path.exists(filepath + LOCK_EXT))
 
         status = threadsafe_read_status(filepath)
@@ -523,7 +524,7 @@ data"""
         threadsafe_update_status(second_yaml_dict, filepath)
         self.assertTrue(os.path.exists(filepath))
-        self.assertTrue(os.path.exists(f"{filepath}.lock"))
+        self.assertTrue(os.path.exists(filepath + LOCK_EXT))
 
         status = threadsafe_read_status(filepath)
@@ -531,7 +532,8 @@ data"""
             JOB_CREATE_TIME: "now",
             JOB_STATUS: STATUS_DONE,
             JOB_ERROR: "first error. changed.",
-            JOB_ID: "changed"
+            JOB_ID: "changed",
+            JOB_TYPE: "type"
         }
         self.assertEqual(expected_second_yaml_dict, status)
@@ -540,14 +542,15 @@ data"""
             JOB_CREATE_TIME: "editted",
             JOB_STATUS: "editted",
             JOB_ERROR: "editted.",
-            JOB_ID: "editted"
+            JOB_ID: "editted",
+            "something_new": "new"
         }
 
         filepath = os.path.join(TEST_MONITOR_BASE, "file.yaml")
 
         threadsafe_update_status(third_yaml_dict, filepath)
 
         self.assertTrue(os.path.exists(filepath))
-        self.assertTrue(os.path.exists(f"{filepath}.lock"))
+        self.assertTrue(os.path.exists(filepath + LOCK_EXT))
 
         status = threadsafe_read_status(filepath)
@@ -555,9 +558,14 @@ data"""
             JOB_CREATE_TIME: "now",
             JOB_STATUS: STATUS_DONE,
             JOB_ERROR: "first error. changed. editted.",
-            JOB_ID: "editted"
+            JOB_ID: "editted",
+            JOB_TYPE: "type",
+            "something_new": "new"
         }
+
+        print(expected_third_yaml_dict)
+        print(status)
         self.assertEqual(expected_third_yaml_dict, status)

View File

@@ -29,7 +29,8 @@ from shared import TEST_JOB_QUEUE, TEST_JOB_OUTPUT, TEST_MONITOR_BASE, \
     MAKER_RECIPE, APPENDING_NOTEBOOK, COMPLETE_PYTHON_SCRIPT, TEST_DIR, \
     FILTER_RECIPE, POROSITY_CHECK_NOTEBOOK, SEGMENT_FOAM_NOTEBOOK, \
     GENERATOR_NOTEBOOK, FOAM_PORE_ANALYSIS_NOTEBOOK, IDMC_UTILS_PYTHON_SCRIPT, \
-    TEST_DATA, GENERATE_PYTHON_SCRIPT, setup, teardown, backup_before_teardown
+    TEST_DATA, GENERATE_PYTHON_SCRIPT, \
+    setup, teardown, backup_before_teardown, count_non_locks
 
 pattern_check = FileEventPattern(
     "pattern_check",
@@ -329,7 +330,8 @@ class MeowTests(unittest.TestCase):
         runner.stop()
 
         job_dir = os.path.join(TEST_JOB_OUTPUT, job_id)
-        self.assertEqual(len(os.listdir(job_dir)), 5)
+        print(os.listdir(job_dir))
+        self.assertEqual(count_non_locks(job_dir), 5)
 
         result = read_notebook(
             os.path.join(job_dir, get_result_file(JOB_TYPE_PAPERMILL)))
@@ -426,7 +428,7 @@ class MeowTests(unittest.TestCase):
         runner.stop()
 
         mid_job_dir = os.path.join(TEST_JOB_OUTPUT, job_id)
-        self.assertEqual(len(os.listdir(mid_job_dir)), 5)
+        self.assertEqual(count_non_locks(mid_job_dir), 5)
 
         result = read_notebook(
             os.path.join(mid_job_dir, get_result_file(JOB_TYPE_PAPERMILL)))
@@ -441,7 +443,7 @@ class MeowTests(unittest.TestCase):
         self.assertEqual(data, "Initial Data\nA line from Pattern 1")
 
         final_job_dir = os.path.join(TEST_JOB_OUTPUT, job_id)
-        self.assertEqual(len(os.listdir(final_job_dir)), 5)
+        self.assertEqual(count_non_locks(final_job_dir), 5)
 
         result = read_notebook(os.path.join(final_job_dir,
             get_result_file(JOB_TYPE_PAPERMILL)))
@@ -645,7 +647,7 @@ class MeowTests(unittest.TestCase):
         final_job_id = job_ids[0]
 
         mid_job_dir = os.path.join(TEST_JOB_OUTPUT, mid_job_id)
-        self.assertEqual(len(os.listdir(mid_job_dir)), 5)
+        self.assertEqual(count_non_locks(mid_job_dir), 5)
 
         mid_metafile = os.path.join(mid_job_dir, META_FILE)
         mid_status = read_yaml(mid_metafile)
@@ -664,7 +666,7 @@ class MeowTests(unittest.TestCase):
         self.assertEqual(mid_output, "7806.25")
 
         final_job_dir = os.path.join(TEST_JOB_OUTPUT, final_job_id)
-        self.assertEqual(len(os.listdir(final_job_dir)), 5)
+        self.assertEqual(count_non_locks(final_job_dir), 5)
 
         final_metafile = os.path.join(final_job_dir, META_FILE)
         final_status = read_yaml(final_metafile)