From 37d061b06a9577b1fbf41c1b87b3fe4c391c8a46 Mon Sep 17 00:00:00 2001 From: toppggg <70447358+toppggg@users.noreply.github.com> Date: Wed, 8 Feb 2023 14:32:16 +0100 Subject: [PATCH 1/4] Added Pipeconnection for Windows and os.path.join, rather than hardcoded path combine --- core/functionality.py | 11 +++++++++-- tests/test_functionality.py | 13 +++++++++++++ tests/test_runner.py | 24 ++++++++++++------------ 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/core/functionality.py b/core/functionality.py index 98af242..e264866 100644 --- a/core/functionality.py +++ b/core/functionality.py @@ -8,7 +8,7 @@ import yaml from datetime import datetime -from multiprocessing.connection import Connection, wait as multi_wait +from multiprocessing.connection import Connection, PipeConnection, wait as multi_wait from multiprocessing.queues import Queue from papermill.translators import papermill_translators from typing import Any @@ -49,12 +49,19 @@ def generate_id(prefix:str="", length:int=16, existing_ids:list[str]=[], def wait(inputs:list[VALID_CHANNELS])->list[VALID_CHANNELS]: all_connections = [i for i in inputs if type(i) is Connection] \ + + [i for i in inputs if type(i) is PipeConnection] \ + [i._reader for i in inputs if type(i) is Queue] - + print(inputs) + for i in inputs: + print(type(i)) + print(all_connections) ready = multi_wait(all_connections) + print(ready) ready_inputs = [i for i in inputs if \ (type(i) is Connection and i in ready) \ + or (type(i) is PipeConnection and i in ready) \ or (type(i) is Queue and i._reader in ready)] + print(ready_inputs) return ready_inputs def _get_file_sha256(file_path): diff --git a/tests/test_functionality.py b/tests/test_functionality.py index 26b5500..49aa03c 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -76,25 +76,38 @@ class CorrectnessTests(unittest.TestCase): pipe_one_writer.send(1) readables = wait(inputs) +# raise EnvironmentError + +# if pipe_one_reader.poll(3): +# print(pipe_one_reader.recv()) + self.assertIn(pipe_one_reader, readables) + print("assert1") self.assertEqual(len(readables), 1) + print("assert2") msg = readables[0].recv() self.assertEqual(msg, 1) + print("assert3") pipe_one_writer.send(1) pipe_two_writer.send(2) readables = wait(inputs) self.assertIn(pipe_one_reader, readables) + print("assert4") self.assertIn(pipe_two_reader, readables) + print("assert5") self.assertEqual(len(readables), 2) + print("assert6") for readable in readables: if readable == pipe_one_reader: msg = readable.recv() self.assertEqual(msg, 1) + print("assertloop if") elif readable == pipe_two_reader: msg = readable.recv() self.assertEqual(msg, 2) + print("assert loop elif") # Test that wait can wait on multiple queues def testWaitQueues(self)->None: diff --git a/tests/test_runner.py b/tests/test_runner.py index e5fd26a..0d8f28e 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -121,10 +121,10 @@ class MeowTests(unittest.TestCase): # Test single meow papermill job execution def testMeowRunnerPapermillExecution(self)->None: pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", + "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile", parameters={ "extra":"A line from a test Pattern", - "outfile":"{VGRID}/output/{FILENAME}" + "outfile":os.path.join("{VGRID}","output","{FILENAME}") }) recipe = JupyterNotebookRecipe( "recipe_one", APPENDING_NOTEBOOK) @@ -205,16 +205,16 @@ class MeowTests(unittest.TestCase): # Test meow papermill job chaining within runner def testMeowRunnerLinkedPapermillExecution(self)->None: pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", + "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile", parameters={ "extra":"A line from Pattern 1", - "outfile":"{VGRID}/middle/{FILENAME}" + "outfile":os.path.join("{VGRID}","middle","{FILENAME}") }) pattern_two = FileEventPattern( - "pattern_two", "middle/A.txt", "recipe_one", "infile", + "pattern_two", os.path.join("middle","A.txt"), "recipe_one", "infile", parameters={ "extra":"A line from Pattern 2", - "outfile":"{VGRID}/output/{FILENAME}" + "outfile":os.path.join("{VGRID}","output","{FILENAME}") }) recipe = JupyterNotebookRecipe( "recipe_one", APPENDING_NOTEBOOK) @@ -314,10 +314,10 @@ class MeowTests(unittest.TestCase): # Test single meow python job execution def testMeowRunnerPythonExecution(self)->None: pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", + "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile", parameters={ "num":10000, - "outfile":"{VGRID}/output/{FILENAME}" + "outfile":os.path.join("{VGRID}","output","{FILENAME}") }) recipe = PythonRecipe( "recipe_one", COMPLETE_PYTHON_SCRIPT @@ -403,16 +403,16 @@ class MeowTests(unittest.TestCase): # Test meow python job chaining within runner def testMeowRunnerLinkedPythonExecution(self)->None: pattern_one = FileEventPattern( - "pattern_one", "start/A.txt", "recipe_one", "infile", + "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile", parameters={ "num":250, - "outfile":"{VGRID}/middle/{FILENAME}" + "outfile":os.path.join("{VGRID}","middle","{FILENAME}") }) pattern_two = FileEventPattern( - "pattern_two", "middle/A.txt", "recipe_one", "infile", + "pattern_two", os.path.join("middle","A.txt"), "recipe_one", "infile", parameters={ "num":40, - "outfile":"{VGRID}/output/{FILENAME}" + "outfile":os.path.join("{VGRID}","output","{FILENAME}") }) recipe = PythonRecipe( "recipe_one", COMPLETE_PYTHON_SCRIPT From 2899d44a005a41079b142402e2f24ea61c30e59c Mon Sep 17 00:00:00 2001 From: toppggg <70447358+toppggg@users.noreply.github.com> Date: Wed, 8 Feb 2023 15:03:44 +0100 Subject: [PATCH 2/4] Deleted print statements --- tests/test_functionality.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/tests/test_functionality.py b/tests/test_functionality.py index 49aa03c..43b361f 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -76,38 +76,24 @@ class CorrectnessTests(unittest.TestCase): pipe_one_writer.send(1) readables = wait(inputs) -# raise EnvironmentError - -# if pipe_one_reader.poll(3): -# print(pipe_one_reader.recv()) - self.assertIn(pipe_one_reader, readables) - print("assert1") self.assertEqual(len(readables), 1) - print("assert2") msg = readables[0].recv() self.assertEqual(msg, 1) - print("assert3") pipe_one_writer.send(1) pipe_two_writer.send(2) readables = wait(inputs) self.assertIn(pipe_one_reader, readables) - print("assert4") - self.assertIn(pipe_two_reader, readables) - print("assert5") self.assertEqual(len(readables), 2) - print("assert6") for readable in readables: if readable == pipe_one_reader: msg = readable.recv() self.assertEqual(msg, 1) - print("assertloop if") elif readable == pipe_two_reader: msg = readable.recv() self.assertEqual(msg, 2) - print("assert loop elif") # Test that wait can wait on multiple queues def testWaitQueues(self)->None: From 1e1b142849c5018ca6a21ac52df067877cc35491 Mon Sep 17 00:00:00 2001 From: toppggg <70447358+toppggg@users.noreply.github.com> Date: Wed, 8 Feb 2023 15:05:33 +0100 Subject: [PATCH 3/4] Deleted print statements --- core/functionality.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/functionality.py b/core/functionality.py index e264866..e784b4b 100644 --- a/core/functionality.py +++ b/core/functionality.py @@ -51,17 +51,11 @@ def wait(inputs:list[VALID_CHANNELS])->list[VALID_CHANNELS]: all_connections = [i for i in inputs if type(i) is Connection] \ + [i for i in inputs if type(i) is PipeConnection] \ + [i._reader for i in inputs if type(i) is Queue] - print(inputs) - for i in inputs: - print(type(i)) - print(all_connections) ready = multi_wait(all_connections) - print(ready) ready_inputs = [i for i in inputs if \ (type(i) is Connection and i in ready) \ or (type(i) is PipeConnection and i in ready) \ or (type(i) is Queue and i._reader in ready)] - print(ready_inputs) return ready_inputs def _get_file_sha256(file_path): From ca10fd6dc759ecb72ccd3816bf03663a19784861 Mon Sep 17 00:00:00 2001 From: toppggg <70447358+toppggg@users.noreply.github.com> Date: Wed, 8 Feb 2023 15:26:06 +0100 Subject: [PATCH 4/4] Added line 89 assert again --- tests/test_functionality.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_functionality.py b/tests/test_functionality.py index 43b361f..26b5500 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -86,6 +86,7 @@ class CorrectnessTests(unittest.TestCase): readables = wait(inputs) self.assertIn(pipe_one_reader, readables) + self.assertIn(pipe_two_reader, readables) self.assertEqual(len(readables), 2) for readable in readables: if readable == pipe_one_reader: