diff --git a/core/functionality.py b/core/functionality.py index 98af242..e264866 100644 --- a/core/functionality.py +++ b/core/functionality.py @@ -8,7 +8,7 @@ import yaml from datetime import datetime -from multiprocessing.connection import Connection, wait as multi_wait +from multiprocessing.connection import Connection, PipeConnection, wait as multi_wait from multiprocessing.queues import Queue from papermill.translators import papermill_translators from typing import Any @@ -49,12 +49,19 @@ def generate_id(prefix:str="", length:int=16, existing_ids:list[str]=[], def wait(inputs:list[VALID_CHANNELS])->list[VALID_CHANNELS]: all_connections = [i for i in inputs if type(i) is Connection] \ + + [i for i in inputs if type(i) is PipeConnection] \ + [i._reader for i in inputs if type(i) is Queue] - + print(inputs) + for i in inputs: + print(type(i)) + print(all_connections) ready = multi_wait(all_connections) + print(ready) ready_inputs = [i for i in inputs if \ (type(i) is Connection and i in ready) \ + or (type(i) is PipeConnection and i in ready) \ or (type(i) is Queue and i._reader in ready)] + print(ready_inputs) return ready_inputs def _get_file_sha256(file_path): diff --git a/tests/test_functionality.py b/tests/test_functionality.py index 26b5500..49aa03c 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -76,25 +76,38 @@ class CorrectnessTests(unittest.TestCase): pipe_one_writer.send(1) readables = wait(inputs) +# raise EnvironmentError + +# if pipe_one_reader.poll(3): +# print(pipe_one_reader.recv()) + self.assertIn(pipe_one_reader, readables) + print("assert1") self.assertEqual(len(readables), 1) + print("assert2") msg = readables[0].recv() self.assertEqual(msg, 1) + print("assert3") pipe_one_writer.send(1) pipe_two_writer.send(2) readables = wait(inputs) self.assertIn(pipe_one_reader, readables) + print("assert4") self.assertIn(pipe_two_reader, readables) + print("assert5") self.assertEqual(len(readables), 2) + print("assert6") for readable in readables: if readable == pipe_one_reader: msg = readable.recv() self.assertEqual(msg, 1) + print("assertloop if") elif readable == pipe_two_reader: msg = readable.recv() self.assertEqual(msg, 2) + print("assert loop elif") # Test that wait can wait on multiple queues def testWaitQueues(self)->None: diff --git a/tests/test_runner.py b/tests/test_runner.py index e5fd26a..0d8f28e 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -121,10 +121,10 @@ class MeowTests(unittest.TestCase): # Test single meow papermill job execution def testMeowRunnerPapermillExecution(self)->None: pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", + "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile", parameters={ "extra":"A line from a test Pattern", - "outfile":"{VGRID}/output/{FILENAME}" + "outfile":os.path.join("{VGRID}","output","{FILENAME}") }) recipe = JupyterNotebookRecipe( "recipe_one", APPENDING_NOTEBOOK) @@ -205,16 +205,16 @@ class MeowTests(unittest.TestCase): # Test meow papermill job chaining within runner def testMeowRunnerLinkedPapermillExecution(self)->None: pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", + "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile", parameters={ "extra":"A line from Pattern 1", - "outfile":"{VGRID}/middle/{FILENAME}" + "outfile":os.path.join("{VGRID}","middle","{FILENAME}") }) pattern_two = FileEventPattern( - "pattern_two", "middle/A.txt", "recipe_one", "infile", + "pattern_two", os.path.join("middle","A.txt"), "recipe_one", "infile", parameters={ "extra":"A line from Pattern 2", - "outfile":"{VGRID}/output/{FILENAME}" + "outfile":os.path.join("{VGRID}","output","{FILENAME}") }) recipe = JupyterNotebookRecipe( "recipe_one", APPENDING_NOTEBOOK) @@ -314,10 +314,10 @@ class MeowTests(unittest.TestCase): # Test single meow python job execution def testMeowRunnerPythonExecution(self)->None: pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", + "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile", parameters={ "num":10000, - "outfile":"{VGRID}/output/{FILENAME}" + "outfile":os.path.join("{VGRID}","output","{FILENAME}") }) recipe = PythonRecipe( "recipe_one", COMPLETE_PYTHON_SCRIPT @@ -403,16 +403,16 @@ class MeowTests(unittest.TestCase): # Test meow python job chaining within runner def testMeowRunnerLinkedPythonExecution(self)->None: pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", + "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile", parameters={ "num":250, - "outfile":"{VGRID}/middle/{FILENAME}" + "outfile":os.path.join("{VGRID}","middle","{FILENAME}") }) pattern_two = FileEventPattern( - "pattern_two", "middle/A.txt", "recipe_one", "infile", + "pattern_two", os.path.join("middle","A.txt"), "recipe_one", "infile", parameters={ "num":40, - "outfile":"{VGRID}/output/{FILENAME}" + "outfile":os.path.join("{VGRID}","output","{FILENAME}") }) recipe = PythonRecipe( "recipe_one", COMPLETE_PYTHON_SCRIPT