diff --git a/core/functionality.py b/core/functionality.py index 98af242..e784b4b 100644 --- a/core/functionality.py +++ b/core/functionality.py @@ -8,7 +8,7 @@ import yaml from datetime import datetime -from multiprocessing.connection import Connection, wait as multi_wait +from multiprocessing.connection import Connection, PipeConnection, wait as multi_wait from multiprocessing.queues import Queue from papermill.translators import papermill_translators from typing import Any @@ -49,11 +49,12 @@ def generate_id(prefix:str="", length:int=16, existing_ids:list[str]=[], def wait(inputs:list[VALID_CHANNELS])->list[VALID_CHANNELS]: all_connections = [i for i in inputs if type(i) is Connection] \ + + [i for i in inputs if type(i) is PipeConnection] \ + [i._reader for i in inputs if type(i) is Queue] - ready = multi_wait(all_connections) ready_inputs = [i for i in inputs if \ (type(i) is Connection and i in ready) \ + or (type(i) is PipeConnection and i in ready) \ or (type(i) is Queue and i._reader in ready)] return ready_inputs diff --git a/tests/test_runner.py b/tests/test_runner.py index e5fd26a..0d8f28e 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -121,10 +121,10 @@ class MeowTests(unittest.TestCase): # Test single meow papermill job execution def testMeowRunnerPapermillExecution(self)->None: pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", + "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile", parameters={ "extra":"A line from a test Pattern", - "outfile":"{VGRID}/output/{FILENAME}" + "outfile":os.path.join("{VGRID}","output","{FILENAME}") }) recipe = JupyterNotebookRecipe( "recipe_one", APPENDING_NOTEBOOK) @@ -205,16 +205,16 @@ class MeowTests(unittest.TestCase): # Test meow papermill job chaining within runner def testMeowRunnerLinkedPapermillExecution(self)->None: pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", + "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile", parameters={ "extra":"A line from Pattern 1", - "outfile":"{VGRID}/middle/{FILENAME}" + "outfile":os.path.join("{VGRID}","middle","{FILENAME}") }) pattern_two = FileEventPattern( - "pattern_two", "middle/A.txt", "recipe_one", "infile", + "pattern_two", os.path.join("middle","A.txt"), "recipe_one", "infile", parameters={ "extra":"A line from Pattern 2", - "outfile":"{VGRID}/output/{FILENAME}" + "outfile":os.path.join("{VGRID}","output","{FILENAME}") }) recipe = JupyterNotebookRecipe( "recipe_one", APPENDING_NOTEBOOK) @@ -314,10 +314,10 @@ class MeowTests(unittest.TestCase): # Test single meow python job execution def testMeowRunnerPythonExecution(self)->None: pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", + "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile", parameters={ "num":10000, - "outfile":"{VGRID}/output/{FILENAME}" + "outfile":os.path.join("{VGRID}","output","{FILENAME}") }) recipe = PythonRecipe( "recipe_one", COMPLETE_PYTHON_SCRIPT @@ -403,16 +403,16 @@ class MeowTests(unittest.TestCase): # Test meow python job chaining within runner def testMeowRunnerLinkedPythonExecution(self)->None: pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", + "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile", parameters={ "num":250, - "outfile":"{VGRID}/middle/{FILENAME}" + "outfile":os.path.join("{VGRID}","middle","{FILENAME}") }) pattern_two = FileEventPattern( - "pattern_two", "middle/A.txt", "recipe_one", "infile", + "pattern_two", os.path.join("middle","A.txt"), "recipe_one", "infile", parameters={ "num":40, - "outfile":"{VGRID}/output/{FILENAME}" + "outfile":os.path.join("{VGRID}","output","{FILENAME}") }) recipe = PythonRecipe( "recipe_one", COMPLETE_PYTHON_SCRIPT