diff --git a/core/functionality.py b/core/functionality.py index 3e1c204..1e2d128 100644 --- a/core/functionality.py +++ b/core/functionality.py @@ -9,7 +9,7 @@ import yaml from datetime import datetime from typing import List -from multiprocessing.connection import Connection, wait as multi_wait +from multiprocessing.connection import Connection, PipeConnection, wait as multi_wait from multiprocessing.queues import Queue from papermill.translators import papermill_translators from typing import Any, Dict @@ -50,13 +50,12 @@ def generate_id(prefix:str="", length:int=16, existing_ids:List[str]=[], def wait(inputs:List[VALID_CHANNELS])->List[VALID_CHANNELS]: all_connections = [i for i in inputs if type(i) is Connection] \ + + [i for i in inputs if type(i) is PipeConnection] \ + [i._reader for i in inputs if type(i) is Queue] - for i in inputs: - print(type(i)) - ready = multi_wait(all_connections) ready_inputs = [i for i in inputs if \ (type(i) is Connection and i in ready) \ + or (type(i) is PipeConnection and i in ready) \ or (type(i) is Queue and i._reader in ready)] return ready_inputs diff --git a/tests/test_runner.py b/tests/test_runner.py index 8c9a79a..196b59c 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -409,13 +409,19 @@ class MeowTests(unittest.TestCase): # Test meow python job chaining within runner def testMeowRunnerLinkedPythonExecution(self)->None: pattern_one = FileEventPattern( - "pattern_one", os.path.join("start", "A.txt"), "recipe_one", "infile", + "pattern_one", + os.path.join("start", "A.txt"), + "recipe_one", + "infile", parameters={ "num":250, "outfile":os.path.join("{VGRID}", "middle", "{FILENAME}") }) pattern_two = FileEventPattern( - "pattern_two", os.path.join("middle", "A.txt"), "recipe_one", "infile", + "pattern_two", + os.path.join("middle", "A.txt"), + "recipe_one", + "infile", parameters={ "num":40, "outfile":os.path.join("{VGRID}", "output", "{FILENAME}")