added helper function to create parameter sweep, plus appropriate test
This commit is contained in:
@ -15,7 +15,8 @@ from core.correctness.validation import check_type, valid_dict, valid_list
|
||||
from core.correctness.vars import EVENT_PATH, EVENT_RULE, EVENT_TYPE, \
|
||||
EVENT_TYPE_WATCHDOG, JOB_CREATE_TIME, JOB_EVENT, JOB_ID, JOB_PATTERN, \
|
||||
JOB_RECIPE, JOB_REQUIREMENTS, JOB_RULE, JOB_STATUS, JOB_TYPE, \
|
||||
STATUS_QUEUED, WATCHDOG_BASE, WATCHDOG_HASH
|
||||
STATUS_QUEUED, WATCHDOG_BASE, WATCHDOG_HASH, SWEEP_JUMP, SWEEP_START, \
|
||||
SWEEP_STOP
|
||||
from functionality.naming import generate_job_id, generate_rule_id
|
||||
|
||||
# mig trigger keyword replacements
|
||||
@ -60,6 +61,42 @@ def replace_keywords(old_dict:Dict[str,str], job_id:str, src_path:str,
|
||||
|
||||
return new_dict
|
||||
|
||||
def create_parameter_sweep(variable_name:str, start:Union[int,float,complex],
|
||||
stop:Union[int,float,complex], jump:Union[int,float,complex]
|
||||
)->Dict[str,Dict[str,Union[int,float,complex]]]:
|
||||
check_type(variable_name, str, hint="create_parameter_sweep.variable_name")
|
||||
check_type(start, int, alt_types=[float, complex])
|
||||
check_type(stop, int, alt_types=[float, complex])
|
||||
check_type(jump, int, alt_types=[float, complex])
|
||||
|
||||
if jump == 0:
|
||||
raise ValueError(
|
||||
f"Cannot create sweep with a '{SWEEP_JUMP}' value of zero as this "
|
||||
"would be infinite in nature."
|
||||
)
|
||||
elif jump > 0:
|
||||
if not stop > start:
|
||||
raise ValueError(
|
||||
f"Cannot create sweep with a positive '{SWEEP_JUMP}' "
|
||||
"value where the end point is smaller than the start as this "
|
||||
"would be infinite in nature."
|
||||
)
|
||||
elif jump < 0:
|
||||
if not stop < start:
|
||||
raise ValueError(
|
||||
f"Cannot create sweep with a negative '{SWEEP_JUMP}' "
|
||||
"value where the end point is smaller than the start as this "
|
||||
"would be infinite in nature."
|
||||
)
|
||||
|
||||
return {
|
||||
variable_name: {
|
||||
SWEEP_START: start,
|
||||
SWEEP_STOP: stop,
|
||||
SWEEP_JUMP: jump
|
||||
}
|
||||
}
|
||||
|
||||
def create_event(event_type:str, path:str, rule:Any, extras:Dict[Any,Any]={}
|
||||
)->Dict[Any,Any]:
|
||||
"""Function to create a MEOW dictionary."""
|
||||
|
@ -232,7 +232,6 @@ def papermill_job_func(job_dir):
|
||||
# Create a parameterised version of the executable notebook
|
||||
try:
|
||||
base_notebook = read_notebook(base_file)
|
||||
# TODO read notebook from already written file rather than event
|
||||
job_notebook = parameterize_jupyter_notebook(
|
||||
base_notebook, yaml_dict
|
||||
)
|
||||
|
@ -13,7 +13,7 @@ from core.base_rule import BaseRule
|
||||
from core.correctness.vars import CHAR_LOWERCASE, CHAR_UPPERCASE, \
|
||||
SHA256, EVENT_TYPE, EVENT_PATH, EVENT_TYPE_WATCHDOG, \
|
||||
WATCHDOG_BASE, WATCHDOG_HASH, EVENT_RULE, JOB_PARAMETERS, JOB_HASH, \
|
||||
PYTHON_FUNC, JOB_ID, JOB_EVENT, \
|
||||
PYTHON_FUNC, JOB_ID, JOB_EVENT, SWEEP_JUMP, SWEEP_START, SWEEP_STOP, \
|
||||
JOB_TYPE, JOB_PATTERN, JOB_RECIPE, JOB_RULE, JOB_STATUS, JOB_CREATE_TIME, \
|
||||
JOB_REQUIREMENTS, STATUS_QUEUED, JOB_TYPE_PAPERMILL
|
||||
from functionality.debug import setup_debugging
|
||||
@ -23,6 +23,7 @@ from functionality.file_io import lines_to_string, make_dir, read_file, \
|
||||
from functionality.hashing import get_file_hash
|
||||
from functionality.meow import create_event, create_job, create_rule, \
|
||||
create_rules, create_watchdog_event, replace_keywords, \
|
||||
create_parameter_sweep, \
|
||||
KEYWORD_BASE, KEYWORD_DIR, KEYWORD_EXTENSION, KEYWORD_FILENAME, \
|
||||
KEYWORD_JOB, KEYWORD_PATH, KEYWORD_PREFIX, KEYWORD_REL_DIR, \
|
||||
KEYWORD_REL_PATH
|
||||
@ -623,6 +624,33 @@ class MeowTests(unittest.TestCase):
|
||||
with self.assertRaises(KeyError):
|
||||
create_rules({}, recipes)
|
||||
|
||||
# Test create parameter sweep function
|
||||
def testCreateParameterSweep(self)->None:
|
||||
create_parameter_sweep("name", 0, 10, 2)
|
||||
create_parameter_sweep("name", 10, 0, -2)
|
||||
create_parameter_sweep("name", 0.0, 10.0, 1.3)
|
||||
create_parameter_sweep("name", 10.0, 0.0, -1.3)
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
create_parameter_sweep(0, 0, 10, 2)
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
create_parameter_sweep("name", "0", 10, 2)
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
create_parameter_sweep("name", 0, "10", 2)
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
create_parameter_sweep("name", 0, 10, "2")
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
create_parameter_sweep("name", 0, 10, 0)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
create_parameter_sweep("name", 0, 10, -1)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
create_parameter_sweep("name", 10, 0, 1)
|
||||
|
||||
|
||||
class NamingTests(unittest.TestCase):
|
||||
|
@ -585,6 +585,101 @@ class MeowTests(unittest.TestCase):
|
||||
final_output = read_file(os.path.join(final_output_path))
|
||||
self.assertEqual(final_output, "2146.5625")
|
||||
|
||||
# Test single meow python job execution
|
||||
def testMeowRunnerSweptPythonExecution(self)->None:
|
||||
pattern_one = FileEventPattern(
|
||||
"pattern_one",
|
||||
os.path.join("start", "A.txt"),
|
||||
"recipe_one",
|
||||
"infile",
|
||||
parameters={
|
||||
"num":10000,
|
||||
"outfile":os.path.join("{VGRID}", "output", "{FILENAME}")
|
||||
})
|
||||
recipe = PythonRecipe(
|
||||
"recipe_one", COMPLETE_PYTHON_SCRIPT
|
||||
)
|
||||
|
||||
patterns = {
|
||||
pattern_one.name: pattern_one,
|
||||
}
|
||||
recipes = {
|
||||
recipe.name: recipe,
|
||||
}
|
||||
|
||||
runner_debug_stream = io.StringIO("")
|
||||
|
||||
runner = MeowRunner(
|
||||
WatchdogMonitor(
|
||||
TEST_MONITOR_BASE,
|
||||
patterns,
|
||||
recipes,
|
||||
settletime=1
|
||||
),
|
||||
PythonHandler(
|
||||
job_queue_dir=TEST_JOB_QUEUE
|
||||
),
|
||||
LocalPythonConductor(),
|
||||
job_queue_dir=TEST_JOB_QUEUE,
|
||||
job_output_dir=TEST_JOB_OUTPUT,
|
||||
print=runner_debug_stream,
|
||||
logging=3
|
||||
)
|
||||
|
||||
runner.start()
|
||||
|
||||
start_dir = os.path.join(TEST_MONITOR_BASE, "start")
|
||||
make_dir(start_dir)
|
||||
self.assertTrue(start_dir)
|
||||
with open(os.path.join(start_dir, "A.txt"), "w") as f:
|
||||
f.write("25000")
|
||||
|
||||
self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt")))
|
||||
|
||||
loops = 0
|
||||
job_id = None
|
||||
while loops < 15:
|
||||
sleep(1)
|
||||
runner_debug_stream.seek(0)
|
||||
messages = runner_debug_stream.readlines()
|
||||
|
||||
for msg in messages:
|
||||
self.assertNotIn("ERROR", msg)
|
||||
|
||||
if "INFO: Completed execution for job: '" in msg:
|
||||
job_id = msg.replace(
|
||||
"INFO: Completed execution for job: '", "")
|
||||
job_id = job_id[:-2]
|
||||
loops = 15
|
||||
loops += 1
|
||||
|
||||
self.assertIsNotNone(job_id)
|
||||
self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1)
|
||||
self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT))
|
||||
|
||||
runner.stop()
|
||||
|
||||
job_dir = os.path.join(TEST_JOB_OUTPUT, job_id)
|
||||
|
||||
metafile = os.path.join(job_dir, META_FILE)
|
||||
status = read_yaml(metafile)
|
||||
|
||||
self.assertNotIn(JOB_ERROR, status)
|
||||
|
||||
result_path = os.path.join(job_dir, get_result_file(JOB_TYPE_PYTHON))
|
||||
self.assertTrue(os.path.exists(result_path))
|
||||
result = read_file(os.path.join(result_path))
|
||||
self.assertEqual(
|
||||
result, "--STDOUT--\n12505000.0\ndone\n\n\n--STDERR--\n\n")
|
||||
|
||||
output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt")
|
||||
self.assertTrue(os.path.exists(output_path))
|
||||
output = read_file(os.path.join(output_path))
|
||||
self.assertEqual(output, "12505000.0")
|
||||
|
||||
|
||||
|
||||
|
||||
# TODO sweep execution test
|
||||
# TODO adding tests with numpy or other external dependency
|
||||
# TODO test getting job cannot handle
|
||||
|
Reference in New Issue
Block a user