added functions to update job status files in a threadsafe manner
This commit is contained in:
@ -4,6 +4,7 @@ This file contains functions for reading and writing different types of files.
|
||||
Author(s): David Marchant
|
||||
"""
|
||||
|
||||
import fcntl
|
||||
import json
|
||||
import yaml
|
||||
|
||||
@ -94,6 +95,51 @@ def write_yaml(source:Any, filename:str):
|
||||
with open(filename, 'w') as param_file:
|
||||
yaml.dump(source, param_file, default_flow_style=False)
|
||||
|
||||
# TODO test me
|
||||
def threadsafe_read_status(filepath:str):
|
||||
lock_path = f"{filepath}.lock"
|
||||
lock_handle = open(lock_path, 'a')
|
||||
fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX)
|
||||
|
||||
try:
|
||||
status = read_yaml(filepath)
|
||||
except Exception as e:
|
||||
lock_handle.close()
|
||||
raise e
|
||||
|
||||
lock_handle.close()
|
||||
|
||||
return status
|
||||
|
||||
# TODO test me
|
||||
def threadsafe_write_status(source:dict[str,Any], filepath:str):
|
||||
lock_path = f"{filepath}.lock"
|
||||
lock_handle = open(lock_path, 'a')
|
||||
fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX)
|
||||
|
||||
try:
|
||||
write_yaml(source, filepath)
|
||||
except Exception as e:
|
||||
lock_handle.close()
|
||||
raise e
|
||||
|
||||
lock_handle.close()
|
||||
|
||||
# TODO test me
|
||||
def threadsafe_update_status(updates:dict[str,Any], filepath:str):
|
||||
lock_path = f"{filepath}.lock"
|
||||
lock_handle = open(lock_path, 'a')
|
||||
fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX)
|
||||
|
||||
try:
|
||||
status = read_yaml(filepath)
|
||||
write_yaml({**status, **updates}, filepath)
|
||||
except Exception as e:
|
||||
lock_handle.close()
|
||||
raise e
|
||||
|
||||
lock_handle.close()
|
||||
|
||||
def read_notebook(filepath:str):
|
||||
valid_path(filepath, extension="ipynb")
|
||||
with open(filepath, 'r') as read_file:
|
||||
|
@ -36,10 +36,11 @@ def teardown():
|
||||
rmtree(DEFAULT_JOB_OUTPUT_DIR)
|
||||
rmtree(DEFAULT_JOB_QUEUE_DIR)
|
||||
rmtree("first")
|
||||
if os.path.exists("temp_phantom_info.h5"):
|
||||
os.remove("temp_phantom_info.h5")
|
||||
if os.path.exists("temp_phantom.h5"):
|
||||
os.remove("temp_phantom.h5")
|
||||
for f in [
|
||||
"temp_phantom_info.h5", "temp_phantom.h5", "doesNotExist.lock"
|
||||
]:
|
||||
if os.path.exists(f):
|
||||
os.remove(f)
|
||||
|
||||
def backup_before_teardown(backup_source:str, backup_dest:str):
|
||||
make_dir(backup_dest, ensure_clean=True)
|
||||
|
@ -21,7 +21,8 @@ from meow_base.core.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \
|
||||
from meow_base.functionality.debug import setup_debugging
|
||||
from meow_base.functionality.file_io import lines_to_string, make_dir, \
|
||||
read_file, read_file_lines, read_notebook, read_yaml, rmtree, write_file, \
|
||||
write_notebook, write_yaml
|
||||
write_notebook, write_yaml, threadsafe_read_status, \
|
||||
threadsafe_update_status, threadsafe_write_status
|
||||
from meow_base.functionality.hashing import get_hash
|
||||
from meow_base.functionality.meow import KEYWORD_BASE, KEYWORD_DIR, \
|
||||
KEYWORD_EXTENSION, KEYWORD_FILENAME, KEYWORD_JOB, KEYWORD_PATH, \
|
||||
@ -330,6 +331,167 @@ data"""
|
||||
|
||||
self.assertEqual(lines_to_string(l), "a\nb\nc")
|
||||
|
||||
def testThreadsafeWriteStatus(self)->None:
|
||||
first_yaml_dict = {
|
||||
"A": "a",
|
||||
"B": 1,
|
||||
"C": {
|
||||
"D": True,
|
||||
"E": [
|
||||
1, 2, 3
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
filepath = os.path.join(TEST_MONITOR_BASE, "file.yaml")
|
||||
|
||||
self.assertFalse(os.path.exists(filepath))
|
||||
threadsafe_write_status(first_yaml_dict, filepath)
|
||||
self.assertTrue(os.path.exists(filepath))
|
||||
self.assertTrue(os.path.exists(f"{filepath}.lock"))
|
||||
|
||||
with open(filepath, 'r') as f:
|
||||
data = f.readlines()
|
||||
|
||||
expected_bytes = [
|
||||
'A: a\n',
|
||||
'B: 1\n',
|
||||
'C:\n',
|
||||
' D: true\n',
|
||||
' E:\n',
|
||||
' - 1\n',
|
||||
' - 2\n',
|
||||
' - 3\n'
|
||||
]
|
||||
|
||||
self.assertEqual(data, expected_bytes)
|
||||
|
||||
second_yaml_dict = {
|
||||
"F": "a",
|
||||
"G": 1,
|
||||
"H": {
|
||||
"I": True,
|
||||
"J": [
|
||||
1, 2, 3
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
filepath = os.path.join(TEST_MONITOR_BASE, "file.yaml")
|
||||
|
||||
threadsafe_write_status(second_yaml_dict, filepath)
|
||||
self.assertTrue(os.path.exists(filepath))
|
||||
self.assertTrue(os.path.exists(f"{filepath}.lock"))
|
||||
|
||||
with open(filepath, 'r') as f:
|
||||
data = f.readlines()
|
||||
|
||||
expected_bytes = [
|
||||
'F: a\n',
|
||||
'G: 1\n',
|
||||
'H:\n',
|
||||
' I: true\n',
|
||||
' J:\n',
|
||||
' - 1\n',
|
||||
' - 2\n',
|
||||
' - 3\n'
|
||||
]
|
||||
|
||||
self.assertEqual(data, expected_bytes)
|
||||
|
||||
def testThreadsafeReadStatus(self)->None:
|
||||
yaml_dict = {
|
||||
"A": "a",
|
||||
"B": 1,
|
||||
"C": {
|
||||
"D": True,
|
||||
"E": [
|
||||
1, 2, 3
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
filepath = os.path.join(TEST_MONITOR_BASE, "file.yaml")
|
||||
threadsafe_write_status(yaml_dict, filepath)
|
||||
|
||||
read_dict = threadsafe_read_status(filepath)
|
||||
self.assertEqual(yaml_dict, read_dict)
|
||||
|
||||
with self.assertRaises(FileNotFoundError):
|
||||
threadsafe_read_status("doesNotExist")
|
||||
|
||||
filepath = os.path.join(TEST_MONITOR_BASE, "T.txt")
|
||||
with open(filepath, "w") as f:
|
||||
f.write("Data")
|
||||
|
||||
data = threadsafe_read_status(filepath)
|
||||
self.assertEqual(data, "Data")
|
||||
|
||||
def testThreadsafeUpdateStatus(self)->None:
|
||||
first_yaml_dict = {
|
||||
"A": "a",
|
||||
"B": 1,
|
||||
"C": {
|
||||
"D": True,
|
||||
"E": [
|
||||
1, 2, 3
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
filepath = os.path.join(TEST_MONITOR_BASE, "file.yaml")
|
||||
|
||||
self.assertFalse(os.path.exists(filepath))
|
||||
threadsafe_write_status(first_yaml_dict, filepath)
|
||||
self.assertTrue(os.path.exists(filepath))
|
||||
self.assertTrue(os.path.exists(f"{filepath}.lock"))
|
||||
|
||||
with open(filepath, 'r') as f:
|
||||
data = f.readlines()
|
||||
|
||||
expected_bytes = [
|
||||
'A: a\n',
|
||||
'B: 1\n',
|
||||
'C:\n',
|
||||
' D: true\n',
|
||||
' E:\n',
|
||||
' - 1\n',
|
||||
' - 2\n',
|
||||
' - 3\n'
|
||||
]
|
||||
|
||||
self.assertEqual(data, expected_bytes)
|
||||
|
||||
second_yaml_dict = {
|
||||
"B": 42,
|
||||
"C": {
|
||||
"E": [
|
||||
1, 2, 3, 4
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
filepath = os.path.join(TEST_MONITOR_BASE, "file.yaml")
|
||||
|
||||
threadsafe_update_status(second_yaml_dict, filepath)
|
||||
self.assertTrue(os.path.exists(filepath))
|
||||
self.assertTrue(os.path.exists(f"{filepath}.lock"))
|
||||
|
||||
with open(filepath, 'r') as f:
|
||||
data = f.readlines()
|
||||
|
||||
expected_bytes = [
|
||||
'A: a\n',
|
||||
'B: 42\n',
|
||||
'C:\n',
|
||||
' E:\n',
|
||||
' - 1\n',
|
||||
' - 2\n',
|
||||
' - 3\n',
|
||||
' - 4\n'
|
||||
]
|
||||
|
||||
self.assertEqual(data, expected_bytes)
|
||||
|
||||
class HashingTests(unittest.TestCase):
|
||||
def setUp(self)->None:
|
||||
@ -890,6 +1052,7 @@ class ProcessIoTests(unittest.TestCase):
|
||||
msg = readable.recv()
|
||||
self.assertEqual(msg, 1)
|
||||
|
||||
|
||||
class RequirementsTest(unittest.TestCase):
|
||||
def setUp(self)->None:
|
||||
super().setUp()
|
||||
|
Reference in New Issue
Block a user