692 lines
24 KiB
Python
692 lines
24 KiB
Python
|
|
import io
|
|
import os
|
|
import unittest
|
|
|
|
from time import sleep
|
|
|
|
from core.base_conductor import BaseConductor
|
|
from core.base_handler import BaseHandler
|
|
from core.base_monitor import BaseMonitor
|
|
from conductors import LocalPythonConductor
|
|
from core.correctness.vars import get_result_file, \
|
|
JOB_TYPE_PAPERMILL, JOB_ERROR, META_FILE, JOB_TYPE_PYTHON, JOB_CREATE_TIME
|
|
from core.runner import MeowRunner
|
|
from functionality.file_io import make_dir, read_file, read_notebook, read_yaml
|
|
from functionality.meow import create_parameter_sweep
|
|
from patterns.file_event_pattern import WatchdogMonitor, FileEventPattern
|
|
from recipes.jupyter_notebook_recipe import PapermillHandler, \
|
|
JupyterNotebookRecipe
|
|
from recipes.python_recipe import PythonHandler, PythonRecipe
|
|
from shared import setup, teardown, \
|
|
TEST_JOB_QUEUE, TEST_JOB_OUTPUT, TEST_MONITOR_BASE, \
|
|
APPENDING_NOTEBOOK, COMPLETE_PYTHON_SCRIPT, TEST_DIR
|
|
|
|
|
|
class MeowTests(unittest.TestCase):
|
|
def setUp(self)->None:
|
|
super().setUp()
|
|
setup()
|
|
|
|
def tearDown(self)->None:
|
|
super().tearDown()
|
|
teardown()
|
|
|
|
# Test MeowRunner creation
|
|
def testMeowRunnerSetup(self)->None:
|
|
monitor_one = WatchdogMonitor(TEST_MONITOR_BASE, {}, {})
|
|
monitor_two = WatchdogMonitor(TEST_MONITOR_BASE, {}, {})
|
|
monitors = [ monitor_one, monitor_two ]
|
|
|
|
handler_one = PapermillHandler()
|
|
handler_two = PapermillHandler()
|
|
handlers = [ handler_one, handler_two ]
|
|
|
|
conductor_one = LocalPythonConductor()
|
|
conductor_two = LocalPythonConductor()
|
|
conductors = [ conductor_one, conductor_two ]
|
|
|
|
runner = MeowRunner(monitor_one, handler_one, conductor_one)
|
|
|
|
self.assertIsInstance(runner.monitors, list)
|
|
for m in runner.monitors:
|
|
self.assertIsInstance(m, BaseMonitor)
|
|
self.assertEqual(len(runner.monitors), 1)
|
|
self.assertEqual(runner.monitors[0], monitor_one)
|
|
|
|
self.assertIsInstance(runner.from_monitors, list)
|
|
self.assertEqual(len(runner.from_monitors), 1)
|
|
runner.monitors[0].to_runner.send("monitor test message")
|
|
message = None
|
|
if runner.from_monitors[0].poll(3):
|
|
message = runner.from_monitors[0].recv()
|
|
self.assertIsNotNone(message)
|
|
self.assertEqual(message, "monitor test message")
|
|
|
|
self.assertIsInstance(runner.handlers, list)
|
|
for handler in runner.handlers:
|
|
self.assertIsInstance(handler, BaseHandler)
|
|
|
|
self.assertIsInstance(runner.from_handlers, list)
|
|
self.assertEqual(len(runner.from_handlers), 1)
|
|
runner.handlers[0].to_runner.send(
|
|
"handler test message")
|
|
message = None
|
|
if runner.from_handlers[0].poll(3):
|
|
message = runner.from_handlers[0].recv()
|
|
self.assertIsNotNone(message)
|
|
self.assertEqual(message, "handler test message")
|
|
|
|
self.assertIsInstance(runner.conductors, list)
|
|
for conductor in runner.conductors:
|
|
self.assertIsInstance(conductor, BaseConductor)
|
|
|
|
runner = MeowRunner(monitors, handlers, conductors)
|
|
|
|
self.assertIsInstance(runner.monitors, list)
|
|
for m in runner.monitors:
|
|
self.assertIsInstance(m, BaseMonitor)
|
|
self.assertEqual(len(runner.monitors), len(monitors))
|
|
self.assertIn(monitor_one, runner.monitors)
|
|
self.assertIn(monitor_two, runner.monitors)
|
|
|
|
self.assertIsInstance(runner.from_monitors, list)
|
|
self.assertEqual(len(runner.from_monitors), len(monitors))
|
|
for rm in runner.monitors:
|
|
rm.to_runner.send("monitor test message")
|
|
messages = [None] * len(monitors)
|
|
for i, rfm in enumerate(runner.from_monitors):
|
|
if rfm.poll(3):
|
|
messages[i] = rfm.recv()
|
|
for m in messages:
|
|
self.assertIsNotNone(m)
|
|
self.assertEqual(m, "monitor test message")
|
|
|
|
self.assertIsInstance(runner.handlers, list)
|
|
for handler in runner.handlers:
|
|
self.assertIsInstance(handler, BaseHandler)
|
|
|
|
self.assertIsInstance(runner.from_handlers, list)
|
|
self.assertEqual(len(runner.from_handlers), len(handlers))
|
|
for rh in runner.handlers:
|
|
rh.to_runner.send("handler test message")
|
|
message = None
|
|
if runner.from_handlers[0].poll(3):
|
|
message = runner.from_handlers[0].recv()
|
|
self.assertIsNotNone(message)
|
|
self.assertEqual(message, "handler test message")
|
|
|
|
self.assertIsInstance(runner.conductors, list)
|
|
for conductor in runner.conductors:
|
|
self.assertIsInstance(conductor, BaseConductor)
|
|
|
|
# Test meow runner directory overrides
|
|
def testMeowRunnerDirOverridesSetup(self)->None:
|
|
monitor_one = WatchdogMonitor(TEST_MONITOR_BASE, {}, {})
|
|
|
|
original_queue_dir = os.path.join(TEST_DIR, "original_queue")
|
|
original_output_dir = os.path.join(TEST_DIR, "original_output")
|
|
overridden_queue_dir = os.path.join(TEST_DIR, "overridden_queue")
|
|
overridden_output_dir = os.path.join(TEST_DIR, "overridden_output")
|
|
|
|
handler_one = PapermillHandler(job_queue_dir=original_queue_dir)
|
|
|
|
conductor_one = LocalPythonConductor(
|
|
job_queue_dir=original_queue_dir,
|
|
job_output_dir=original_output_dir
|
|
)
|
|
|
|
self.assertTrue(os.path.exists(original_queue_dir))
|
|
self.assertTrue(os.path.exists(original_output_dir))
|
|
self.assertFalse(os.path.exists(overridden_queue_dir))
|
|
self.assertFalse(os.path.exists(overridden_output_dir))
|
|
|
|
self.assertEqual(handler_one.job_queue_dir, original_queue_dir)
|
|
self.assertEqual(conductor_one.job_queue_dir, original_queue_dir)
|
|
self.assertEqual(conductor_one.job_output_dir, original_output_dir)
|
|
|
|
MeowRunner(
|
|
monitor_one,
|
|
handler_one,
|
|
conductor_one,
|
|
job_queue_dir=overridden_queue_dir,
|
|
job_output_dir=overridden_output_dir
|
|
)
|
|
|
|
self.assertTrue(os.path.exists(original_queue_dir))
|
|
self.assertTrue(os.path.exists(original_output_dir))
|
|
self.assertTrue(os.path.exists(overridden_queue_dir))
|
|
self.assertTrue(os.path.exists(overridden_output_dir))
|
|
|
|
self.assertEqual(handler_one.job_queue_dir, overridden_queue_dir)
|
|
self.assertEqual(conductor_one.job_queue_dir, overridden_queue_dir)
|
|
self.assertEqual(conductor_one.job_output_dir, overridden_output_dir)
|
|
|
|
# Test single meow papermill job execution
|
|
def testMeowRunnerPapermillExecution(self)->None:
|
|
pattern_one = FileEventPattern(
|
|
"pattern_one",
|
|
os.path.join("start", "A.txt"),
|
|
"recipe_one",
|
|
"infile",
|
|
parameters={
|
|
"extra":"A line from a test Pattern",
|
|
"outfile":os.path.join("{VGRID}", "output", "{FILENAME}")
|
|
})
|
|
recipe = JupyterNotebookRecipe(
|
|
"recipe_one", APPENDING_NOTEBOOK)
|
|
|
|
patterns = {
|
|
pattern_one.name: pattern_one,
|
|
}
|
|
recipes = {
|
|
recipe.name: recipe,
|
|
}
|
|
|
|
runner_debug_stream = io.StringIO("")
|
|
|
|
runner = MeowRunner(
|
|
WatchdogMonitor(
|
|
TEST_MONITOR_BASE,
|
|
patterns,
|
|
recipes,
|
|
settletime=1
|
|
),
|
|
PapermillHandler(),
|
|
LocalPythonConductor(),
|
|
job_queue_dir=TEST_JOB_QUEUE,
|
|
job_output_dir=TEST_JOB_OUTPUT,
|
|
print=runner_debug_stream,
|
|
logging=3
|
|
)
|
|
|
|
runner.start()
|
|
|
|
start_dir = os.path.join(TEST_MONITOR_BASE, "start")
|
|
make_dir(start_dir)
|
|
self.assertTrue(start_dir)
|
|
with open(os.path.join(start_dir, "A.txt"), "w") as f:
|
|
f.write("Initial Data")
|
|
|
|
self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt")))
|
|
|
|
loops = 0
|
|
job_id = None
|
|
while loops < 5:
|
|
sleep(1)
|
|
runner_debug_stream.seek(0)
|
|
messages = runner_debug_stream.readlines()
|
|
|
|
for msg in messages:
|
|
self.assertNotIn("ERROR", msg)
|
|
|
|
if "INFO: Completed execution for job: '" in msg:
|
|
job_id = msg.replace(
|
|
"INFO: Completed execution for job: '", "")
|
|
job_id = job_id[:-2]
|
|
loops = 5
|
|
loops += 1
|
|
|
|
self.assertIsNotNone(job_id)
|
|
self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1)
|
|
self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT))
|
|
|
|
runner.stop()
|
|
|
|
job_dir = os.path.join(TEST_JOB_OUTPUT, job_id)
|
|
self.assertEqual(len(os.listdir(job_dir)), 5)
|
|
|
|
result = read_notebook(
|
|
os.path.join(job_dir, get_result_file(JOB_TYPE_PAPERMILL)))
|
|
self.assertIsNotNone(result)
|
|
|
|
output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt")
|
|
self.assertTrue(os.path.exists(output_path))
|
|
|
|
with open(output_path, "r") as f:
|
|
data = f.read()
|
|
|
|
self.assertEqual(data, "Initial Data\nA line from a test Pattern")
|
|
|
|
# Test meow papermill job chaining within runner
|
|
def testMeowRunnerLinkedPapermillExecution(self)->None:
|
|
pattern_one = FileEventPattern(
|
|
"pattern_one",
|
|
os.path.join("start", "A.txt"),
|
|
"recipe_one",
|
|
"infile",
|
|
parameters={
|
|
"extra":"A line from Pattern 1",
|
|
"outfile":os.path.join("{VGRID}", "middle", "{FILENAME}")
|
|
})
|
|
pattern_two = FileEventPattern(
|
|
"pattern_two", os.path.join("middle", "A.txt"), "recipe_one", "infile",
|
|
parameters={
|
|
"extra":"A line from Pattern 2",
|
|
"outfile":os.path.join("{VGRID}", "output", "{FILENAME}")
|
|
})
|
|
recipe = JupyterNotebookRecipe(
|
|
"recipe_one", APPENDING_NOTEBOOK)
|
|
|
|
patterns = {
|
|
pattern_one.name: pattern_one,
|
|
pattern_two.name: pattern_two,
|
|
}
|
|
recipes = {
|
|
recipe.name: recipe,
|
|
}
|
|
|
|
runner_debug_stream = io.StringIO("")
|
|
|
|
runner = MeowRunner(
|
|
WatchdogMonitor(
|
|
TEST_MONITOR_BASE,
|
|
patterns,
|
|
recipes,
|
|
settletime=1
|
|
),
|
|
PapermillHandler(
|
|
job_queue_dir=TEST_JOB_QUEUE,
|
|
),
|
|
LocalPythonConductor(),
|
|
job_queue_dir=TEST_JOB_QUEUE,
|
|
job_output_dir=TEST_JOB_OUTPUT,
|
|
print=runner_debug_stream,
|
|
logging=3
|
|
)
|
|
|
|
runner.start()
|
|
|
|
start_dir = os.path.join(TEST_MONITOR_BASE, "start")
|
|
make_dir(start_dir)
|
|
self.assertTrue(start_dir)
|
|
with open(os.path.join(start_dir, "A.txt"), "w") as f:
|
|
f.write("Initial Data")
|
|
|
|
self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt")))
|
|
|
|
loops = 0
|
|
job_ids = []
|
|
while len(job_ids) < 2 and loops < 5:
|
|
sleep(1)
|
|
runner_debug_stream.seek(0)
|
|
messages = runner_debug_stream.readlines()
|
|
|
|
for msg in messages:
|
|
self.assertNotIn("ERROR", msg)
|
|
|
|
if "INFO: Completed execution for job: '" in msg:
|
|
job_id = msg.replace(
|
|
"INFO: Completed execution for job: '", "")
|
|
job_id = job_id[:-2]
|
|
if job_id not in job_ids:
|
|
job_ids.append(job_id)
|
|
loops += 1
|
|
|
|
self.assertEqual(len(job_ids), 2)
|
|
self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 2)
|
|
self.assertIn(job_ids[0], os.listdir(TEST_JOB_OUTPUT))
|
|
self.assertIn(job_ids[1], os.listdir(TEST_JOB_OUTPUT))
|
|
|
|
runner.stop()
|
|
|
|
mid_job_dir = os.path.join(TEST_JOB_OUTPUT, job_id)
|
|
self.assertEqual(len(os.listdir(mid_job_dir)), 5)
|
|
|
|
result = read_notebook(
|
|
os.path.join(mid_job_dir, get_result_file(JOB_TYPE_PAPERMILL)))
|
|
self.assertIsNotNone(result)
|
|
|
|
mid_output_path = os.path.join(TEST_MONITOR_BASE, "middle", "A.txt")
|
|
self.assertTrue(os.path.exists(mid_output_path))
|
|
|
|
with open(mid_output_path, "r") as f:
|
|
data = f.read()
|
|
|
|
self.assertEqual(data, "Initial Data\nA line from Pattern 1")
|
|
|
|
final_job_dir = os.path.join(TEST_JOB_OUTPUT, job_id)
|
|
self.assertEqual(len(os.listdir(final_job_dir)), 5)
|
|
|
|
result = read_notebook(os.path.join(final_job_dir,
|
|
get_result_file(JOB_TYPE_PAPERMILL)))
|
|
self.assertIsNotNone(result)
|
|
|
|
final_output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt")
|
|
self.assertTrue(os.path.exists(final_output_path))
|
|
|
|
with open(final_output_path, "r") as f:
|
|
data = f.read()
|
|
|
|
self.assertEqual(data,
|
|
"Initial Data\nA line from Pattern 1\nA line from Pattern 2")
|
|
|
|
# Test single meow python job execution
|
|
def testMeowRunnerPythonExecution(self)->None:
|
|
pattern_one = FileEventPattern(
|
|
"pattern_one", os.path.join("start", "A.txt"), "recipe_one", "infile",
|
|
parameters={
|
|
"num":10000,
|
|
"outfile":os.path.join("{VGRID}", "output", "{FILENAME}")
|
|
})
|
|
recipe = PythonRecipe(
|
|
"recipe_one", COMPLETE_PYTHON_SCRIPT
|
|
)
|
|
|
|
patterns = {
|
|
pattern_one.name: pattern_one,
|
|
}
|
|
recipes = {
|
|
recipe.name: recipe,
|
|
}
|
|
|
|
runner_debug_stream = io.StringIO("")
|
|
|
|
runner = MeowRunner(
|
|
WatchdogMonitor(
|
|
TEST_MONITOR_BASE,
|
|
patterns,
|
|
recipes,
|
|
settletime=1
|
|
),
|
|
PythonHandler(
|
|
job_queue_dir=TEST_JOB_QUEUE
|
|
),
|
|
LocalPythonConductor(),
|
|
job_queue_dir=TEST_JOB_QUEUE,
|
|
job_output_dir=TEST_JOB_OUTPUT,
|
|
print=runner_debug_stream,
|
|
logging=3
|
|
)
|
|
|
|
runner.start()
|
|
|
|
start_dir = os.path.join(TEST_MONITOR_BASE, "start")
|
|
make_dir(start_dir)
|
|
self.assertTrue(start_dir)
|
|
with open(os.path.join(start_dir, "A.txt"), "w") as f:
|
|
f.write("25000")
|
|
|
|
self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt")))
|
|
|
|
loops = 0
|
|
job_id = None
|
|
while loops < 5:
|
|
sleep(1)
|
|
runner_debug_stream.seek(0)
|
|
messages = runner_debug_stream.readlines()
|
|
|
|
for msg in messages:
|
|
self.assertNotIn("ERROR", msg)
|
|
|
|
if "INFO: Completed execution for job: '" in msg:
|
|
job_id = msg.replace(
|
|
"INFO: Completed execution for job: '", "")
|
|
job_id = job_id[:-2]
|
|
loops = 5
|
|
loops += 1
|
|
|
|
self.assertIsNotNone(job_id)
|
|
self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 1)
|
|
self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT))
|
|
|
|
runner.stop()
|
|
|
|
job_dir = os.path.join(TEST_JOB_OUTPUT, job_id)
|
|
|
|
metafile = os.path.join(job_dir, META_FILE)
|
|
status = read_yaml(metafile)
|
|
|
|
self.assertNotIn(JOB_ERROR, status)
|
|
|
|
result_path = os.path.join(job_dir, get_result_file(JOB_TYPE_PYTHON))
|
|
self.assertTrue(os.path.exists(result_path))
|
|
result = read_file(os.path.join(result_path))
|
|
self.assertEqual(
|
|
result, "--STDOUT--\n12505000.0\ndone\n\n\n--STDERR--\n\n")
|
|
|
|
output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt")
|
|
self.assertTrue(os.path.exists(output_path))
|
|
output = read_file(os.path.join(output_path))
|
|
self.assertEqual(output, "12505000.0")
|
|
|
|
# Test meow python job chaining within runner
|
|
def testMeowRunnerLinkedPythonExecution(self)->None:
|
|
pattern_one = FileEventPattern(
|
|
"pattern_one",
|
|
os.path.join("start", "A.txt"),
|
|
"recipe_one",
|
|
"infile",
|
|
parameters={
|
|
"num":250,
|
|
"outfile":os.path.join("{VGRID}", "middle", "{FILENAME}")
|
|
})
|
|
pattern_two = FileEventPattern(
|
|
"pattern_two",
|
|
os.path.join("middle", "A.txt"),
|
|
"recipe_one",
|
|
"infile",
|
|
parameters={
|
|
"num":40,
|
|
"outfile":os.path.join("{VGRID}", "output", "{FILENAME}")
|
|
})
|
|
recipe = PythonRecipe(
|
|
"recipe_one", COMPLETE_PYTHON_SCRIPT
|
|
)
|
|
|
|
patterns = {
|
|
pattern_one.name: pattern_one,
|
|
pattern_two.name: pattern_two,
|
|
}
|
|
recipes = {
|
|
recipe.name: recipe,
|
|
}
|
|
|
|
runner_debug_stream = io.StringIO("")
|
|
|
|
runner = MeowRunner(
|
|
WatchdogMonitor(
|
|
TEST_MONITOR_BASE,
|
|
patterns,
|
|
recipes,
|
|
settletime=1
|
|
),
|
|
PythonHandler(
|
|
job_queue_dir=TEST_JOB_QUEUE
|
|
),
|
|
LocalPythonConductor(),
|
|
job_queue_dir=TEST_JOB_QUEUE,
|
|
job_output_dir=TEST_JOB_OUTPUT,
|
|
print=runner_debug_stream,
|
|
logging=3
|
|
)
|
|
|
|
runner.start()
|
|
|
|
start_dir = os.path.join(TEST_MONITOR_BASE, "start")
|
|
make_dir(start_dir)
|
|
self.assertTrue(start_dir)
|
|
with open(os.path.join(start_dir, "A.txt"), "w") as f:
|
|
f.write("100")
|
|
|
|
self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt")))
|
|
|
|
loops = 0
|
|
job_ids = []
|
|
while len(job_ids) < 2 and loops < 5:
|
|
sleep(1)
|
|
runner_debug_stream.seek(0)
|
|
messages = runner_debug_stream.readlines()
|
|
|
|
for msg in messages:
|
|
self.assertNotIn("ERROR", msg)
|
|
|
|
if "INFO: Completed execution for job: '" in msg:
|
|
job_id = msg.replace(
|
|
"INFO: Completed execution for job: '", "")
|
|
job_id = job_id[:-2]
|
|
if job_id not in job_ids:
|
|
job_ids.append(job_id)
|
|
loops += 1
|
|
|
|
runner.stop()
|
|
|
|
self.assertEqual(len(job_ids), 2)
|
|
self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 2)
|
|
self.assertIn(job_ids[0], os.listdir(TEST_JOB_OUTPUT))
|
|
self.assertIn(job_ids[1], os.listdir(TEST_JOB_OUTPUT))
|
|
|
|
meta0 = os.path.join(TEST_JOB_OUTPUT, job_ids[0], META_FILE)
|
|
status0 = read_yaml(meta0)
|
|
create0 = status0[JOB_CREATE_TIME]
|
|
meta1 = os.path.join(TEST_JOB_OUTPUT, job_ids[1], META_FILE)
|
|
status1 = read_yaml(meta1)
|
|
create1 = status1[JOB_CREATE_TIME]
|
|
if create0 < create1:
|
|
mid_job_id = job_ids[0]
|
|
final_job_id = job_ids[1]
|
|
else:
|
|
mid_job_id = job_ids[1]
|
|
final_job_id = job_ids[0]
|
|
|
|
mid_job_dir = os.path.join(TEST_JOB_OUTPUT, mid_job_id)
|
|
self.assertEqual(len(os.listdir(mid_job_dir)), 5)
|
|
|
|
mid_metafile = os.path.join(mid_job_dir, META_FILE)
|
|
mid_status = read_yaml(mid_metafile)
|
|
self.assertNotIn(JOB_ERROR, mid_status)
|
|
|
|
mid_result_path = os.path.join(
|
|
mid_job_dir, get_result_file(JOB_TYPE_PYTHON))
|
|
self.assertTrue(os.path.exists(mid_result_path))
|
|
mid_result = read_file(os.path.join(mid_result_path))
|
|
self.assertEqual(
|
|
mid_result, "--STDOUT--\n7806.25\ndone\n\n\n--STDERR--\n\n")
|
|
|
|
mid_output_path = os.path.join(TEST_MONITOR_BASE, "middle", "A.txt")
|
|
self.assertTrue(os.path.exists(mid_output_path))
|
|
mid_output = read_file(os.path.join(mid_output_path))
|
|
self.assertEqual(mid_output, "7806.25")
|
|
|
|
final_job_dir = os.path.join(TEST_JOB_OUTPUT, final_job_id)
|
|
self.assertEqual(len(os.listdir(final_job_dir)), 5)
|
|
|
|
final_metafile = os.path.join(final_job_dir, META_FILE)
|
|
final_status = read_yaml(final_metafile)
|
|
self.assertNotIn(JOB_ERROR, final_status)
|
|
|
|
final_result_path = os.path.join(final_job_dir, get_result_file(JOB_TYPE_PYTHON))
|
|
self.assertTrue(os.path.exists(final_result_path))
|
|
final_result = read_file(os.path.join(final_result_path))
|
|
self.assertEqual(
|
|
final_result, "--STDOUT--\n2146.5625\ndone\n\n\n--STDERR--\n\n")
|
|
|
|
final_output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt")
|
|
self.assertTrue(os.path.exists(final_output_path))
|
|
final_output = read_file(os.path.join(final_output_path))
|
|
self.assertEqual(final_output, "2146.5625")
|
|
|
|
# Test single meow python job execution
|
|
def testMeowRunnerSweptPythonExecution(self)->None:
|
|
pattern_one = FileEventPattern(
|
|
"pattern_one",
|
|
os.path.join("start", "A.txt"),
|
|
"recipe_one",
|
|
"infile",
|
|
sweep=create_parameter_sweep("num", 1000, 10000, 200),
|
|
parameters={
|
|
"outfile":os.path.join("{VGRID}", "output", "{FILENAME}")
|
|
})
|
|
recipe = PythonRecipe(
|
|
"recipe_one", COMPLETE_PYTHON_SCRIPT
|
|
)
|
|
|
|
patterns = {
|
|
pattern_one.name: pattern_one,
|
|
}
|
|
recipes = {
|
|
recipe.name: recipe,
|
|
}
|
|
|
|
runner_debug_stream = io.StringIO("")
|
|
|
|
runner = MeowRunner(
|
|
WatchdogMonitor(
|
|
TEST_MONITOR_BASE,
|
|
patterns,
|
|
recipes,
|
|
settletime=1
|
|
),
|
|
PythonHandler(
|
|
job_queue_dir=TEST_JOB_QUEUE
|
|
),
|
|
LocalPythonConductor(),
|
|
job_queue_dir=TEST_JOB_QUEUE,
|
|
job_output_dir=TEST_JOB_OUTPUT,
|
|
print=runner_debug_stream,
|
|
logging=3
|
|
)
|
|
|
|
runner.start()
|
|
|
|
start_dir = os.path.join(TEST_MONITOR_BASE, "start")
|
|
make_dir(start_dir)
|
|
self.assertTrue(start_dir)
|
|
with open(os.path.join(start_dir, "A.txt"), "w") as f:
|
|
f.write("25000")
|
|
|
|
self.assertTrue(os.path.exists(os.path.join(start_dir, "A.txt")))
|
|
|
|
loops = 0
|
|
job_ids = []
|
|
while loops < 5:
|
|
sleep(1)
|
|
runner_debug_stream.seek(0)
|
|
messages = runner_debug_stream.readlines()
|
|
|
|
for msg in messages:
|
|
self.assertNotIn("ERROR", msg)
|
|
|
|
if "INFO: Completed execution for job: '" in msg:
|
|
job_id = msg.replace(
|
|
"INFO: Completed execution for job: '", "")
|
|
job_id = job_id[:-2]
|
|
if job_id not in job_ids:
|
|
job_ids.append(job_id)
|
|
loops = 0
|
|
loops += 1
|
|
|
|
runner.stop()
|
|
|
|
self.assertIsNotNone(job_ids)
|
|
self.assertEqual(len(job_ids), 46)
|
|
self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), 46)
|
|
|
|
for job_id in job_ids:
|
|
self.assertIn(job_id, os.listdir(TEST_JOB_OUTPUT))
|
|
|
|
job_dir = os.path.join(TEST_JOB_OUTPUT, job_id)
|
|
|
|
metafile = os.path.join(job_dir, META_FILE)
|
|
status = read_yaml(metafile)
|
|
|
|
self.assertNotIn(JOB_ERROR, status)
|
|
|
|
result_path = os.path.join(job_dir, get_result_file(JOB_TYPE_PYTHON))
|
|
self.assertTrue(os.path.exists(result_path))
|
|
|
|
output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt")
|
|
self.assertTrue(os.path.exists(output_path))
|
|
|
|
# TODO adding tests with numpy or other external dependency
|
|
# TODO test getting job cannot handle
|
|
# TODO test getting event cannot handle
|
|
# TODO test with several matched monitors
|
|
# TODO test with several mismatched monitors
|
|
# TODO test with several matched handlers
|
|
# TODO test with several mismatched handlers
|
|
# TODO test with several matched conductors
|
|
# TODO test with several mismatched conductors
|
|
# TODO tests runner job queue dir
|
|
# TODO tests runner job output dir
|