diff --git a/conductors/local_bash_conductor.py b/conductors/local_bash_conductor.py index a821fb5..02f24c5 100644 --- a/conductors/local_bash_conductor.py +++ b/conductors/local_bash_conductor.py @@ -21,8 +21,8 @@ from meow_base.core.vars import DEFAULT_JOB_QUEUE_DIR, \ JOB_ERROR, JOB_TYPE, DEFAULT_JOB_QUEUE_DIR, STATUS_RUNNING, \ JOB_START_TIME, DEFAULT_JOB_OUTPUT_DIR, get_job_file from meow_base.functionality.validation import valid_dir_path -from meow_base.functionality.file_io import make_dir, read_yaml, write_file, \ - write_yaml +from meow_base.functionality.file_io import make_dir, write_file, \ + threadsafe_read_status, threadsafe_update_status class LocalBashConductor(BaseConductor): @@ -66,13 +66,17 @@ class LocalBashConductor(BaseConductor): abort = False try: meta_file = os.path.join(job_dir, META_FILE) - job = read_yaml(meta_file) + job = threadsafe_read_status(meta_file) valid_job(job) # update the status file with running status - job[JOB_STATUS] = STATUS_RUNNING - job[JOB_START_TIME] = datetime.now() - write_yaml(job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_RUNNING, + JOB_START_TIME: datetime.now() + }, + meta_file + ) except Exception as e: # If something has gone wrong at this stage then its bad, so we @@ -92,40 +96,40 @@ class LocalBashConductor(BaseConductor): cwd="." ) - # get up to date job data - job = read_yaml(meta_file) - if result == 0: # Update the status file with the finalised status - job[JOB_STATUS] = STATUS_DONE - job[JOB_END_TIME] = datetime.now() - write_yaml(job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_DONE, + JOB_END_TIME: datetime.now() + }, + meta_file + ) else: # Update the status file with the error status. Don't # overwrite any more specific error messages already # created - if JOB_STATUS not in job: - job[JOB_STATUS] = STATUS_FAILED - if JOB_END_TIME not in job: - job[JOB_END_TIME] = datetime.now() - if JOB_ERROR not in job: - job[JOB_ERROR] = f"Job execution returned non-zero." - write_yaml(job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_FAILED, + JOB_END_TIME: datetime.now(), + JOB_ERROR: "Job execution returned non-zero." + }, + meta_file + ) except Exception as e: - # get up to date job data - job = read_yaml(meta_file) - # Update the status file with the error status. Don't overwrite # any more specific error messages already created - if JOB_STATUS not in job: - job[JOB_STATUS] = STATUS_FAILED - if JOB_END_TIME not in job: - job[JOB_END_TIME] = datetime.now() - if JOB_ERROR not in job: - job[JOB_ERROR] = f"Job execution failed. {e}" - write_yaml(job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_FAILED, + JOB_END_TIME: datetime.now(), + JOB_ERROR: f"Job execution failed. {e}" + }, + meta_file + ) # Move the contents of the execution directory to the final output # directory. diff --git a/conductors/local_python_conductor.py b/conductors/local_python_conductor.py index 22cb8e9..5fed3a2 100644 --- a/conductors/local_python_conductor.py +++ b/conductors/local_python_conductor.py @@ -19,8 +19,8 @@ from meow_base.core.vars import JOB_TYPE_PYTHON, PYTHON_FUNC, \ JOB_ERROR, JOB_TYPE, JOB_TYPE_PAPERMILL, DEFAULT_JOB_QUEUE_DIR, \ DEFAULT_JOB_OUTPUT_DIR from meow_base.functionality.validation import valid_dir_path -from meow_base.functionality.file_io import make_dir, read_yaml, write_file, \ - write_yaml +from meow_base.functionality.file_io import make_dir, write_file, \ + threadsafe_read_status, threadsafe_update_status class LocalPythonConductor(BaseConductor): def __init__(self, job_queue_dir:str=DEFAULT_JOB_QUEUE_DIR, @@ -63,13 +63,17 @@ class LocalPythonConductor(BaseConductor): abort = False try: meta_file = os.path.join(job_dir, META_FILE) - job = read_yaml(meta_file) + job = threadsafe_read_status(meta_file) valid_job(job) # update the status file with running status - job[JOB_STATUS] = STATUS_RUNNING - job[JOB_START_TIME] = datetime.now() - write_yaml(job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_RUNNING, + JOB_START_TIME: datetime.now() + }, + meta_file + ) except Exception as e: # If something has gone wrong at this stage then its bad, so we @@ -84,27 +88,27 @@ class LocalPythonConductor(BaseConductor): job_function = job[PYTHON_FUNC] job_function(job_dir) - # get up to date job data - job = read_yaml(meta_file) - # Update the status file with the finalised status - job[JOB_STATUS] = STATUS_DONE - job[JOB_END_TIME] = datetime.now() - write_yaml(job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_DONE, + JOB_END_TIME: datetime.now() + }, + meta_file + ) except Exception as e: - # get up to date job data - job = read_yaml(meta_file) - # Update the status file with the error status. Don't overwrite # any more specific error messages already created - if JOB_STATUS not in job: - job[JOB_STATUS] = STATUS_FAILED - if JOB_END_TIME not in job: - job[JOB_END_TIME] = datetime.now() - if JOB_ERROR not in job: - job[JOB_ERROR] = f"Job execution failed. {e}" - write_yaml(job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_FAILED, + JOB_END_TIME: datetime.now(), + JOB_ERROR: f"Job execution failed. {e}" + }, + meta_file + ) + # Move the contents of the execution directory to the final output # directory. diff --git a/core/runner.py b/core/runner.py index 0e68d59..bdec074 100644 --- a/core/runner.py +++ b/core/runner.py @@ -23,7 +23,7 @@ from meow_base.core.vars import DEBUG_WARNING, DEBUG_INFO, \ from meow_base.functionality.validation import check_type, valid_list, \ valid_dir_path from meow_base.functionality.debug import setup_debugging, print_debug -from meow_base.functionality.file_io import make_dir, read_yaml +from meow_base.functionality.file_io import make_dir, threadsafe_read_status from meow_base.functionality.process_io import wait @@ -177,7 +177,7 @@ class MeowRunner: job_dir = from_handler.recv() try: metafile = os.path.join(job_dir, META_FILE) - job = read_yaml(metafile) + job = threadsafe_read_status(metafile) except Exception as e: print_debug(self._print_target, self.debug_level, "Could not load necessary job definitions for " diff --git a/core/vars.py b/core/vars.py index a078f2f..ac5cc56 100644 --- a/core/vars.py +++ b/core/vars.py @@ -145,6 +145,9 @@ DEBUG_ERROR = 1 DEBUG_WARNING = 2 DEBUG_INFO = 3 +# Locking +LOCK_EXT = ".lock" + # debug message functions def get_drt_imp_msg(base_class): return f"{base_class.__name__} may not be instantiated directly. " \ diff --git a/functionality/file_io.py b/functionality/file_io.py index 7fd627e..6f21ec6 100644 --- a/functionality/file_io.py +++ b/functionality/file_io.py @@ -13,7 +13,8 @@ from os.path import exists, isfile, join from typing import Any, Dict, List from meow_base.core.vars import JOB_END_TIME, JOB_ERROR, JOB_STATUS, \ - STATUS_FAILED, STATUS_DONE, JOB_CREATE_TIME, JOB_START_TIME + STATUS_FAILED, STATUS_DONE, JOB_CREATE_TIME, JOB_START_TIME, \ + STATUS_SKIPPED, LOCK_EXT from meow_base.functionality.validation import valid_path @@ -98,7 +99,7 @@ def write_yaml(source:Any, filename:str): yaml.dump(source, param_file, default_flow_style=False) def threadsafe_read_status(filepath:str): - lock_path = f"{filepath}.lock" + lock_path = filepath + LOCK_EXT lock_handle = open(lock_path, 'a') fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX) @@ -113,7 +114,7 @@ def threadsafe_read_status(filepath:str): return status def threadsafe_write_status(source:dict[str,Any], filepath:str): - lock_path = f"{filepath}.lock" + lock_path = filepath + LOCK_EXT lock_handle = open(lock_path, 'a') fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX) @@ -126,7 +127,7 @@ def threadsafe_write_status(source:dict[str,Any], filepath:str): lock_handle.close() def threadsafe_update_status(updates:dict[str,Any], filepath:str): - lock_path = f"{filepath}.lock" + lock_path = filepath + LOCK_EXT lock_handle = open(lock_path, 'a') fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX) @@ -137,7 +138,7 @@ def threadsafe_update_status(updates:dict[str,Any], filepath:str): if k in updates: # Do not overwrite final job status if k == JOB_STATUS \ - and v in [STATUS_DONE, STATUS_FAILED]: + and v in [STATUS_DONE, STATUS_FAILED, STATUS_SKIPPED]: continue # Do not overwrite an existing time elif k in [JOB_START_TIME, JOB_CREATE_TIME, JOB_END_TIME]: @@ -147,6 +148,10 @@ def threadsafe_update_status(updates:dict[str,Any], filepath:str): updates[k] = f"{v} {updates[k]}" status[k] = updates[k] + + for k, v in updates.items(): + if k not in status: + status[k] = v write_yaml(status, filepath) except Exception as e: diff --git a/recipes/bash_recipe.py b/recipes/bash_recipe.py index 1e5f940..27c7ea1 100644 --- a/recipes/bash_recipe.py +++ b/recipes/bash_recipe.py @@ -12,12 +12,12 @@ from meow_base.functionality.validation import check_type, valid_dict, \ valid_string, valid_dir_path from meow_base.core.vars import DEBUG_INFO, DEFAULT_JOB_QUEUE_DIR, \ VALID_VARIABLE_NAME_CHARS, EVENT_PATH, EVENT_RULE, EVENT_TYPE, JOB_ID, \ - EVENT_TYPE_WATCHDOG, JOB_TYPE_BASH, JOB_PARAMETERS, WATCHDOG_HASH, \ - WATCHDOG_BASE, META_FILE, STATUS_QUEUED, JOB_STATUS, \ + EVENT_TYPE_WATCHDOG, JOB_TYPE_BASH, JOB_PARAMETERS, WATCHDOG_BASE, \ + META_FILE, STATUS_QUEUED, JOB_STATUS, \ get_base_file, get_job_file from meow_base.functionality.debug import setup_debugging, print_debug -from meow_base.functionality.file_io import valid_path, make_dir, write_yaml, \ - write_file, lines_to_string +from meow_base.functionality.file_io import valid_path, make_dir, write_file, \ + lines_to_string, threadsafe_write_status from meow_base.functionality.parameterisation import parameterize_bash_script from meow_base.functionality.meow import create_job, replace_keywords @@ -157,7 +157,7 @@ class BashHandler(BaseHandler): # write a status file to the job directory meta_file = os.path.join(job_dir, META_FILE) - write_yaml(meow_job, meta_file) + threadsafe_write_status(meow_job, meta_file) # parameterise recipe and write as executeable script base_script = parameterize_bash_script( @@ -177,7 +177,7 @@ class BashHandler(BaseHandler): meow_job[JOB_STATUS] = STATUS_QUEUED # update the status file with queued status - write_yaml(meow_job, meta_file) + threadsafe_write_status(meow_job, meta_file) # Send job directory, as actual definitons will be read from within it self.to_runner.send(job_dir) diff --git a/recipes/jupyter_notebook_recipe.py b/recipes/jupyter_notebook_recipe.py index 0db6f23..3e98a56 100644 --- a/recipes/jupyter_notebook_recipe.py +++ b/recipes/jupyter_notebook_recipe.py @@ -24,7 +24,8 @@ from meow_base.core.vars import VALID_VARIABLE_NAME_CHARS, \ get_base_file from meow_base.functionality.debug import setup_debugging, print_debug from meow_base.functionality.file_io import make_dir, read_notebook, \ - write_notebook, write_yaml + write_notebook, write_yaml, threadsafe_write_status, \ + threadsafe_update_status from meow_base.functionality.meow import create_job, replace_keywords @@ -163,7 +164,7 @@ class PapermillHandler(BaseHandler): # write a status file to the job directory meta_file = os.path.join(job_dir, META_FILE) - write_yaml(meow_job, meta_file) + threadsafe_write_status(meow_job, meta_file) # write an executable notebook to the job directory base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PAPERMILL)) @@ -176,7 +177,12 @@ class PapermillHandler(BaseHandler): meow_job[JOB_STATUS] = STATUS_QUEUED # update the status file with queued status - write_yaml(meow_job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_QUEUED + }, + meta_file + ) # Send job directory, as actual definitons will be read from within it self.to_runner.send(job_dir) @@ -208,7 +214,8 @@ def papermill_job_func(job_dir): JOB_STATUS, SHA256, STATUS_SKIPPED, JOB_END_TIME, \ JOB_ERROR, STATUS_FAILED, get_job_file, \ get_result_file - from meow_base.functionality.file_io import read_yaml, write_notebook, write_yaml + from meow_base.functionality.file_io import read_yaml, write_notebook, \ + threadsafe_read_status, threadsafe_update_status from meow_base.functionality.hashing import get_hash from meow_base.functionality.parameterisation import parameterize_jupyter_notebook @@ -221,7 +228,7 @@ def papermill_job_func(job_dir): param_file = os.path.join(job_dir, PARAMS_FILE) # Get job defintions - job = read_yaml(meta_file) + job = threadsafe_read_status(meta_file) yaml_dict = read_yaml(param_file) # Check the hash of the triggering file, if present. This addresses @@ -234,15 +241,20 @@ def papermill_job_func(job_dir): # another job will have been scheduled anyway. if not triggerfile_hash \ or triggerfile_hash != job[JOB_EVENT][WATCHDOG_HASH]: - job[JOB_STATUS] = STATUS_SKIPPED - job[JOB_END_TIME] = datetime.now() msg = "Job was skipped as triggering file " + \ f"'{job[JOB_EVENT][EVENT_PATH]}' has been modified since " + \ "scheduling. Was expected to have hash " + \ f"'{job[JOB_EVENT][WATCHDOG_HASH]}' but has '" + \ f"{triggerfile_hash}'." - job[JOB_ERROR] = msg - write_yaml(job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_SKIPPED, + JOB_END_TIME: datetime.now(), + JOB_ERROR: msg + }, + meta_file + ) + return # Create a parameterised version of the executable notebook @@ -253,20 +265,29 @@ def papermill_job_func(job_dir): ) write_notebook(job_notebook, job_file) except Exception as e: - job[JOB_STATUS] = STATUS_FAILED - job[JOB_END_TIME] = datetime.now() msg = f"Job file {job[JOB_ID]} was not created successfully. {e}" - job[JOB_ERROR] = msg - write_yaml(job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_FAILED, + JOB_END_TIME: datetime.now(), + JOB_ERROR: msg + }, + meta_file + ) return # Execute the parameterised notebook try: papermill.execute_notebook(job_file, result_file, {}) except Exception as e: - job[JOB_STATUS] = STATUS_FAILED - job[JOB_END_TIME] = datetime.now() msg = f"Result file {result_file} was not created successfully. {e}" - job[JOB_ERROR] = msg - write_yaml(job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_FAILED, + JOB_END_TIME: datetime.now(), + JOB_ERROR: msg + }, + meta_file + ) + return diff --git a/recipes/python_recipe.py b/recipes/python_recipe.py index b0889b1..03932a5 100644 --- a/recipes/python_recipe.py +++ b/recipes/python_recipe.py @@ -23,7 +23,8 @@ from meow_base.core.vars import VALID_VARIABLE_NAME_CHARS, \ get_base_file from meow_base.functionality.debug import setup_debugging, print_debug from meow_base.functionality.file_io import make_dir, read_file_lines, \ - write_file, write_yaml, lines_to_string + write_file, write_yaml, lines_to_string, threadsafe_write_status, \ + threadsafe_update_status from meow_base.functionality.meow import create_job, replace_keywords @@ -153,7 +154,7 @@ class PythonHandler(BaseHandler): # write a status file to the job directory meta_file = os.path.join(job_dir, META_FILE) - write_yaml(meow_job, meta_file) + threadsafe_write_status(meow_job, meta_file) # write an executable script to the job directory base_file = os.path.join(job_dir, get_base_file(JOB_TYPE_PYTHON)) @@ -163,10 +164,13 @@ class PythonHandler(BaseHandler): param_file = os.path.join(job_dir, PARAMS_FILE) write_yaml(yaml_dict, param_file) - meow_job[JOB_STATUS] = STATUS_QUEUED - # update the status file with queued status - write_yaml(meow_job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_QUEUED + }, + meta_file + ) # Send job directory, as actual definitons will be read from within it self.to_runner.send(job_dir) @@ -184,7 +188,8 @@ def python_job_func(job_dir): JOB_STATUS, SHA256, STATUS_SKIPPED, JOB_END_TIME, \ JOB_ERROR, STATUS_FAILED, get_base_file, \ get_job_file, get_result_file - from meow_base.functionality.file_io import read_yaml, write_yaml + from meow_base.functionality.file_io import read_yaml, \ + threadsafe_read_status, threadsafe_update_status from meow_base.functionality.hashing import get_hash from meow_base.functionality.parameterisation import parameterize_python_script @@ -196,7 +201,7 @@ def python_job_func(job_dir): param_file = os.path.join(job_dir, PARAMS_FILE) # Get job defintions - job = read_yaml(meta_file) + job = threadsafe_read_status(meta_file) yaml_dict = read_yaml(param_file) # Check the hash of the triggering file, if present. This addresses @@ -209,15 +214,19 @@ def python_job_func(job_dir): # another job will have been scheduled anyway. if not triggerfile_hash \ or triggerfile_hash != job[JOB_EVENT][WATCHDOG_HASH]: - job[JOB_STATUS] = STATUS_SKIPPED - job[JOB_END_TIME] = datetime.now() msg = "Job was skipped as triggering file " + \ f"'{job[JOB_EVENT][EVENT_PATH]}' has been modified since " + \ "scheduling. Was expected to have hash " + \ f"'{job[JOB_EVENT][WATCHDOG_HASH]}' but has '" + \ f"{triggerfile_hash}'." - job[JOB_ERROR] = msg - write_yaml(job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_SKIPPED, + JOB_END_TIME: datetime.now(), + JOB_ERROR: msg + }, + meta_file + ) return # Create a parameterised version of the executable script @@ -228,11 +237,15 @@ def python_job_func(job_dir): ) write_file(lines_to_string(job_script), job_file) except Exception as e: - job[JOB_STATUS] = STATUS_FAILED - job[JOB_END_TIME] = datetime.now() msg = f"Job file {job[JOB_ID]} was not created successfully. {e}" - job[JOB_ERROR] = msg - write_yaml(job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_FAILED, + JOB_END_TIME: datetime.now(), + JOB_ERROR: msg + }, + meta_file + ) return # Execute the parameterised script @@ -256,11 +269,16 @@ def python_job_func(job_dir): sys.stdout = std_stdout sys.stderr = std_stderr - job[JOB_STATUS] = STATUS_FAILED - job[JOB_END_TIME] = datetime.now() msg = f"Result file {result_file} was not created successfully. {e}" - job[JOB_ERROR] = msg - write_yaml(job, meta_file) + threadsafe_update_status( + { + JOB_STATUS: STATUS_FAILED, + JOB_END_TIME: datetime.now(), + JOB_ERROR: msg + }, + meta_file + ) + return sys.stdout = std_stdout diff --git a/tests/shared.py b/tests/shared.py index 89cfffa..9716d02 100644 --- a/tests/shared.py +++ b/tests/shared.py @@ -6,9 +6,10 @@ Author(s): David Marchant import os from distutils.dir_util import copy_tree +from typing import List from meow_base.core.vars import DEFAULT_JOB_OUTPUT_DIR, \ - DEFAULT_JOB_QUEUE_DIR + DEFAULT_JOB_QUEUE_DIR, LOCK_EXT from meow_base.functionality.file_io import make_dir, rmtree from meow_base.patterns.file_event_pattern import FileEventPattern from meow_base.recipes.jupyter_notebook_recipe import JupyterNotebookRecipe @@ -37,7 +38,9 @@ def teardown(): rmtree(DEFAULT_JOB_QUEUE_DIR) rmtree("first") for f in [ - "temp_phantom_info.h5", "temp_phantom.h5", "doesNotExist.lock" + "temp_phantom_info.h5", + "temp_phantom.h5", + f"doesNotExist{LOCK_EXT}" ]: if os.path.exists(f): os.remove(f) @@ -46,6 +49,14 @@ def backup_before_teardown(backup_source:str, backup_dest:str): make_dir(backup_dest, ensure_clean=True) copy_tree(backup_source, backup_dest) +# Necessary, as the creation of locks is not deterministic +def list_non_locks(dir:str)->List[str]: + return [f for f in os.listdir(dir) if not f.endswith(LOCK_EXT)] + +# Necessary, as the creation of locks is not deterministic +def count_non_locks(dir:str)->int: + return len(list_non_locks(dir)) + # Bash scripts BAREBONES_BASH_SCRIPT = [ diff --git a/tests/test_functionality.py b/tests/test_functionality.py index 746ba7a..aeac077 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -13,7 +13,7 @@ from typing import Dict from meow_base.core.rule import Rule from meow_base.core.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \ - SHA256, EVENT_TYPE, EVENT_PATH, EVENT_TYPE_WATCHDOG, \ + SHA256, EVENT_TYPE, EVENT_PATH, EVENT_TYPE_WATCHDOG, LOCK_EXT, \ WATCHDOG_BASE, WATCHDOG_HASH, EVENT_RULE, JOB_PARAMETERS, \ PYTHON_FUNC, JOB_ID, JOB_EVENT, JOB_ERROR, STATUS_DONE, \ JOB_TYPE, JOB_PATTERN, JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME, \ @@ -348,7 +348,7 @@ data""" self.assertFalse(os.path.exists(filepath)) threadsafe_write_status(first_yaml_dict, filepath) self.assertTrue(os.path.exists(filepath)) - self.assertTrue(os.path.exists(f"{filepath}.lock")) + self.assertTrue(os.path.exists(filepath + LOCK_EXT)) with open(filepath, 'r') as f: data = f.readlines() @@ -381,7 +381,7 @@ data""" threadsafe_write_status(second_yaml_dict, filepath) self.assertTrue(os.path.exists(filepath)) - self.assertTrue(os.path.exists(f"{filepath}.lock")) + self.assertTrue(os.path.exists(filepath + LOCK_EXT)) with open(filepath, 'r') as f: data = f.readlines() @@ -444,7 +444,7 @@ data""" self.assertFalse(os.path.exists(filepath)) threadsafe_write_status(first_yaml_dict, filepath) self.assertTrue(os.path.exists(filepath)) - self.assertTrue(os.path.exists(f"{filepath}.lock")) + self.assertTrue(os.path.exists(filepath + LOCK_EXT)) with open(filepath, 'r') as f: data = f.readlines() @@ -475,7 +475,7 @@ data""" threadsafe_update_status(second_yaml_dict, filepath) self.assertTrue(os.path.exists(filepath)) - self.assertTrue(os.path.exists(f"{filepath}.lock")) + self.assertTrue(os.path.exists(filepath + LOCK_EXT)) with open(filepath, 'r') as f: data = f.readlines() @@ -498,7 +498,8 @@ data""" JOB_CREATE_TIME: "now", JOB_STATUS: "Wham", JOB_ERROR: "first error.", - JOB_ID: "id" + JOB_ID: "id", + JOB_TYPE: "type" } filepath = os.path.join(TEST_MONITOR_BASE, "file.yaml") @@ -506,7 +507,7 @@ data""" self.assertFalse(os.path.exists(filepath)) threadsafe_write_status(first_yaml_dict, filepath) self.assertTrue(os.path.exists(filepath)) - self.assertTrue(os.path.exists(f"{filepath}.lock")) + self.assertTrue(os.path.exists(filepath + LOCK_EXT)) status = threadsafe_read_status(filepath) @@ -523,7 +524,7 @@ data""" threadsafe_update_status(second_yaml_dict, filepath) self.assertTrue(os.path.exists(filepath)) - self.assertTrue(os.path.exists(f"{filepath}.lock")) + self.assertTrue(os.path.exists(filepath + LOCK_EXT)) status = threadsafe_read_status(filepath) @@ -531,7 +532,8 @@ data""" JOB_CREATE_TIME: "now", JOB_STATUS: STATUS_DONE, JOB_ERROR: "first error. changed.", - JOB_ID: "changed" + JOB_ID: "changed", + JOB_TYPE: "type" } self.assertEqual(expected_second_yaml_dict, status) @@ -540,14 +542,15 @@ data""" JOB_CREATE_TIME: "editted", JOB_STATUS: "editted", JOB_ERROR: "editted.", - JOB_ID: "editted" + JOB_ID: "editted", + "something_new": "new" } filepath = os.path.join(TEST_MONITOR_BASE, "file.yaml") threadsafe_update_status(third_yaml_dict, filepath) self.assertTrue(os.path.exists(filepath)) - self.assertTrue(os.path.exists(f"{filepath}.lock")) + self.assertTrue(os.path.exists(filepath + LOCK_EXT)) status = threadsafe_read_status(filepath) @@ -555,8 +558,13 @@ data""" JOB_CREATE_TIME: "now", JOB_STATUS: STATUS_DONE, JOB_ERROR: "first error. changed. editted.", - JOB_ID: "editted" + JOB_ID: "editted", + JOB_TYPE: "type", + "something_new": "new" } + + print(expected_third_yaml_dict) + print(status) self.assertEqual(expected_third_yaml_dict, status) diff --git a/tests/test_runner.py b/tests/test_runner.py index 8545ae7..2148a4c 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -29,7 +29,8 @@ from shared import TEST_JOB_QUEUE, TEST_JOB_OUTPUT, TEST_MONITOR_BASE, \ MAKER_RECIPE, APPENDING_NOTEBOOK, COMPLETE_PYTHON_SCRIPT, TEST_DIR, \ FILTER_RECIPE, POROSITY_CHECK_NOTEBOOK, SEGMENT_FOAM_NOTEBOOK, \ GENERATOR_NOTEBOOK, FOAM_PORE_ANALYSIS_NOTEBOOK, IDMC_UTILS_PYTHON_SCRIPT, \ - TEST_DATA, GENERATE_PYTHON_SCRIPT, setup, teardown, backup_before_teardown + TEST_DATA, GENERATE_PYTHON_SCRIPT, \ + setup, teardown, backup_before_teardown, count_non_locks pattern_check = FileEventPattern( "pattern_check", @@ -329,7 +330,8 @@ class MeowTests(unittest.TestCase): runner.stop() job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) - self.assertEqual(len(os.listdir(job_dir)), 5) + print(os.listdir(job_dir)) + self.assertEqual(count_non_locks(job_dir), 5) result = read_notebook( os.path.join(job_dir, get_result_file(JOB_TYPE_PAPERMILL))) @@ -426,7 +428,7 @@ class MeowTests(unittest.TestCase): runner.stop() mid_job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) - self.assertEqual(len(os.listdir(mid_job_dir)), 5) + self.assertEqual(count_non_locks(mid_job_dir), 5) result = read_notebook( os.path.join(mid_job_dir, get_result_file(JOB_TYPE_PAPERMILL))) @@ -441,7 +443,7 @@ class MeowTests(unittest.TestCase): self.assertEqual(data, "Initial Data\nA line from Pattern 1") final_job_dir = os.path.join(TEST_JOB_OUTPUT, job_id) - self.assertEqual(len(os.listdir(final_job_dir)), 5) + self.assertEqual(count_non_locks(final_job_dir), 5) result = read_notebook(os.path.join(final_job_dir, get_result_file(JOB_TYPE_PAPERMILL))) @@ -645,7 +647,7 @@ class MeowTests(unittest.TestCase): final_job_id = job_ids[0] mid_job_dir = os.path.join(TEST_JOB_OUTPUT, mid_job_id) - self.assertEqual(len(os.listdir(mid_job_dir)), 5) + self.assertEqual(count_non_locks(mid_job_dir), 5) mid_metafile = os.path.join(mid_job_dir, META_FILE) mid_status = read_yaml(mid_metafile) @@ -664,7 +666,7 @@ class MeowTests(unittest.TestCase): self.assertEqual(mid_output, "7806.25") final_job_dir = os.path.join(TEST_JOB_OUTPUT, final_job_id) - self.assertEqual(len(os.listdir(final_job_dir)), 5) + self.assertEqual(count_non_locks(final_job_dir), 5) final_metafile = os.path.join(final_job_dir, META_FILE) final_status = read_yaml(final_metafile)