diff --git a/tests/test_functionality.py b/tests/test_functionality.py index d61b64b..9de0d66 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -32,7 +32,7 @@ from recipes import JupyterNotebookRecipe from shared import setup, teardown, TEST_MONITOR_BASE, COMPLETE_NOTEBOOK, \ APPENDING_NOTEBOOK, COMPLETE_PYTHON_SCRIPT -class CorrectnessTests(unittest.TestCase): +class DebugTests(unittest.TestCase): def setUp(self)->None: super().setUp() setup() @@ -41,385 +41,15 @@ class CorrectnessTests(unittest.TestCase): super().tearDown() teardown() - # Test that generate_id creates unique ids - def testGenerateIDWorking(self)->None: - id = generate_id() - self.assertEqual(len(id), 16) - for i in range(len(id)): - self.assertIn(id[i], CHAR_UPPERCASE+CHAR_LOWERCASE) - # In extrememly rare cases this may fail due to randomness in algorithm - new_id = generate_id(existing_ids=[id]) - self.assertNotEqual(id, new_id) +class FileIoTests(unittest.TestCase): + def setUp(self)->None: + super().setUp() + setup() - another_id = generate_id(length=32) - self.assertEqual(len(another_id), 32) - - again_id = generate_id(charset="a") - for i in range(len(again_id)): - self.assertIn(again_id[i], "a") - - with self.assertRaises(ValueError): - generate_id(length=2, charset="a", existing_ids=["aa"]) - - prefix_id = generate_id(length=4, prefix="Test") - self.assertEqual(prefix_id, "Test") - - prefix_id = generate_id(prefix="Test") - self.assertEqual(len(prefix_id), 16) - self.assertTrue(prefix_id.startswith("Test")) - - # Test that wait can wait on multiple pipes - def testWaitPipes(self)->None: - pipe_one_reader, pipe_one_writer = Pipe() - pipe_two_reader, pipe_two_writer = Pipe() - - inputs = [ - pipe_one_reader, pipe_two_reader - ] - - pipe_one_writer.send(1) - readables = wait(inputs) - - self.assertIn(pipe_one_reader, readables) - self.assertEqual(len(readables), 1) - msg = readables[0].recv() - self.assertEqual(msg, 1) - - pipe_one_writer.send(1) - pipe_two_writer.send(2) - readables = wait(inputs) - - self.assertIn(pipe_one_reader, readables) - self.assertIn(pipe_two_reader, readables) - self.assertEqual(len(readables), 2) - for readable in readables: - if readable == pipe_one_reader: - msg = readable.recv() - self.assertEqual(msg, 1) - elif readable == pipe_two_reader: - msg = readable.recv() - self.assertEqual(msg, 2) - - # Test that wait can wait on multiple queues - def testWaitQueues(self)->None: - queue_one = Queue() - queue_two = Queue() - - inputs = [ - queue_one, queue_two - ] - - queue_one.put(1) - readables = wait(inputs) - - self.assertIn(queue_one, readables) - self.assertEqual(len(readables), 1) - msg = readables[0].get() - self.assertEqual(msg, 1) - - queue_one.put(1) - queue_two.put(2) - sleep(0.1) - readables = wait(inputs) - - self.assertIn(queue_one, readables) - self.assertIn(queue_two, readables) - self.assertEqual(len(readables), 2) - for readable in readables: - if readable == queue_one: - msg = readable.get() - self.assertEqual(msg, 1) - elif readable == queue_two: - msg = readable.get() - self.assertEqual(msg, 2) - - # Test that wait can wait on multiple pipes and queues - def testWaitPipesAndQueues(self)->None: - pipe_one_reader, pipe_one_writer = Pipe() - pipe_two_reader, pipe_two_writer = Pipe() - queue_one = Queue() - queue_two = Queue() - - inputs = [ - pipe_one_reader, pipe_two_reader, queue_one, queue_two - ] - - pipe_one_writer.send(1) - readables = wait(inputs) - - self.assertIn(pipe_one_reader, readables) - self.assertEqual(len(readables), 1) - msg = 
readables[0].recv() - self.assertEqual(msg, 1) - - pipe_one_writer.send(1) - pipe_two_writer.send(2) - readables = wait(inputs) - - self.assertIn(pipe_one_reader, readables) - self.assertIn(pipe_two_reader, readables) - self.assertEqual(len(readables), 2) - for readable in readables: - if readable == pipe_one_reader: - msg = readable.recv() - self.assertEqual(msg, 1) - if readable == pipe_two_reader: - msg = readable.recv() - self.assertEqual(msg, 2) - - queue_one.put(1) - readables = wait(inputs) - - self.assertIn(queue_one, readables) - self.assertEqual(len(readables), 1) - msg = readables[0].get() - self.assertEqual(msg, 1) - - queue_one.put(1) - queue_two.put(2) - sleep(0.1) - readables = wait(inputs) - - self.assertIn(queue_one, readables) - self.assertIn(queue_two, readables) - self.assertEqual(len(readables), 2) - for readable in readables: - if readable == queue_one: - msg = readable.get() - self.assertEqual(msg, 1) - elif readable == queue_two: - msg = readable.get() - self.assertEqual(msg, 2) - - queue_one.put(1) - pipe_one_writer.send(1) - sleep(0.1) - readables = wait(inputs) - - self.assertIn(queue_one, readables) - self.assertIn(pipe_one_reader, readables) - self.assertEqual(len(readables), 2) - for readable in readables: - if readable == queue_one: - msg = readable.get() - self.assertEqual(msg, 1) - elif readable == pipe_one_reader: - msg = readable.recv() - self.assertEqual(msg, 1) - - # Test that get_file_hash produces the expected hash - def testGetFileHashSha256(self)->None: - file_path = os.path.join(TEST_MONITOR_BASE, "hased_file.txt") - with open(file_path, 'w') as hashed_file: - hashed_file.write("Some data\n") - expected_hash = \ - "8557122088c994ba8aa5540ccbb9a3d2d8ae2887046c2db23d65f40ae63abade" - - hash = get_file_hash(file_path, SHA256) - self.assertEqual(hash, expected_hash) - - # Test that get_file_hash raises on a missing file - def testGetFileHashSha256NoFile(self)->None: - file_path = os.path.join(TEST_MONITOR_BASE, "file.txt") - - with self.assertRaises(FileNotFoundError): - get_file_hash(file_path, SHA256) - - # Test that parameterize_jupyter_notebook parameterises given notebook - def testParameteriseNotebook(self)->None: - pn = parameterize_jupyter_notebook( - COMPLETE_NOTEBOOK, {}) - - self.assertEqual(pn, COMPLETE_NOTEBOOK) - - pn = parameterize_jupyter_notebook( - COMPLETE_NOTEBOOK, {"a": 4}) - - self.assertEqual(pn, COMPLETE_NOTEBOOK) - - pn = parameterize_jupyter_notebook( - COMPLETE_NOTEBOOK, {"s": 4}) - - self.assertNotEqual(pn, COMPLETE_NOTEBOOK) - self.assertEqual( - pn["cells"][0]["source"], - "# The first cell\n\ns = 4\nnum = 1000") - - # Test that parameterize_python_script parameterises given script - def testParameteriseScript(self)->None: - ps = parameterize_python_script( - COMPLETE_PYTHON_SCRIPT, {}) - - self.assertEqual(ps, COMPLETE_PYTHON_SCRIPT) - - ps = parameterize_python_script( - COMPLETE_PYTHON_SCRIPT, {"a": 50}) - - self.assertEqual(ps, COMPLETE_PYTHON_SCRIPT) - - ps = parameterize_python_script( - COMPLETE_PYTHON_SCRIPT, {"num": 50}) - - self.assertNotEqual(ps, COMPLETE_PYTHON_SCRIPT) - self.assertEqual(ps[2], "num = 50") - - # Test that create_event produces valid event dictionary - def testCreateEvent(self)->None: - pattern = FileEventPattern( - "pattern", - "file_path", - "recipe_one", - "infile", - parameters={ - "extra":"A line from a test Pattern", - "outfile":"result_path" - }) - recipe = JupyterNotebookRecipe( - "recipe_one", APPENDING_NOTEBOOK) - - rule = create_rule(pattern, recipe) - - event = create_event("test", 
"path", rule) - - self.assertEqual(type(event), dict) - self.assertEqual(len(event.keys()), 3) - self.assertTrue(EVENT_TYPE in event.keys()) - self.assertTrue(EVENT_PATH in event.keys()) - self.assertTrue(EVENT_RULE in event.keys()) - self.assertEqual(event[EVENT_TYPE], "test") - self.assertEqual(event[EVENT_PATH], "path") - self.assertEqual(event[EVENT_RULE], rule) - - event2 = create_event("test2", "path2", rule, extras={"a":1}) - - self.assertEqual(type(event2), dict) - self.assertTrue(EVENT_TYPE in event2.keys()) - self.assertTrue(EVENT_PATH in event.keys()) - self.assertTrue(EVENT_RULE in event.keys()) - self.assertEqual(len(event2.keys()), 4) - self.assertEqual(event2[EVENT_TYPE], "test2") - self.assertEqual(event2[EVENT_PATH], "path2") - self.assertEqual(event2[EVENT_RULE], rule) - self.assertEqual(event2["a"], 1) - - # Test that create_job produces valid job dictionary - def testCreateJob(self)->None: - pattern = FileEventPattern( - "pattern", - "file_path", - "recipe_one", - "infile", - parameters={ - "extra":"A line from a test Pattern", - "outfile":"result_path" - }) - recipe = JupyterNotebookRecipe( - "recipe_one", APPENDING_NOTEBOOK) - - rule = create_rule(pattern, recipe) - - event = create_event( - EVENT_TYPE_WATCHDOG, - "file_path", - rule, - extras={ - WATCHDOG_BASE: TEST_MONITOR_BASE, - EVENT_RULE: rule, - WATCHDOG_HASH: "file_hash" - } - ) - - job_dict = create_job( - JOB_TYPE_PAPERMILL, - event, - extras={ - JOB_PARAMETERS:{ - "extra":"extra", - "infile":"file_path", - "outfile":"result_path" - }, - JOB_HASH: "file_hash", - PYTHON_FUNC:max - } - ) - - self.assertIsInstance(job_dict, dict) - self.assertIn(JOB_ID, job_dict) - self.assertIsInstance(job_dict[JOB_ID], str) - self.assertIn(JOB_EVENT, job_dict) - self.assertEqual(job_dict[JOB_EVENT], event) - self.assertIn(JOB_TYPE, job_dict) - self.assertEqual(job_dict[JOB_TYPE], JOB_TYPE_PAPERMILL) - self.assertIn(JOB_PATTERN, job_dict) - self.assertEqual(job_dict[JOB_PATTERN], pattern.name) - self.assertIn(JOB_RECIPE, job_dict) - self.assertEqual(job_dict[JOB_RECIPE], recipe.name) - self.assertIn(JOB_RULE, job_dict) - self.assertEqual(job_dict[JOB_RULE], rule.name) - self.assertIn(JOB_STATUS, job_dict) - self.assertEqual(job_dict[JOB_STATUS], STATUS_QUEUED) - self.assertIn(JOB_CREATE_TIME, job_dict) - self.assertIsInstance(job_dict[JOB_CREATE_TIME], datetime) - self.assertIn(JOB_REQUIREMENTS, job_dict) - self.assertEqual(job_dict[JOB_REQUIREMENTS], {}) - - # Test that replace_keywords replaces MEOW keywords in a given dictionary - def testReplaceKeywords(self)->None: - test_dict = { - "A": f"--{KEYWORD_PATH}--", - "B": f"--{KEYWORD_REL_PATH}--", - "C": f"--{KEYWORD_DIR}--", - "D": f"--{KEYWORD_REL_DIR}--", - "E": f"--{KEYWORD_FILENAME}--", - "F": f"--{KEYWORD_PREFIX}--", - "G": f"--{KEYWORD_BASE}--", - "H": f"--{KEYWORD_EXTENSION}--", - "I": f"--{KEYWORD_JOB}--", - "J": f"--{KEYWORD_PATH}-{KEYWORD_PATH}--", - "K": f"{KEYWORD_PATH}", - "L": f"--{KEYWORD_PATH}-{KEYWORD_REL_PATH}-{KEYWORD_DIR}-" - f"{KEYWORD_REL_DIR}-{KEYWORD_FILENAME}-{KEYWORD_PREFIX}-" - f"{KEYWORD_BASE}-{KEYWORD_EXTENSION}-{KEYWORD_JOB}--", - "M": "A", - "N": 1 - } - - replaced = replace_keywords( - test_dict, - "job_id", - os.path.join("base", "src", "dir", "file.ext"), - os.path.join("base", "monitor", "dir") - ) - - self.assertIsInstance(replaced, dict) - self.assertEqual(len(test_dict.keys()), len(replaced.keys())) - for k in test_dict.keys(): - self.assertIn(k, replaced) - - self.assertEqual(replaced["A"], - os.path.join("--base", "src", "dir", 
"file.ext--")) - self.assertEqual(replaced["B"], - os.path.join("--..", "..", "src", "dir", "file.ext--")) - self.assertEqual(replaced["C"], - os.path.join("--base", "src", "dir--")) - self.assertEqual(replaced["D"], - os.path.join("--..", "..", "src", "dir--")) - self.assertEqual(replaced["E"], "--file.ext--") - self.assertEqual(replaced["F"], "--file--") - self.assertEqual(replaced["G"], - os.path.join("--base", "monitor", "dir--")) - self.assertEqual(replaced["H"], "--.ext--") - self.assertEqual(replaced["I"], "--job_id--") - self.assertEqual(replaced["J"], - os.path.join("--base", "src", "dir", "file.ext-base", "src", "dir", "file.ext--")) - self.assertEqual(replaced["K"], - os.path.join("base", "src", "dir", "file.ext")) - self.assertEqual(replaced["L"], - os.path.join("--base", "src", "dir", "file.ext-..", "..", "src", "dir", "file.ext-base", "src", "dir-" - "..", "..", "src", "dir-file.ext-file-base", "monitor", "dir-.ext-job_id--")) - self.assertEqual(replaced["M"], "A") - self.assertEqual(replaced["N"], 1) + def tearDown(self)->None: + super().tearDown() + teardown() # Test that write_file can write files def testWriteFile(self)->None: @@ -670,6 +300,150 @@ data""" self.assertFalse(os.path.exists( os.path.join(TEST_MONITOR_BASE, "A", "B"))) + # Test lines to str + def testLinesToStr(self)->None: + l = ["a", "b", "c"] + + self.assertEqual(lines_to_string(l), "a\nb\nc") + + +class HashingTests(unittest.TestCase): + def setUp(self)->None: + super().setUp() + setup() + + def tearDown(self)->None: + super().tearDown() + teardown() + + # Test that get_file_hash produces the expected hash + def testGetFileHashSha256(self)->None: + file_path = os.path.join(TEST_MONITOR_BASE, "hased_file.txt") + with open(file_path, 'w') as hashed_file: + hashed_file.write("Some data\n") + expected_hash = \ + "8557122088c994ba8aa5540ccbb9a3d2d8ae2887046c2db23d65f40ae63abade" + + hash = get_file_hash(file_path, SHA256) + self.assertEqual(hash, expected_hash) + + # Test that get_file_hash raises on a missing file + def testGetFileHashSha256NoFile(self)->None: + file_path = os.path.join(TEST_MONITOR_BASE, "file.txt") + + with self.assertRaises(FileNotFoundError): + get_file_hash(file_path, SHA256) + + +class MeowTests(unittest.TestCase): + def setUp(self)->None: + super().setUp() + setup() + + def tearDown(self)->None: + super().tearDown() + teardown() + + # Test that create_event produces valid event dictionary + def testCreateEvent(self)->None: + pattern = FileEventPattern( + "pattern", + "file_path", + "recipe_one", + "infile", + parameters={ + "extra":"A line from a test Pattern", + "outfile":"result_path" + }) + recipe = JupyterNotebookRecipe( + "recipe_one", APPENDING_NOTEBOOK) + + rule = create_rule(pattern, recipe) + + event = create_event("test", "path", rule) + + self.assertEqual(type(event), dict) + self.assertEqual(len(event.keys()), 3) + self.assertTrue(EVENT_TYPE in event.keys()) + self.assertTrue(EVENT_PATH in event.keys()) + self.assertTrue(EVENT_RULE in event.keys()) + self.assertEqual(event[EVENT_TYPE], "test") + self.assertEqual(event[EVENT_PATH], "path") + self.assertEqual(event[EVENT_RULE], rule) + + event2 = create_event("test2", "path2", rule, extras={"a":1}) + + self.assertEqual(type(event2), dict) + self.assertTrue(EVENT_TYPE in event2.keys()) + self.assertTrue(EVENT_PATH in event.keys()) + self.assertTrue(EVENT_RULE in event.keys()) + self.assertEqual(len(event2.keys()), 4) + self.assertEqual(event2[EVENT_TYPE], "test2") + self.assertEqual(event2[EVENT_PATH], "path2") + 
self.assertEqual(event2[EVENT_RULE], rule) + self.assertEqual(event2["a"], 1) + + # Test that create_job produces valid job dictionary + def testCreateJob(self)->None: + pattern = FileEventPattern( + "pattern", + "file_path", + "recipe_one", + "infile", + parameters={ + "extra":"A line from a test Pattern", + "outfile":"result_path" + }) + recipe = JupyterNotebookRecipe( + "recipe_one", APPENDING_NOTEBOOK) + + rule = create_rule(pattern, recipe) + + event = create_event( + EVENT_TYPE_WATCHDOG, + "file_path", + rule, + extras={ + WATCHDOG_BASE: TEST_MONITOR_BASE, + EVENT_RULE: rule, + WATCHDOG_HASH: "file_hash" + } + ) + + job_dict = create_job( + JOB_TYPE_PAPERMILL, + event, + extras={ + JOB_PARAMETERS:{ + "extra":"extra", + "infile":"file_path", + "outfile":"result_path" + }, + JOB_HASH: "file_hash", + PYTHON_FUNC:max + } + ) + + self.assertIsInstance(job_dict, dict) + self.assertIn(JOB_ID, job_dict) + self.assertIsInstance(job_dict[JOB_ID], str) + self.assertIn(JOB_EVENT, job_dict) + self.assertEqual(job_dict[JOB_EVENT], event) + self.assertIn(JOB_TYPE, job_dict) + self.assertEqual(job_dict[JOB_TYPE], JOB_TYPE_PAPERMILL) + self.assertIn(JOB_PATTERN, job_dict) + self.assertEqual(job_dict[JOB_PATTERN], pattern.name) + self.assertIn(JOB_RECIPE, job_dict) + self.assertEqual(job_dict[JOB_RECIPE], recipe.name) + self.assertIn(JOB_RULE, job_dict) + self.assertEqual(job_dict[JOB_RULE], rule.name) + self.assertIn(JOB_STATUS, job_dict) + self.assertEqual(job_dict[JOB_STATUS], STATUS_QUEUED) + self.assertIn(JOB_CREATE_TIME, job_dict) + self.assertIsInstance(job_dict[JOB_CREATE_TIME], datetime) + self.assertIn(JOB_REQUIREMENTS, job_dict) + self.assertEqual(job_dict[JOB_REQUIREMENTS], {}) + # Test creation of watchdog event dict def testCreateWatchdogEvent(self)->None: pattern = FileEventPattern( @@ -722,8 +496,295 @@ data""" self.assertEqual(event[WATCHDOG_BASE], "base") self.assertEqual(event[WATCHDOG_HASH], "hash") - # Test lines to str - def testLinesToStr(self)->None: - l = ["a", "b", "c"] + # Test that replace_keywords replaces MEOW keywords in a given dictionary + def testReplaceKeywords(self)->None: + test_dict = { + "A": f"--{KEYWORD_PATH}--", + "B": f"--{KEYWORD_REL_PATH}--", + "C": f"--{KEYWORD_DIR}--", + "D": f"--{KEYWORD_REL_DIR}--", + "E": f"--{KEYWORD_FILENAME}--", + "F": f"--{KEYWORD_PREFIX}--", + "G": f"--{KEYWORD_BASE}--", + "H": f"--{KEYWORD_EXTENSION}--", + "I": f"--{KEYWORD_JOB}--", + "J": f"--{KEYWORD_PATH}-{KEYWORD_PATH}--", + "K": f"{KEYWORD_PATH}", + "L": f"--{KEYWORD_PATH}-{KEYWORD_REL_PATH}-{KEYWORD_DIR}-" + f"{KEYWORD_REL_DIR}-{KEYWORD_FILENAME}-{KEYWORD_PREFIX}-" + f"{KEYWORD_BASE}-{KEYWORD_EXTENSION}-{KEYWORD_JOB}--", + "M": "A", + "N": 1 + } - self.assertEqual(lines_to_string(l), "a\nb\nc") \ No newline at end of file + replaced = replace_keywords( + test_dict, + "job_id", + os.path.join("base", "src", "dir", "file.ext"), + os.path.join("base", "monitor", "dir") + ) + + self.assertIsInstance(replaced, dict) + self.assertEqual(len(test_dict.keys()), len(replaced.keys())) + for k in test_dict.keys(): + self.assertIn(k, replaced) + + self.assertEqual(replaced["A"], + os.path.join("--base", "src", "dir", "file.ext--")) + self.assertEqual(replaced["B"], + os.path.join("--..", "..", "src", "dir", "file.ext--")) + self.assertEqual(replaced["C"], + os.path.join("--base", "src", "dir--")) + self.assertEqual(replaced["D"], + os.path.join("--..", "..", "src", "dir--")) + self.assertEqual(replaced["E"], "--file.ext--") + self.assertEqual(replaced["F"], "--file--") + 
self.assertEqual(replaced["G"], + os.path.join("--base", "monitor", "dir--")) + self.assertEqual(replaced["H"], "--.ext--") + self.assertEqual(replaced["I"], "--job_id--") + self.assertEqual(replaced["J"], + os.path.join("--base", "src", "dir", "file.ext-base", "src", "dir", "file.ext--")) + self.assertEqual(replaced["K"], + os.path.join("base", "src", "dir", "file.ext")) + self.assertEqual(replaced["L"], + os.path.join("--base", "src", "dir", "file.ext-..", "..", "src", "dir", "file.ext-base", "src", "dir-" + "..", "..", "src", "dir-file.ext-file-base", "monitor", "dir-.ext-job_id--")) + self.assertEqual(replaced["M"], "A") + self.assertEqual(replaced["N"], 1) + + +class NamingTests(unittest.TestCase): + def setUp(self)->None: + super().setUp() + setup() + + def tearDown(self)->None: + super().tearDown() + teardown() + + # Test that generate_id creates unique ids + def testGenerateIDWorking(self)->None: + id = generate_id() + self.assertEqual(len(id), 16) + for i in range(len(id)): + self.assertIn(id[i], CHAR_UPPERCASE+CHAR_LOWERCASE) + + # In extrememly rare cases this may fail due to randomness in algorithm + new_id = generate_id(existing_ids=[id]) + self.assertNotEqual(id, new_id) + + another_id = generate_id(length=32) + self.assertEqual(len(another_id), 32) + + again_id = generate_id(charset="a") + for i in range(len(again_id)): + self.assertIn(again_id[i], "a") + + with self.assertRaises(ValueError): + generate_id(length=2, charset="a", existing_ids=["aa"]) + + prefix_id = generate_id(length=4, prefix="Test") + self.assertEqual(prefix_id, "Test") + + prefix_id = generate_id(prefix="Test") + self.assertEqual(len(prefix_id), 16) + self.assertTrue(prefix_id.startswith("Test")) + + +class ParameterisationTests(unittest.TestCase): + def setUp(self)->None: + super().setUp() + setup() + + def tearDown(self)->None: + super().tearDown() + teardown() + + # Test that parameterize_jupyter_notebook parameterises given notebook + def testParameteriseNotebook(self)->None: + pn = parameterize_jupyter_notebook( + COMPLETE_NOTEBOOK, {}) + + self.assertEqual(pn, COMPLETE_NOTEBOOK) + + pn = parameterize_jupyter_notebook( + COMPLETE_NOTEBOOK, {"a": 4}) + + self.assertEqual(pn, COMPLETE_NOTEBOOK) + + pn = parameterize_jupyter_notebook( + COMPLETE_NOTEBOOK, {"s": 4}) + + self.assertNotEqual(pn, COMPLETE_NOTEBOOK) + self.assertEqual( + pn["cells"][0]["source"], + "# The first cell\n\ns = 4\nnum = 1000") + + # Test that parameterize_python_script parameterises given script + def testParameteriseScript(self)->None: + ps = parameterize_python_script( + COMPLETE_PYTHON_SCRIPT, {}) + + self.assertEqual(ps, COMPLETE_PYTHON_SCRIPT) + + ps = parameterize_python_script( + COMPLETE_PYTHON_SCRIPT, {"a": 50}) + + self.assertEqual(ps, COMPLETE_PYTHON_SCRIPT) + + ps = parameterize_python_script( + COMPLETE_PYTHON_SCRIPT, {"num": 50}) + + self.assertNotEqual(ps, COMPLETE_PYTHON_SCRIPT) + self.assertEqual(ps[2], "num = 50") + + + +class ProcessIoTests(unittest.TestCase): + def setUp(self)->None: + super().setUp() + setup() + + def tearDown(self)->None: + super().tearDown() + teardown() + + # Test that wait can wait on multiple pipes + def testWaitPipes(self)->None: + pipe_one_reader, pipe_one_writer = Pipe() + pipe_two_reader, pipe_two_writer = Pipe() + + inputs = [ + pipe_one_reader, pipe_two_reader + ] + + pipe_one_writer.send(1) + readables = wait(inputs) + + self.assertIn(pipe_one_reader, readables) + self.assertEqual(len(readables), 1) + msg = readables[0].recv() + self.assertEqual(msg, 1) + + pipe_one_writer.send(1) 
+ pipe_two_writer.send(2) + readables = wait(inputs) + + self.assertIn(pipe_one_reader, readables) + self.assertIn(pipe_two_reader, readables) + self.assertEqual(len(readables), 2) + for readable in readables: + if readable == pipe_one_reader: + msg = readable.recv() + self.assertEqual(msg, 1) + elif readable == pipe_two_reader: + msg = readable.recv() + self.assertEqual(msg, 2) + + # Test that wait can wait on multiple queues + def testWaitQueues(self)->None: + queue_one = Queue() + queue_two = Queue() + + inputs = [ + queue_one, queue_two + ] + + queue_one.put(1) + readables = wait(inputs) + + self.assertIn(queue_one, readables) + self.assertEqual(len(readables), 1) + msg = readables[0].get() + self.assertEqual(msg, 1) + + queue_one.put(1) + queue_two.put(2) + sleep(0.1) + readables = wait(inputs) + + self.assertIn(queue_one, readables) + self.assertIn(queue_two, readables) + self.assertEqual(len(readables), 2) + for readable in readables: + if readable == queue_one: + msg = readable.get() + self.assertEqual(msg, 1) + elif readable == queue_two: + msg = readable.get() + self.assertEqual(msg, 2) + + # Test that wait can wait on multiple pipes and queues + def testWaitPipesAndQueues(self)->None: + pipe_one_reader, pipe_one_writer = Pipe() + pipe_two_reader, pipe_two_writer = Pipe() + queue_one = Queue() + queue_two = Queue() + + inputs = [ + pipe_one_reader, pipe_two_reader, queue_one, queue_two + ] + + pipe_one_writer.send(1) + readables = wait(inputs) + + self.assertIn(pipe_one_reader, readables) + self.assertEqual(len(readables), 1) + msg = readables[0].recv() + self.assertEqual(msg, 1) + + pipe_one_writer.send(1) + pipe_two_writer.send(2) + readables = wait(inputs) + + self.assertIn(pipe_one_reader, readables) + self.assertIn(pipe_two_reader, readables) + self.assertEqual(len(readables), 2) + for readable in readables: + if readable == pipe_one_reader: + msg = readable.recv() + self.assertEqual(msg, 1) + if readable == pipe_two_reader: + msg = readable.recv() + self.assertEqual(msg, 2) + + queue_one.put(1) + readables = wait(inputs) + + self.assertIn(queue_one, readables) + self.assertEqual(len(readables), 1) + msg = readables[0].get() + self.assertEqual(msg, 1) + + queue_one.put(1) + queue_two.put(2) + sleep(0.1) + readables = wait(inputs) + + self.assertIn(queue_one, readables) + self.assertIn(queue_two, readables) + self.assertEqual(len(readables), 2) + for readable in readables: + if readable == queue_one: + msg = readable.get() + self.assertEqual(msg, 1) + elif readable == queue_two: + msg = readable.get() + self.assertEqual(msg, 2) + + queue_one.put(1) + pipe_one_writer.send(1) + sleep(0.1) + readables = wait(inputs) + + self.assertIn(queue_one, readables) + self.assertIn(pipe_one_reader, readables) + self.assertEqual(len(readables), 2) + for readable in readables: + if readable == queue_one: + msg = readable.get() + self.assertEqual(msg, 1) + elif readable == pipe_one_reader: + msg = readable.recv() + self.assertEqual(msg, 1)
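
The hunks above regroup the former CorrectnessTests suite into themed classes (DebugTests, FileIoTests, HashingTests, MeowTests, NamingTests, ParameterisationTests, ProcessIoTests). As a hedged sketch, not part of the patch, of how one regrouped suite could be run on its own, assuming the module keeps the tests/test_functionality.py path from the diff header and is importable as tests.test_functionality:

# Hypothetical usage sketch: run only the hashing suite after the
# reorganisation. The import path is an assumption based on the diff header.
import unittest

from tests.test_functionality import HashingTests

if __name__ == "__main__":
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromTestCase(HashingTests)
    unittest.TextTestRunner(verbosity=2).run(suite)

The same effect is usually available without a script via unittest's CLI, e.g. python -m unittest tests.test_functionality.HashingTests, assuming the same importable layout.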