Added Pipeconnection for Windows and os.path.join, rather than hardcoded path combine

This commit is contained in:
toppggg
2023-02-08 14:32:16 +01:00
parent d787e37adc
commit 37d061b06a
3 changed files with 34 additions and 14 deletions

View File

@ -8,7 +8,7 @@ import yaml
from datetime import datetime from datetime import datetime
from multiprocessing.connection import Connection, wait as multi_wait from multiprocessing.connection import Connection, PipeConnection, wait as multi_wait
from multiprocessing.queues import Queue from multiprocessing.queues import Queue
from papermill.translators import papermill_translators from papermill.translators import papermill_translators
from typing import Any from typing import Any
@ -49,12 +49,19 @@ def generate_id(prefix:str="", length:int=16, existing_ids:list[str]=[],
def wait(inputs:list[VALID_CHANNELS])->list[VALID_CHANNELS]: def wait(inputs:list[VALID_CHANNELS])->list[VALID_CHANNELS]:
all_connections = [i for i in inputs if type(i) is Connection] \ all_connections = [i for i in inputs if type(i) is Connection] \
+ [i for i in inputs if type(i) is PipeConnection] \
+ [i._reader for i in inputs if type(i) is Queue] + [i._reader for i in inputs if type(i) is Queue]
print(inputs)
for i in inputs:
print(type(i))
print(all_connections)
ready = multi_wait(all_connections) ready = multi_wait(all_connections)
print(ready)
ready_inputs = [i for i in inputs if \ ready_inputs = [i for i in inputs if \
(type(i) is Connection and i in ready) \ (type(i) is Connection and i in ready) \
or (type(i) is PipeConnection and i in ready) \
or (type(i) is Queue and i._reader in ready)] or (type(i) is Queue and i._reader in ready)]
print(ready_inputs)
return ready_inputs return ready_inputs
def _get_file_sha256(file_path): def _get_file_sha256(file_path):

View File

@ -76,25 +76,38 @@ class CorrectnessTests(unittest.TestCase):
pipe_one_writer.send(1) pipe_one_writer.send(1)
readables = wait(inputs) readables = wait(inputs)
# raise EnvironmentError
# if pipe_one_reader.poll(3):
# print(pipe_one_reader.recv())
self.assertIn(pipe_one_reader, readables) self.assertIn(pipe_one_reader, readables)
print("assert1")
self.assertEqual(len(readables), 1) self.assertEqual(len(readables), 1)
print("assert2")
msg = readables[0].recv() msg = readables[0].recv()
self.assertEqual(msg, 1) self.assertEqual(msg, 1)
print("assert3")
pipe_one_writer.send(1) pipe_one_writer.send(1)
pipe_two_writer.send(2) pipe_two_writer.send(2)
readables = wait(inputs) readables = wait(inputs)
self.assertIn(pipe_one_reader, readables) self.assertIn(pipe_one_reader, readables)
print("assert4")
self.assertIn(pipe_two_reader, readables) self.assertIn(pipe_two_reader, readables)
print("assert5")
self.assertEqual(len(readables), 2) self.assertEqual(len(readables), 2)
print("assert6")
for readable in readables: for readable in readables:
if readable == pipe_one_reader: if readable == pipe_one_reader:
msg = readable.recv() msg = readable.recv()
self.assertEqual(msg, 1) self.assertEqual(msg, 1)
print("assertloop if")
elif readable == pipe_two_reader: elif readable == pipe_two_reader:
msg = readable.recv() msg = readable.recv()
self.assertEqual(msg, 2) self.assertEqual(msg, 2)
print("assert loop elif")
# Test that wait can wait on multiple queues # Test that wait can wait on multiple queues
def testWaitQueues(self)->None: def testWaitQueues(self)->None:

View File

@ -121,10 +121,10 @@ class MeowTests(unittest.TestCase):
# Test single meow papermill job execution # Test single meow papermill job execution
def testMeowRunnerPapermillExecution(self)->None: def testMeowRunnerPapermillExecution(self)->None:
pattern_one = FileEventPattern( pattern_one = FileEventPattern(
"pattern_one", "start/A.txt", "recipe_one", "infile", "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile",
parameters={ parameters={
"extra":"A line from a test Pattern", "extra":"A line from a test Pattern",
"outfile":"{VGRID}/output/{FILENAME}" "outfile":os.path.join("{VGRID}","output","{FILENAME}")
}) })
recipe = JupyterNotebookRecipe( recipe = JupyterNotebookRecipe(
"recipe_one", APPENDING_NOTEBOOK) "recipe_one", APPENDING_NOTEBOOK)
@ -205,16 +205,16 @@ class MeowTests(unittest.TestCase):
# Test meow papermill job chaining within runner # Test meow papermill job chaining within runner
def testMeowRunnerLinkedPapermillExecution(self)->None: def testMeowRunnerLinkedPapermillExecution(self)->None:
pattern_one = FileEventPattern( pattern_one = FileEventPattern(
"pattern_one", "start/A.txt", "recipe_one", "infile", "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile",
parameters={ parameters={
"extra":"A line from Pattern 1", "extra":"A line from Pattern 1",
"outfile":"{VGRID}/middle/{FILENAME}" "outfile":os.path.join("{VGRID}","middle","{FILENAME}")
}) })
pattern_two = FileEventPattern( pattern_two = FileEventPattern(
"pattern_two", "middle/A.txt", "recipe_one", "infile", "pattern_two", os.path.join("middle","A.txt"), "recipe_one", "infile",
parameters={ parameters={
"extra":"A line from Pattern 2", "extra":"A line from Pattern 2",
"outfile":"{VGRID}/output/{FILENAME}" "outfile":os.path.join("{VGRID}","output","{FILENAME}")
}) })
recipe = JupyterNotebookRecipe( recipe = JupyterNotebookRecipe(
"recipe_one", APPENDING_NOTEBOOK) "recipe_one", APPENDING_NOTEBOOK)
@ -314,10 +314,10 @@ class MeowTests(unittest.TestCase):
# Test single meow python job execution # Test single meow python job execution
def testMeowRunnerPythonExecution(self)->None: def testMeowRunnerPythonExecution(self)->None:
pattern_one = FileEventPattern( pattern_one = FileEventPattern(
"pattern_one", "start/A.txt", "recipe_one", "infile", "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile",
parameters={ parameters={
"num":10000, "num":10000,
"outfile":"{VGRID}/output/{FILENAME}" "outfile":os.path.join("{VGRID}","output","{FILENAME}")
}) })
recipe = PythonRecipe( recipe = PythonRecipe(
"recipe_one", COMPLETE_PYTHON_SCRIPT "recipe_one", COMPLETE_PYTHON_SCRIPT
@ -403,16 +403,16 @@ class MeowTests(unittest.TestCase):
# Test meow python job chaining within runner # Test meow python job chaining within runner
def testMeowRunnerLinkedPythonExecution(self)->None: def testMeowRunnerLinkedPythonExecution(self)->None:
pattern_one = FileEventPattern( pattern_one = FileEventPattern(
"pattern_one", "start/A.txt", "recipe_one", "infile", "pattern_one", os.path.join("start","A.txt"), "recipe_one", "infile",
parameters={ parameters={
"num":250, "num":250,
"outfile":"{VGRID}/middle/{FILENAME}" "outfile":os.path.join("{VGRID}","middle","{FILENAME}")
}) })
pattern_two = FileEventPattern( pattern_two = FileEventPattern(
"pattern_two", "middle/A.txt", "recipe_one", "infile", "pattern_two", os.path.join("middle","A.txt"), "recipe_one", "infile",
parameters={ parameters={
"num":40, "num":40,
"outfile":"{VGRID}/output/{FILENAME}" "outfile":os.path.join("{VGRID}","output","{FILENAME}")
}) })
recipe = PythonRecipe( recipe = PythonRecipe(
"recipe_one", COMPLETE_PYTHON_SCRIPT "recipe_one", COMPLETE_PYTHON_SCRIPT