diff --git a/.gitignore b/.gitignore index b3ec55e..0ff3dfe 100644 --- a/.gitignore +++ b/.gitignore @@ -53,6 +53,14 @@ coverage.xml tests/test_monitor_base tests/test_job_queue_dir tests/test_job_output +tests/test_files +tests/test_data +tests/job_output +tests/job_queue +tests/Backup* + +# hdf5 +*.h5 # Translations *.mo diff --git a/core/runner.py b/core/runner.py index e69def4..38d3cf5 100644 --- a/core/runner.py +++ b/core/runner.py @@ -18,7 +18,8 @@ from core.base_conductor import BaseConductor from core.base_handler import BaseHandler from core.base_monitor import BaseMonitor from core.correctness.vars import DEBUG_WARNING, DEBUG_INFO, EVENT_TYPE, \ - VALID_CHANNELS, META_FILE, DEFAULT_JOB_OUTPUT_DIR, DEFAULT_JOB_QUEUE_DIR + VALID_CHANNELS, META_FILE, DEFAULT_JOB_OUTPUT_DIR, DEFAULT_JOB_QUEUE_DIR, \ + EVENT_PATH from core.correctness.validation import check_type, valid_list, valid_dir_path from functionality.debug import setup_debugging, print_debug from functionality.file_io import make_dir, read_yaml @@ -50,7 +51,6 @@ class MeowRunner: handlers and conductors according to what events and jobs they produce or consume.""" - self._is_valid_job_queue_dir(job_queue_dir) self._is_valid_job_output_dir(job_output_dir) @@ -200,16 +200,17 @@ class MeowRunner: """Function for a given handler to handle a given event, without crashing the runner in the event of a problem.""" print_debug(self._print_target, self.debug_level, - f"Starting handling for event: '{event[EVENT_TYPE]}'", DEBUG_INFO) + f"Starting handling for {event[EVENT_TYPE]} event: " + f"'{event[EVENT_PATH]}'", DEBUG_INFO) try: handler.handle(event) print_debug(self._print_target, self.debug_level, - f"Completed handling for event: '{event[EVENT_TYPE]}'", - DEBUG_INFO) + f"Completed handling for {event[EVENT_TYPE]} event: " + f"'{event[EVENT_PATH]}'", DEBUG_INFO) except Exception as e: print_debug(self._print_target, self.debug_level, - "Something went wrong during handling for event " - f"'{event[EVENT_TYPE]}'. {e}", DEBUG_INFO) + f"Something went wrong during handling for {event[EVENT_TYPE]}" + f" event '{event[EVENT_PATH]}'. 
{e}", DEBUG_INFO) def execute_job(self, conductor:BaseConductor, job_dir:str)->None: """Function for a given conductor to execute a given job, without diff --git a/functionality/meow.py b/functionality/meow.py index f0e0ad8..f6c2657 100644 --- a/functionality/meow.py +++ b/functionality/meow.py @@ -26,7 +26,7 @@ KEYWORD_DIR = "{DIR}" KEYWORD_REL_DIR = "{REL_DIR}" KEYWORD_FILENAME = "{FILENAME}" KEYWORD_PREFIX = "{PREFIX}" -KEYWORD_BASE = "{VGRID}" +KEYWORD_BASE = "{BASE}" KEYWORD_EXTENSION = "{EXTENSION}" KEYWORD_JOB = "{JOB}" diff --git a/patterns/file_event_pattern.py b/patterns/file_event_pattern.py index 65f5d39..a635ae6 100644 --- a/patterns/file_event_pattern.py +++ b/patterns/file_event_pattern.py @@ -27,7 +27,7 @@ from core.correctness.validation import check_type, valid_string, \ from core.correctness.vars import VALID_RECIPE_NAME_CHARS, \ VALID_VARIABLE_NAME_CHARS, FILE_EVENTS, FILE_CREATE_EVENT, \ FILE_MODIFY_EVENT, FILE_MOVED_EVENT, DEBUG_INFO, \ - FILE_RETROACTIVE_EVENT, SHA256 + FILE_RETROACTIVE_EVENT, SHA256, VALID_PATH_CHARS, FILE_CLOSED_EVENT from functionality.debug import setup_debugging, print_debug from functionality.hashing import get_file_hash from functionality.meow import create_rule, create_watchdog_event @@ -37,7 +37,8 @@ _DEFAULT_MASK = [ FILE_CREATE_EVENT, FILE_MODIFY_EVENT, FILE_MOVED_EVENT, - FILE_RETROACTIVE_EVENT + FILE_RETROACTIVE_EVENT, + FILE_CLOSED_EVENT ] class FileEventPattern(BasePattern): @@ -64,7 +65,7 @@ class FileEventPattern(BasePattern): def _is_valid_triggering_path(self, triggering_path:str)->None: """Validation check for 'triggering_path' variable from main constructor.""" - valid_path(triggering_path) + valid_string(triggering_path, VALID_PATH_CHARS+'*', min_length=1) if len(triggering_path) < 1: raise ValueError ( f"triggiering path '{triggering_path}' is too short. " @@ -170,8 +171,9 @@ class WatchdogMonitor(BaseMonitor): def match(self, event)->None: """Function to determine if a given event matches the current rules.""" src_path = event.src_path - event_type = "dir_"+ event.event_type if event.is_directory \ - else "file_" + event.event_type + + prepend = "dir_" if event.is_directory else "file_" + event_types = [prepend+i for i in event.event_type] # Remove the base dir from the path as trigger paths are given relative # to that @@ -185,7 +187,8 @@ class WatchdogMonitor(BaseMonitor): for rule in self._rules.values(): # Skip events not within the event mask - if event_type not in rule.pattern.event_mask: + if any(i in event_types for i in rule.pattern.event_mask) \ + != True: continue # Use regex to match event paths against rule paths @@ -205,8 +208,8 @@ class WatchdogMonitor(BaseMonitor): get_file_hash(event.src_path, SHA256) ) print_debug(self._print_target, self.debug_level, - f"Event at {src_path} of type {event_type} hit rule " - f"{rule.name}", DEBUG_INFO) + f"Event at {src_path} hit rule {rule.name}", + DEBUG_INFO) # Send the event to the runner self.to_runner.send(meow_event) @@ -543,32 +546,63 @@ class WatchdogEventHandler(PatternMatchingEventHandler): monitor. After each event we wait for '_settletime', to catch subsequent events at the same location, so as to not swamp the system with repeated events.""" + self._recent_jobs_lock.acquire() try: - if event.src_path in self._recent_jobs: - recent_timestamp = self._recent_jobs[event.src_path] - difference = event.time_stamp - recent_timestamp - - # Discard the event if we already have a recent event at this - # same path. 
Update the most recent time, so we can hopefully
-                # wait till events have stopped happening
-                if difference <= self._settletime:
-                    self._recent_jobs[event.src_path] = \
-                        max(recent_timestamp, event.time_stamp)
+            if event.src_path in self._recent_jobs:
+                if event.time_stamp > self._recent_jobs[event.src_path][0]:
+                    self._recent_jobs[event.src_path][0] = event.time_stamp
+                    self._recent_jobs[event.src_path][1].add(event.event_type)
+                else:
                    self._recent_jobs_lock.release()
                    return
-            else:
-                self._recent_jobs[event.src_path] = event.time_stamp
            else:
-                self._recent_jobs[event.src_path] = event.time_stamp
+                self._recent_jobs[event.src_path] = \
+                    [event.time_stamp, {event.event_type}]
        except Exception as ex:
            self._recent_jobs_lock.release()
            raise Exception(ex)
        self._recent_jobs_lock.release()

-        # If we did not have a recent event, then send it on to the monitor
+        sleep(self._settletime)
+
+        self._recent_jobs_lock.acquire()
+        try:
+            if event.src_path in self._recent_jobs \
+                    and event.time_stamp < self._recent_jobs[event.src_path][0]:
+                self._recent_jobs_lock.release()
+                return
+        except Exception as ex:
+            self._recent_jobs_lock.release()
+            raise Exception(ex)
+        event.event_type = self._recent_jobs[event.src_path][1]
+        self._recent_jobs_lock.release()
+        self.monitor.match(event)
+
    def handle_event(self, event):
        """Handler function, called by all specific event functions. 
Will attach a timestamp to the event immediately, and attempt to start a diff --git a/requirements.txt b/requirements.txt index e3ede20..f32df44 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ pytest papermill nbformat pyyaml +watchdog diff --git a/tests/shared.py b/tests/shared.py index 67054e1..eeddaa5 100644 --- a/tests/shared.py +++ b/tests/shared.py @@ -5,6 +5,8 @@ Author(s): David Marchant """ import os +from distutils.dir_util import copy_tree + from core.correctness.vars import DEFAULT_JOB_OUTPUT_DIR, DEFAULT_JOB_QUEUE_DIR from functionality.file_io import make_dir, rmtree from patterns import FileEventPattern @@ -15,7 +17,7 @@ TEST_DIR = "test_files" TEST_MONITOR_BASE = "test_monitor_base" TEST_JOB_QUEUE = "test_job_queue_dir" TEST_JOB_OUTPUT = "test_job_output" - +TEST_DATA = "test_data" def setup(): make_dir(TEST_DIR, ensure_clean=True) @@ -33,6 +35,15 @@ def teardown(): rmtree(DEFAULT_JOB_OUTPUT_DIR) rmtree(DEFAULT_JOB_QUEUE_DIR) rmtree("first") + if os.path.exists("temp_phantom_info.h5"): + os.remove("temp_phantom_info.h5") + if os.path.exists("temp_phantom.h5"): + os.remove("temp_phantom.h5") + +def backup_before_teardown(backup_source:str, backup_dest:str): + make_dir(backup_dest, ensure_clean=True) + copy_tree(backup_source, backup_dest) + # Recipe funcs BAREBONES_PYTHON_SCRIPT = [ @@ -304,6 +315,1052 @@ ADDING_NOTEBOOK = { "nbformat": 4, "nbformat_minor": 4 } +FILTER_RECIPE = { + "cells": [ + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Variables to be overridden\n", + "input_image = 'Patch.jpg'\n", + "output_image = 'Blurred_Patch.jpg'\n", + "args = {}\n", + "method = 'BLUR'" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Import statements\n", + "from PIL import Image, ImageFilter\n", + "import yaml\n", + "import os" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Read in image to apply filter to\n", + "im = Image.open(input_image)\n", + "im" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Dynamically construct the filter command as a string from provided arguments\n", + "exec_str = 'im.filter(ImageFilter.%s' % method\n", + "args_str = ', '.join(\"{!s}={!r}\".format(key,val) for (key,val) in args.items())\n", + "exec_str += '(' + args_str + '))'\n", + "exec_str" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Apply constructed command as python code\n", + "filtered = eval(exec_str)\n", + "filtered" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Create output directory if it doesn't exist\n", + "output_dir_path = os.path.dirname(output_image)\n", + "\n", + "if output_dir_path:\n", + " os.makedirs(output_dir_path, exist_ok=True)" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Save output image\n", + "filtered = filtered.save(output_image)" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + 
"mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.9" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} +MAKER_RECIPE = { + "cells": [ + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Variables to be overridden\n", + "meow_dir = 'meow_directory'\n", + "filter_recipe = 'recipe_filter'\n", + "input_yaml = 'input.yml'\n", + "workgroup = '{BASE}'\n", + "workflows_url = 'https://test-sid.idmc.dk/cgi-sid/jsoninterface.py?output_format=json'\n", + "workflows_session_id = '*redacted*'\n", + "\n", + "# Names of the variables in filter_recipe.ipynb\n", + "recipe_input_image = 'input_image'\n", + "recipe_output_image = 'output_image'\n", + "recipe_args = 'args'\n", + "recipe_method = 'method'" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Imports\n", + "import yaml\n", + "import mig_meow as meow\n", + "import os" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Setup environment variables for meow to workgroup communication\n", + "os.environ['WORKFLOWS_URL'] = workflows_url\n", + "os.environ['WORKFLOWS_SESSION_ID'] = workflows_session_id" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Read in configuration data\n", + "with open(input_yaml, 'r') as yaml_file:\n", + " y = yaml.full_load(yaml_file)" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Assemble a name for the new Pattern\n", + "name_str = '%s_%s' % (\n", + " y['filter'], '_'.join(\"{!s}_{!r}\".format(key,val) for (key,val) in y['args'].items()))" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Create the new Pattern\n", + "new_pattern = meow.Pattern(name_str)\n", + "new_pattern.add_recipe(filter_recipe)\n", + "new_pattern.add_single_input(recipe_input_image, y['input_path'])\n", + "new_pattern.add_output(recipe_output_image, y['output_path'])\n", + "new_pattern.add_variable(recipe_method, y['filter'])\n", + "new_pattern.add_variable(recipe_args, y['args'])" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# Register the new Pattern with the system.\n", + "meow.export_pattern_to_vgrid(workgroup, new_pattern)" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.9" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} +POROSITY_CHECK_NOTEBOOK = { + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Variables that will be overwritten accoring to pattern:" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + f"input_filename = 'foam_ct_data{os.path.sep}foam_016_ideal_CT.npy'\n", + "output_filedir_accepted = 'foam_ct_data_accepted' \n", + "output_filedir_discarded = 'foam_ct_data_discarded'\n", + 
"porosity_lower_threshold = 0.8\n", + "utils_path = 'idmc_utils_module.py'" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "\n", + "import importlib\n", + "import matplotlib.pyplot as plt\n", + "import os\n", + "\n", + "import importlib.util\n", + "spec = importlib.util.spec_from_file_location(\"utils\", utils_path)\n", + "utils = importlib.util.module_from_spec(spec)\n", + "spec.loader.exec_module(utils)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Parameters" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "n_samples = 10000" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "#Load data\n", + "ct_data = np.load(input_filename)" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "utils.plot_center_slices(ct_data)" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "sample_inds=np.random.randint(0, len(ct_data.ravel()), n_samples)\n", + "n_components=2\n", + "#Perform GMM fitting on samples from dataset\n", + "means, stds, weights = utils.perform_GMM_np(\n", + " ct_data.ravel()[sample_inds], \n", + " n_components, \n", + " plot=True, \n", + " title='GMM fitted to '+str(n_samples)+' of '\n", + " +str(len(ct_data.ravel()))+' datapoints')\n", + "print('weights: ', weights)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Classify data as 'accepted' or 'dircarded' according to porosity level\n", + "\n", + "Text file named according to the dataset will be stored in appropriate directories" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + f"filename_withouth_npy=input_filename.split('{os.path.sep}')[-1].split('.')[0]\n", + "\n", + "if np.max(weights)>porosity_lower_threshold:\n", + " os.makedirs(output_filedir_accepted, exist_ok=True)\n", + " acc_path = os.path.join(output_filedir_accepted, \n", + " filename_withouth_npy+'.txt')\n", + " with open(acc_path, 'w') as file:\n", + " file.write(str(np.max(weights))+' '+str(np.min(weights)))\n", + "else:\n", + " os.makedirs(output_filedir_discarded, exist_ok=True)\n", + " dis_path = os.path.join(output_filedir_discarded, \n", + " filename_withouth_npy+'.txt') \n", + " with open(dis_path, 'w') as file:\n", + " file.write(str(np.max(weights))+' '+str(np.min(weights)))" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.9" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} +SEGMENT_FOAM_NOTEBOOK = { + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Variables that will be overwritten accoring to pattern:" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + f"input_filename = 'foam_ct_data_accepted{os.path.sep}foam_016_ideal_CT.txt'\n", + "input_filedir = 'foam_ct_data'\n", 
+ "output_filedir = 'foam_ct_data_segmented'\n", + "utils_path = 'idmc_utils_module.py'" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import importlib\n", + "import matplotlib.pyplot as plt\n", + "import os\n", + "import scipy.ndimage as snd\n", + "import skimage\n", + "\n", + "import importlib.util\n", + "spec = importlib.util.spec_from_file_location(\"utils\", utils_path)\n", + "utils = importlib.util.module_from_spec(spec)\n", + "spec.loader.exec_module(utils)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Segmentation\n", + "\n", + "Segmentation method used:\n", + "\n", + "- Median filter applied to reduce noise\n", + "- Otsu thresholding applied to get binary data\n", + "- Morphological closing performed to remove remaining single-voxel noise\n" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# importlib.reload(utils)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Parameters" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "median_filter_kernel_size = 2" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load data" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "filename_withouth_txt=input_filename.split(os.path.sep)[-1].split('.')[0]\n", + "input_data = os.path.join(input_filedir, filename_withouth_txt+'.npy')\n", + "\n", + "ct_data = np.load(input_data)\n", + "utils.plot_center_slices(ct_data, title = filename_withouth_txt)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Median filtering " + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "data_filtered = snd.median_filter(ct_data, median_filter_kernel_size)\n", + "utils.plot_center_slices(data_filtered, title = filename_withouth_txt+' median filtered')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Otsu thresholding" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "threshold = skimage.filters.threshold_otsu(data_filtered)\n", + "data_thresholded = (data_filtered>threshold)*1\n", + "utils.plot_center_slices(data_thresholded, title = filename_withouth_txt+' Otsu thresholded')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Morphological closing" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "data_segmented = (skimage.morphology.binary_closing((data_thresholded==0))==0)\n", + "utils.plot_center_slices(data_segmented, title = filename_withouth_txt+' Otsu thresholded')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Save data" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "filename_save = filename_withouth_txt+'_segmented.npy'\n", + "os.makedirs(output_filedir, exist_ok=True)\n", + "np.save(os.path.join(output_filedir, filename_save), data_segmented)" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": 
"python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.9" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} +FOAM_PORE_ANALYSIS_NOTEBOOK = { + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Variables that will be overwritten accoring to pattern:" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + f"input_filename = 'foam_ct_data_segmented{os.path.sep}foam_016_ideal_CT_segmented.npy'\n", + "output_filedir = 'foam_ct_data_pore_analysis'\n", + "utils_path = 'idmc_utils_module.py'" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import importlib\n", + "import matplotlib.pyplot as plt\n", + "import os\n", + "import scipy.ndimage as snd\n", + "\n", + "from skimage.segmentation import watershed\n", + "from skimage.feature import peak_local_max\n", + "from matplotlib import cm\n", + "from matplotlib.colors import ListedColormap, LinearSegmentedColormap\n", + "\n", + "import importlib.util\n", + "spec = importlib.util.spec_from_file_location(\"utils\", utils_path)\n", + "utils = importlib.util.module_from_spec(spec)\n", + "spec.loader.exec_module(utils)" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "# importlib.reload(utils)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Foam pore analysis\n", + "\n", + "- Use Watershed algorithm to separate pores\n", + "- Plot statistics\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load data" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "data = np.load(input_filename)" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "utils.plot_center_slices(data, title = input_filename)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Watershed: Identify separate pores " + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "#distance map\n", + "distance = snd.distance_transform_edt((data==0))\n", + "\n", + "#get watershed seeds\n", + "local_maxi = peak_local_max(distance, indices=False, footprint=np.ones((3, 3, 3)), labels=(data==0))\n", + "markers = snd.label(local_maxi)[0]\n", + "\n", + "#perform watershed pore seapration\n", + "labels = watershed(-distance, markers, mask=(data==0))\n", + "\n", + "## Pore color mad\n", + "somecmap = cm.get_cmap('magma', 256)\n", + "cvals=np.random.uniform(0, 1, len(np.unique(labels)))\n", + "newcmp = ListedColormap(somecmap(cvals))\n", + "\n", + "\n", + "utils.plot_center_slices(-distance, cmap=plt.cm.gray, title='Distances')\n", + "utils.plot_center_slices(labels, cmap=newcmp, title='Separated pores')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Plot statistics: pore radii" + ] + }, + { + "cell_type": "code", + "execution_count": None, + "metadata": {}, + "outputs": [], + "source": [ + "volumes = np.array([np.sum(labels==label) for label in np.unique(labels)])\n", + "volumes.sort()\n", + "#ignore two largest labels (background and matrix)\n", + "radii = 
(volumes[:-2]*3/(4*np.pi))**(1/3) #find radii, assuming spherical pores\n",
+                "_=plt.hist(radii, bins=200)"
+            ]
+        },
+        {
+            "cell_type": "markdown",
+            "metadata": {},
+            "source": [
+                "# Save plot"
+            ]
+        },
+        {
+            "cell_type": "code",
+            "execution_count": None,
+            "metadata": {},
+            "outputs": [],
+            "source": [
+                "filename_withouth_npy=input_filename.split(os.path.sep)[-1].split('.')[0]\n",
+                "filename_save = filename_withouth_npy+'_statistics.png'\n",
+                "\n",
+                "fig, ax = plt.subplots(1,3, figsize=(15,4))\n",
+                "ax[0].imshow(labels[:,:,np.shape(labels)[2]//2], cmap=newcmp)\n",
+                "ax[1].imshow(labels[:,np.shape(labels)[2]//2,:], cmap=newcmp)\n",
+                "_=ax[2].hist(radii, bins=200)\n",
+                "ax[2].set_title('Foam pore radii')\n",
+                "\n",
+                "os.makedirs(output_filedir, exist_ok=True)\n",
+                "plt.savefig(os.path.join(output_filedir, filename_save))"
+            ]
+        },
+        {
+            "cell_type": "code",
+            "execution_count": None,
+            "metadata": {},
+            "outputs": [],
+            "source": []
+        }
+    ],
+    "metadata": {
+        "kernelspec": {
+            "display_name": "Python 3",
+            "language": "python",
+            "name": "python3"
+        },
+        "language_info": {
+            "codemirror_mode": {
+                "name": "ipython",
+                "version": 3
+            },
+            "file_extension": ".py",
+            "mimetype": "text/x-python",
+            "name": "python",
+            "nbconvert_exporter": "python",
+            "pygments_lexer": "ipython3",
+            "version": "3.6.9"
+        }
+    },
+    "nbformat": 4,
+    "nbformat_minor": 4
+}
+GENERATOR_NOTEBOOK = {
+    "cells": [
+        {
+            "cell_type": "code",
+            "execution_count": None,
+            "metadata": {},
+            "outputs": [],
+            "source": [
+                "# importing the necessary modules\n",
+                "import numpy as np\n",
+                "import random\n",
+                "import os\n",
+                "import shutil\n",
+                "import importlib.util"
+            ]
+        },
+        {
+            "cell_type": "code",
+            "execution_count": None,
+            "metadata": {},
+            "outputs": [],
+            "source": [
+                "# Variables to be overridden\n",
+                "dest_dir = 'foam_ct_data'\n",
+                "discarded = os.path.join('discarded', 'foam_data_0-big-.npy')\n",
+                "utils_path = 'idmc_utils_module.py'\n",
+                "gen_path = 'shared.py'\n",
+                "test_data = 'test_data'"
+            ]
+        },
+        {
+            "cell_type": "code",
+            "execution_count": None,
+            "metadata": {},
+            "outputs": [],
+            "source": [
+                "# import loaded modules\n",
+                "u_spec = importlib.util.spec_from_file_location(\"utils\", utils_path)\n",
+                "utils = importlib.util.module_from_spec(u_spec)\n",
+                "u_spec.loader.exec_module(utils)\n",
+                "\n",
+                "g_spec = importlib.util.spec_from_file_location(\"gen\", gen_path)\n",
+                "gen = importlib.util.module_from_spec(g_spec)\n",
+                "g_spec.loader.exec_module(gen)"
+            ]
+        },
+        {
+            "cell_type": "code",
+            "execution_count": None,
+            "metadata": {},
+            "outputs": [],
+            "source": [
+                "# Other variables, will be kept constant\n",
+                "_, _, i, val, vx, vy, vz = os.path.basename(discarded).split('_')\n",
+                "vz = vz.replace(\".npy\", \"\")\n",
+                "i = int(i)\n",
+                "val = int(val)\n",
+                "vx = int(vx)\n",
+                "vy = int(vy)\n",
+                "vz = int(vz)\n",
+                "res=3/vz\n",
+                "\n",
+                "chance_good=1\n",
+                "chance_small=0\n",
+                "chance_big=0\n",
+                "\n",
+                "nspheres_per_unit_few=100\n",
+                "nspheres_per_unit_ideal=1000\n",
+                "nspheres_per_unit_many=10000"
+            ]
+        },
+        {
+            "cell_type": "code",
+            "execution_count": None,
+            "metadata": {},
+            "outputs": [],
+            "source": [
+                "possible_selection = [nspheres_per_unit_ideal] * chance_good \\\n",
+                "    + [nspheres_per_unit_few] * chance_big \\\n",
+                "    + [nspheres_per_unit_many] * chance_small\n",
+                "random.shuffle(possible_selection)\n",
+                "selection = possible_selection[0]"
+            ]
+        },
+        {
+            "cell_type": "code",
+            "execution_count": None,
+            "metadata": {},
+            "outputs": [],
+            "source": [
+
"filename = f\"foam_dataset_{i}_{selection}_{vx}_{vy}_{vz}.npy\"\n", + "backup_file = os.path.join(test_data, filename)\n", + "if not os.path.exists(backup_file):\n", + " gen.create_foam_data_file(backup_file, selection, vx, vy, vz, res)\n", + "target_file = os.path.join(dest_dir, filename)\n", + "shutil.copy(backup_file, target_file)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.9" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} + +# Python scripts +IDMC_UTILS_MODULE = [ + "import matplotlib.pyplot as plt", + "from sklearn import mixture", + "import numpy as np", + "from skimage.morphology import convex_hull_image", + "", + "def saveplot(figpath_and_name, dataset):", + "", + " fig, ax=plt.subplots(1, 3, figsize=(10, 4))", + " ax[0].imshow(dataset[dataset.shape[0]//2,:,:])", + " ax[1].imshow(dataset[:,dataset.shape[1]//2, :])", + " ax[2].imshow(dataset[:,:,dataset.shape[2]//2])", + " plt.savefig(figpath_and_name)", + "", + "", + "def slice_by_slice_mask_calc(data):", + " '''calculate mask from convex hull of data, slice by slice in x-direction'''", + "", + " mask=np.zeros(data.shape)", + " no_slices=data.shape[0]", + " for i in range(no_slices):", + " xslice=data[i,:,:]", + " mask[i,:,:]=convex_hull_image(xslice)", + " return mask", + "", + "", + "def plot_center_slices(volume, title='', fig_external=[],figsize=(15,5), cmap='viridis', colorbar=False, vmin=None, vmax=None):", + " shape=np.shape(volume)", + "", + " if len(fig_external)==0:", + " fig,ax = plt.subplots(1,3, figsize=figsize)", + " else:", + " fig = fig_external[0]", + " ax = fig_external[1]", + "", + " fig.suptitle(title)", + " im=ax[0].imshow(volume[:,:, int(shape[2]/2)], cmap=cmap, vmin=vmin, vmax=vmax)", + " ax[0].set_title('Center z slice')", + " ax[1].imshow(volume[:,int(shape[1]/2),:], cmap=cmap, vmin=vmin, vmax=vmax)", + " ax[1].set_title('Center y slice')", + " ax[2].imshow(volume[int(shape[0]/2),:,:], cmap=cmap, vmin=vmin, vmax=vmax)", + " ax[2].set_title('Center x slice')", + "", + " if colorbar:", + " fig.subplots_adjust(right=0.8)", + " cbar_ax = fig.add_axes([0.85, 0.15, 0.05, 0.7])", + " fig.colorbar(im, cax=cbar_ax)", + "", + "", + "def perform_GMM_np(data_np, n_components, plot=False, n_init=1, nbins=500, title='', fig_external=[], return_labels=False):", + "", + " #reshape data", + " n_samples=len(data_np)", + " X_train = np.concatenate([data_np.reshape((n_samples, 1)), np.zeros((n_samples, 1))], axis=1)", + "", + " # fit a Gaussian Mixture Model", + " clf = mixture.GaussianMixture(n_components=n_components, covariance_type='full', n_init=n_init)", + " clf.fit(X_train)", + " if clf.converged_!=True:", + " print(' !! Did not converge! 
Converged: ',clf.converged_)", + "", + " labels=clf.predict(X_train)", + "", + " means=[]", + " stds=[]", + " weights=[]", + " for c in range(n_components):", + " component=X_train[labels==c][:,0]", + " means.append(np.mean(component))", + " stds.append(np.std(component))", + " weights.append(len(component)/len(data_np))", + "", + " if plot:", + " gaussian = lambda x, mu, s, A: A*np.exp(-0.5*(x-mu)**2/s**2)/np.sqrt(2*np.pi*s**2)", + "", + " if len(fig_external)>0:", + " fig, ax=fig_external[0], fig_external[1]", + " else:", + " fig, ax=plt.subplots(1, figsize=(16, 8))", + "", + " hist, bin_edges = np.histogram(data_np, bins=nbins)", + " bin_size=np.diff(bin_edges)", + " bin_centers = bin_edges[:-1] + bin_size/ 2", + " hist_normed = hist/(n_samples*bin_size) #normalizing to get 1 under graph", + " ax.bar(bin_centers,hist_normed, bin_size, alpha=0.5)", + " if len(title)>0:", + " ax.set_title(title)", + " else:", + " ax.set_title('Histogram, '+str(n_samples)+' datapoints. ')", + "", + " #COLORMAP WITH EVENLY SPACED COLORS!", + " colors=plt.cm.rainbow(np.linspace(0,1,n_components+1))#rainbow, plasma, autumn, viridis...", + "", + " x_vals=np.linspace(np.min(bin_edges), np.max(bin_edges), 500)", + "", + " g_total=np.zeros_like(x_vals)", + " for c in range(n_components):", + " gc=gaussian(x_vals, means[c], stds[c], weights[c])", + " ax.plot(x_vals, gc, color=colors[c], linewidth=2, label='mean=%.2f'%(means[c]))", + " ax.arrow(means[c], weights[c], 0, 0.1)", + " g_total+=gc", + " ax.plot(x_vals, g_total, color=colors[-1], linewidth=2, label='Total Model')", + " plt.legend()", + "", + " if return_labels:", + " return means, stds, weights, labels", + " else:", + " return means, stds, weights" +] +GENERATE_SCRIPT = [ + "import numpy as np", + "import random", + "import foam_ct_phantom.foam_ct_phantom as foam_ct_phantom", + "", + "def generate_foam(nspheres_per_unit, vx, vy, vz, res):", + " def maxsize_func(x, y, z):", + " return 0.2 - 0.1*np.abs(z)", + "", + " random_seed=random.randint(0,4294967295)", + " foam_ct_phantom.FoamPhantom.generate('temp_phantom_info.h5',", + " random_seed,", + " nspheres_per_unit=nspheres_per_unit,", + " maxsize=maxsize_func)", + "", + " geom = foam_ct_phantom.VolumeGeometry(vx, vy, vz, res)", + " phantom = foam_ct_phantom.FoamPhantom('temp_phantom_info.h5')", + " phantom.generate_volume('temp_phantom.h5', geom)", + " dataset = foam_ct_phantom.load_volume('temp_phantom.h5')", + "", + " return dataset", + "", + "def create_foam_data_file(filename:str, val:int, vx:int, vy:int, vz:int, res:int):", + " dataset = generate_foam(val, vx, vy, vz, res)", + " np.save(filename, dataset)", + " del dataset" +] + valid_pattern_one = FileEventPattern( "pattern_one", "path_one", "recipe_one", "file_one") diff --git a/tests/test_all.sh b/tests/test_all.sh index a00a020..0eb30f4 100755 --- a/tests/test_all.sh +++ b/tests/test_all.sh @@ -13,7 +13,7 @@ cd $script_dir search_dir=. 
for entry in "$search_dir"/* do - if [[ $entry == ./test* ]] && [[ $entry != ./$script_name ]] && [[ $entry != ./shared.py ]]; + if [[ $entry == ./test* ]] && [[ -f $entry ]] && [[ $entry != ./$script_name ]] && [[ $entry != ./shared.py ]]; then pytest $entry "-W ignore::DeprecationWarning" fi diff --git a/tests/test_conductors.py b/tests/test_conductors.py index 3e992d0..bded7ee 100644 --- a/tests/test_conductors.py +++ b/tests/test_conductors.py @@ -406,7 +406,7 @@ class MeowTests(unittest.TestCase): error = read_file(error_file) self.assertEqual(error, "Recieved incorrectly setup job.\n\n[Errno 2] No such file or " - f"directory: 'test_job_queue_dir/{job_dict[JOB_ID]}/job.yml'") + f"directory: 'test_job_queue_dir{os.path.sep}{job_dict[JOB_ID]}{os.path.sep}job.yml'") # Test LocalPythonConductor does not execute jobs with bad functions def testLocalPythonConductorBadFunc(self)->None: diff --git a/tests/test_runner.py b/tests/test_runner.py index c3e4d91..51724cb 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -1,8 +1,11 @@ import io +import importlib import os import unittest +from random import shuffle +from shutil import copy from time import sleep from core.base_conductor import BaseConductor @@ -12,17 +15,107 @@ from conductors import LocalPythonConductor from core.correctness.vars import get_result_file, \ JOB_TYPE_PAPERMILL, JOB_ERROR, META_FILE, JOB_TYPE_PYTHON, JOB_CREATE_TIME from core.runner import MeowRunner -from functionality.file_io import make_dir, read_file, read_notebook, read_yaml +from functionality.file_io import make_dir, read_file, read_notebook, \ + read_yaml, write_file, lines_to_string from functionality.meow import create_parameter_sweep +from functionality.requirements import create_python_requirements from patterns.file_event_pattern import WatchdogMonitor, FileEventPattern from recipes.jupyter_notebook_recipe import PapermillHandler, \ JupyterNotebookRecipe from recipes.python_recipe import PythonHandler, PythonRecipe -from shared import setup, teardown, \ - TEST_JOB_QUEUE, TEST_JOB_OUTPUT, TEST_MONITOR_BASE, \ - APPENDING_NOTEBOOK, COMPLETE_PYTHON_SCRIPT, TEST_DIR +from shared import setup, teardown, backup_before_teardown, \ + TEST_JOB_QUEUE, TEST_JOB_OUTPUT, TEST_MONITOR_BASE, MAKER_RECIPE, \ + APPENDING_NOTEBOOK, COMPLETE_PYTHON_SCRIPT, TEST_DIR, FILTER_RECIPE, \ + POROSITY_CHECK_NOTEBOOK, SEGMENT_FOAM_NOTEBOOK, GENERATOR_NOTEBOOK, \ + FOAM_PORE_ANALYSIS_NOTEBOOK, IDMC_UTILS_MODULE, TEST_DATA, GENERATE_SCRIPT +pattern_check = FileEventPattern( + "pattern_check", + os.path.join("foam_ct_data", "*"), + "recipe_check", + "input_filename", + parameters={ + "output_filedir_accepted": + os.path.join("{BASE}", "foam_ct_data_accepted"), + "output_filedir_discarded": + os.path.join("{BASE}", "foam_ct_data_discarded"), + "porosity_lower_threshold": 0.8, + "utils_path": os.path.join("{BASE}", "idmc_utils_module.py") + }) + +pattern_segment = FileEventPattern( + "pattern_segment", + os.path.join("foam_ct_data_accepted", "*"), + "recipe_segment", + "input_filename", + parameters={ + "output_filedir": os.path.join("{BASE}", "foam_ct_data_segmented"), + "input_filedir": os.path.join("{BASE}", "foam_ct_data"), + "utils_path": os.path.join("{BASE}", "idmc_utils_module.py") + }) + +pattern_analysis = FileEventPattern( + "pattern_analysis", + os.path.join("foam_ct_data_segmented", "*"), + "recipe_analysis", + "input_filename", + parameters={ + "output_filedir": os.path.join("{BASE}", "foam_ct_data_pore_analysis"), + "utils_path": os.path.join("{BASE}", 
"idmc_utils_module.py") + }) + +pattern_regenerate = FileEventPattern( + "pattern_regenerate", + os.path.join("foam_ct_data_discarded", "*"), + "recipe_generator", + "discarded", + parameters={ + "dest_dir": os.path.join("{BASE}", "foam_ct_data"), + "utils_path": os.path.join("{BASE}", "idmc_utils_module.py"), + "gen_path": os.path.join("{BASE}", "generator.py"), + "test_data": os.path.join(TEST_DATA, "foam_ct_data"), + "vx": 64, + "vy": 64, + "vz": 64, + "res": 3/64, + "chance_good": 1, + "chance_small": 0, + "chance_big": 0 + }) + +recipe_check_key, recipe_check_req = create_python_requirements( + modules=["numpy", "importlib", "matplotlib"]) +recipe_check = JupyterNotebookRecipe( + 'recipe_check', + POROSITY_CHECK_NOTEBOOK, + requirements={recipe_check_key: recipe_check_req} +) + +recipe_segment_key, recipe_segment_req = create_python_requirements( + modules=["numpy", "importlib", "matplotlib", "scipy", "skimage"]) +recipe_segment = JupyterNotebookRecipe( + 'recipe_segment', + SEGMENT_FOAM_NOTEBOOK, + requirements={recipe_segment_key: recipe_segment_req} +) + +recipe_analysis_key, recipe_analysis_req = create_python_requirements( + modules=["numpy", "importlib", "matplotlib", "scipy", "skimage"]) +recipe_analysis = JupyterNotebookRecipe( + 'recipe_analysis', + FOAM_PORE_ANALYSIS_NOTEBOOK, + requirements={recipe_analysis_key: recipe_analysis_req} +) + +recipe_generator_key, recipe_generator_req = create_python_requirements( + modules=["numpy", "matplotlib", "random"]) +recipe_generator = JupyterNotebookRecipe( + 'recipe_generator', + GENERATOR_NOTEBOOK, + requirements={recipe_generator_key: recipe_generator_req} +) + class MeowTests(unittest.TestCase): def setUp(self)->None: super().setUp() @@ -171,7 +264,7 @@ class MeowTests(unittest.TestCase): "infile", parameters={ "extra":"A line from a test Pattern", - "outfile":os.path.join("{VGRID}", "output", "{FILENAME}") + "outfile":os.path.join("{BASE}", "output", "{FILENAME}") }) recipe = JupyterNotebookRecipe( "recipe_one", APPENDING_NOTEBOOK) @@ -257,13 +350,13 @@ class MeowTests(unittest.TestCase): "infile", parameters={ "extra":"A line from Pattern 1", - "outfile":os.path.join("{VGRID}", "middle", "{FILENAME}") + "outfile":os.path.join("{BASE}", "middle", "{FILENAME}") }) pattern_two = FileEventPattern( "pattern_two", os.path.join("middle", "A.txt"), "recipe_one", "infile", parameters={ "extra":"A line from Pattern 2", - "outfile":os.path.join("{VGRID}", "output", "{FILENAME}") + "outfile":os.path.join("{BASE}", "output", "{FILENAME}") }) recipe = JupyterNotebookRecipe( "recipe_one", APPENDING_NOTEBOOK) @@ -367,7 +460,7 @@ class MeowTests(unittest.TestCase): "pattern_one", os.path.join("start", "A.txt"), "recipe_one", "infile", parameters={ "num":10000, - "outfile":os.path.join("{VGRID}", "output", "{FILENAME}") + "outfile":os.path.join("{BASE}", "output", "{FILENAME}") }) recipe = PythonRecipe( "recipe_one", COMPLETE_PYTHON_SCRIPT @@ -459,7 +552,7 @@ class MeowTests(unittest.TestCase): "infile", parameters={ "num":250, - "outfile":os.path.join("{VGRID}", "middle", "{FILENAME}") + "outfile":os.path.join("{BASE}", "middle", "{FILENAME}") }) pattern_two = FileEventPattern( "pattern_two", @@ -468,7 +561,7 @@ class MeowTests(unittest.TestCase): "infile", parameters={ "num":40, - "outfile":os.path.join("{VGRID}", "output", "{FILENAME}") + "outfile":os.path.join("{BASE}", "output", "{FILENAME}") }) recipe = PythonRecipe( "recipe_one", COMPLETE_PYTHON_SCRIPT @@ -595,7 +688,7 @@ class MeowTests(unittest.TestCase): "infile", 
sweep=create_parameter_sweep("num", 1000, 10000, 200), parameters={ - "outfile":os.path.join("{VGRID}", "output", "{FILENAME}") + "outfile":os.path.join("{BASE}", "output", "{FILENAME}") }) recipe = PythonRecipe( "recipe_one", COMPLETE_PYTHON_SCRIPT @@ -678,7 +771,608 @@ class MeowTests(unittest.TestCase): output_path = os.path.join(TEST_MONITOR_BASE, "output", "A.txt") self.assertTrue(os.path.exists(output_path)) - # TODO adding tests with numpy or other external dependency + def testSelfModifyingAnalysis(self)->None: + maker_pattern = FileEventPattern( + "maker_pattern", + os.path.join("confs", "*.yml"), + "maker_recipe", + "input_yaml", + parameters={ + "meow_dir": "self-modifying", + "filter_recipe": "recipe_filter", + "recipe_input_image": "input_image", + "recipe_output_image": "output_image", + "recipe_args": "args", + "recipe_method": "method" + }) + patterns = { + "maker_pattern": maker_pattern, + } + + filter_recipe = JupyterNotebookRecipe( + "filter_recipe", FILTER_RECIPE + ) + maker_recipe = JupyterNotebookRecipe( + "maker_recipe", MAKER_RECIPE + ) + + recipes = { + filter_recipe.name: filter_recipe, + maker_recipe.name: maker_recipe + } + + runner_debug_stream = io.StringIO("") + + runner = MeowRunner( + WatchdogMonitor( + TEST_MONITOR_BASE, + patterns, + recipes, + settletime=1 + ), + PythonHandler( + job_queue_dir=TEST_JOB_QUEUE + ), + LocalPythonConductor(), + job_queue_dir=TEST_JOB_QUEUE, + job_output_dir=TEST_JOB_OUTPUT, + print=runner_debug_stream, + logging=3 + ) + + # TODO finish me +# runner.start() + + # Test some actual scientific analysis, but in a simple progression + def testScientificAnalysisAllGood(self)->None: + patterns = { + 'pattern_check': pattern_check, + 'pattern_segment': pattern_segment, + 'pattern_analysis': pattern_analysis, + 'pattern_regenerate': pattern_regenerate + } + + recipes = { + 'recipe_check': recipe_check, + 'recipe_segment': recipe_segment, + 'recipe_analysis': recipe_analysis, + 'recipe_generator': recipe_generator + } + + runner_debug_stream = io.StringIO("") + + runner = MeowRunner( + WatchdogMonitor( + TEST_MONITOR_BASE, + patterns, + recipes, + settletime=1 + ), + PapermillHandler( + job_queue_dir=TEST_JOB_QUEUE + ), + LocalPythonConductor(), + job_queue_dir=TEST_JOB_QUEUE, + job_output_dir=TEST_JOB_OUTPUT, + print=runner_debug_stream, + logging=3 + ) + + good = 3 + big = 0 + small = 0 + vx = 64 + vy = 64 + vz = 64 + res = 3/vz + backup_data_dir = os.path.join(TEST_DATA, "foam_ct_data") + foam_data_dir = os.path.join(TEST_MONITOR_BASE, "foam_ct_data") + make_dir(foam_data_dir) + + write_file(lines_to_string(IDMC_UTILS_MODULE), + os.path.join(TEST_MONITOR_BASE, "idmc_utils_module.py")) + + gen_path = os.path.join(TEST_MONITOR_BASE, "generator.py") + write_file(lines_to_string(GENERATE_SCRIPT), gen_path) + + u_spec = importlib.util.spec_from_file_location("gen", gen_path) + gen = importlib.util.module_from_spec(u_spec) + u_spec.loader.exec_module(gen) + + all_data = [1000] * good + [100] * big + [10000] * small + shuffle(all_data) + + for i, val in enumerate(all_data): + filename = f"foam_dataset_{i}_{val}_{vx}_{vy}_{vz}.npy" + backup_file = os.path.join(backup_data_dir, filename) + if not os.path.exists(backup_file): + gen.create_foam_data_file(backup_file, val, vx, vy, vz, res) + + target_file = os.path.join(foam_data_dir, filename) + copy(backup_file, target_file) + + self.assertEqual(len(os.listdir(foam_data_dir)), good + big + small) + + runner.start() + + idle_loops = 0 + total_loops = 0 + messages = None + while idle_loops < 15 
and total_loops < 150: + sleep(1) + runner_debug_stream.seek(0) + new_messages = runner_debug_stream.readlines() + + if messages == new_messages: + idle_loops += 1 + else: + idle_loops = 0 + messages = new_messages + total_loops += 1 + + for message in messages: + print(message.replace('\n', '')) + + runner.stop() + + print(f"total_loops:{total_loops}, idle_loops:{idle_loops}") + + if len(os.listdir(TEST_JOB_OUTPUT)) != good * 3: + backup_before_teardown(TEST_JOB_OUTPUT, + f"Backup-all_good-{TEST_JOB_OUTPUT}") + backup_before_teardown(TEST_JOB_QUEUE, + f"Backup-all_good-{TEST_JOB_QUEUE}") + backup_before_teardown(TEST_MONITOR_BASE, + f"Backup-all_good-{TEST_MONITOR_BASE}") + self.assertEqual(len(os.listdir(TEST_JOB_OUTPUT)), good * 3) + for job_dir in os.listdir(TEST_JOB_OUTPUT): + metafile = os.path.join(TEST_JOB_OUTPUT, job_dir, META_FILE) + status = read_yaml(metafile) + + if JOB_ERROR in status: + backup_before_teardown(TEST_JOB_OUTPUT, + f"Backup-all_good-{TEST_JOB_OUTPUT}") + backup_before_teardown(TEST_JOB_QUEUE, + f"Backup-all_good-{TEST_JOB_QUEUE}") + backup_before_teardown(TEST_MONITOR_BASE, + f"Backup-all_good-{TEST_MONITOR_BASE}") + + self.assertNotIn(JOB_ERROR, status) + + result_path = os.path.join( + TEST_JOB_OUTPUT, job_dir, get_result_file(JOB_TYPE_PAPERMILL)) + self.assertTrue(os.path.exists(result_path)) + + # Test some actual scientific analysis, in a predicatable loop + def testScientificAnalysisPredictableLoop(self)->None: + patterns = { + 'pattern_check': pattern_check, + 'pattern_segment': pattern_segment, + 'pattern_analysis': pattern_analysis, + 'pattern_regenerate': pattern_regenerate + } + + recipes = { + 'recipe_check': recipe_check, + 'recipe_segment': recipe_segment, + 'recipe_analysis': recipe_analysis, + 'recipe_generator': recipe_generator + } + + runner_debug_stream = io.StringIO("") + + runner = MeowRunner( + WatchdogMonitor( + TEST_MONITOR_BASE, + patterns, + recipes, + settletime=1 + ), + PapermillHandler( + job_queue_dir=TEST_JOB_QUEUE + ), + LocalPythonConductor(), + job_queue_dir=TEST_JOB_QUEUE, + job_output_dir=TEST_JOB_OUTPUT, + print=runner_debug_stream, + logging=3 + ) + + good = 10 + big = 5 + small = 0 + vx = 64 + vy = 64 + vz = 64 + res = 3/vz + backup_data_dir = os.path.join(TEST_DATA, "foam_ct_data") + make_dir(backup_data_dir) + foam_data_dir = os.path.join(TEST_MONITOR_BASE, "foam_ct_data") + make_dir(foam_data_dir) + + write_file(lines_to_string(IDMC_UTILS_MODULE), + os.path.join(TEST_MONITOR_BASE, "idmc_utils_module.py")) + + gen_path = os.path.join(TEST_MONITOR_BASE, "generator.py") + write_file(lines_to_string(GENERATE_SCRIPT), gen_path) + + all_data = [1000] * good + [100] * big + [10000] * small + shuffle(all_data) + + u_spec = importlib.util.spec_from_file_location("gen", gen_path) + gen = importlib.util.module_from_spec(u_spec) + u_spec.loader.exec_module(gen) + + for i, val in enumerate(all_data): + filename = f"foam_dataset_{i}_{val}_{vx}_{vy}_{vz}.npy" + backup_file = os.path.join(backup_data_dir, filename) + if not os.path.exists(backup_file): + gen.create_foam_data_file(backup_file, val, vx, vy, vz, res) + + target_file = os.path.join(foam_data_dir, filename) + copy(backup_file, target_file) + + self.assertEqual(len(os.listdir(foam_data_dir)), good + big + small) + + runner.start() + + idle_loops = 0 + total_loops = 0 + messages = None + while idle_loops < 45 and total_loops < 600: + sleep(1) + runner_debug_stream.seek(0) + new_messages = runner_debug_stream.readlines() + + if messages == new_messages: + idle_loops += 1 + 
else: + idle_loops = 0 + messages = new_messages + total_loops += 1 + + for message in messages: + print(message.replace('\n', '')) + + runner.stop() + print(f"total_loops:{total_loops}, idle_loops:{idle_loops}") + + jobs = len(os.listdir(TEST_JOB_OUTPUT)) + if jobs != (good*3 + big*5 + small*5): + backup_before_teardown(TEST_JOB_OUTPUT, + f"Backup-predictable-{TEST_JOB_OUTPUT}") + backup_before_teardown(TEST_JOB_QUEUE, + f"Backup-predictable-{TEST_JOB_QUEUE}") + backup_before_teardown(TEST_MONITOR_BASE, + f"Backup-predictable-{TEST_MONITOR_BASE}") + + self.assertEqual(jobs, good*3 + big*5 + small*5) + for job_dir in os.listdir(TEST_JOB_OUTPUT): + metafile = os.path.join(TEST_JOB_OUTPUT, job_dir, META_FILE) + status = read_yaml(metafile) + + if JOB_ERROR in status: + print(status[JOB_ERROR]) + backup_before_teardown(TEST_JOB_OUTPUT, + f"Backup-predictable-{TEST_JOB_OUTPUT}") + backup_before_teardown(TEST_JOB_QUEUE, + f"Backup-predictable-{TEST_JOB_QUEUE}") + backup_before_teardown(TEST_MONITOR_BASE, + f"Backup-predictable-{TEST_MONITOR_BASE}") + + self.assertNotIn(JOB_ERROR, status) + + result_path = os.path.join( + TEST_JOB_OUTPUT, job_dir, get_result_file(JOB_TYPE_PAPERMILL)) + self.assertTrue(os.path.exists(result_path)) + + results = len(os.listdir( + os.path.join(TEST_MONITOR_BASE, "foam_ct_data_pore_analysis"))) + if results != good+big+small: + backup_before_teardown(TEST_JOB_OUTPUT, + f"Backup-predictable-{TEST_JOB_OUTPUT}") + backup_before_teardown(TEST_JOB_QUEUE, + f"Backup-predictable-{TEST_JOB_QUEUE}") + backup_before_teardown(TEST_MONITOR_BASE, + f"Backup-predictable-{TEST_MONITOR_BASE}") + + self.assertEqual(results, good+big+small) + + # Test some actual scientific analysis, in an unpredicatable loop + def testScientificAnalysisRandomLoop(self)->None: + pattern_regenerate_random = FileEventPattern( + "pattern_regenerate_random", + os.path.join("foam_ct_data_discarded", "*"), + "recipe_generator", + "discarded", + parameters={ + "dest_dir": os.path.join("{BASE}", "foam_ct_data"), + "utils_path": os.path.join("{BASE}", "idmc_utils_module.py"), + "gen_path": os.path.join("{BASE}", "generator.py"), + "test_data": os.path.join(TEST_DATA, "foam_ct_data"), + "vx": 64, + "vy": 64, + "vz": 64, + "res": 3/64, + "chance_good": 1, + "chance_small": 0, + "chance_big": 1 + }) + + patterns = { + 'pattern_check': pattern_check, + 'pattern_segment': pattern_segment, + 'pattern_analysis': pattern_analysis, + 'pattern_regenerate_random': pattern_regenerate_random + } + + recipes = { + 'recipe_check': recipe_check, + 'recipe_segment': recipe_segment, + 'recipe_analysis': recipe_analysis, + 'recipe_generator': recipe_generator + } + + runner_debug_stream = io.StringIO("") + + runner = MeowRunner( + WatchdogMonitor( + TEST_MONITOR_BASE, + patterns, + recipes, + settletime=1 + ), + PapermillHandler( + job_queue_dir=TEST_JOB_QUEUE + ), + LocalPythonConductor(), + job_queue_dir=TEST_JOB_QUEUE, + job_output_dir=TEST_JOB_OUTPUT, + print=runner_debug_stream, + logging=3 + ) + + good = 10 + big = 5 + small = 0 + vx = 64 + vy = 64 + vz = 64 + res = 3/vz + backup_data_dir = os.path.join(TEST_DATA, "foam_ct_data") + make_dir(backup_data_dir) + foam_data_dir = os.path.join(TEST_MONITOR_BASE, "foam_ct_data") + make_dir(foam_data_dir) + + write_file(lines_to_string(IDMC_UTILS_MODULE), + os.path.join(TEST_MONITOR_BASE, "idmc_utils_module.py")) + + gen_path = os.path.join(TEST_MONITOR_BASE, "generator.py") + write_file(lines_to_string(GENERATE_SCRIPT), gen_path) + + all_data = [1000] * good + [100] * big + 
[10000] * small + shuffle(all_data) + + u_spec = importlib.util.spec_from_file_location("gen", gen_path) + gen = importlib.util.module_from_spec(u_spec) + u_spec.loader.exec_module(gen) + + for i, val in enumerate(all_data): + filename = f"foam_dataset_{i}_{val}_{vx}_{vy}_{vz}.npy" + backup_file = os.path.join(backup_data_dir, filename) + if not os.path.exists(backup_file): + gen.create_foam_data_file(backup_file, val, vx, vy, vz, res) + + target_file = os.path.join(foam_data_dir, filename) + copy(backup_file, target_file) + + self.assertEqual(len(os.listdir(foam_data_dir)), good + big + small) + + runner.start() + + idle_loops = 0 + total_loops = 0 + messages = None + while idle_loops < 60 and total_loops < 600: + sleep(1) + runner_debug_stream.seek(0) + new_messages = runner_debug_stream.readlines() + + if messages == new_messages: + idle_loops += 1 + else: + idle_loops = 0 + messages = new_messages + total_loops += 1 + + for message in messages: + print(message.replace('\n', '')) + + runner.stop() + print(f"total_loops:{total_loops}, idle_loops:{idle_loops}") + + for job_dir in os.listdir(TEST_JOB_OUTPUT): + metafile = os.path.join(TEST_JOB_OUTPUT, job_dir, META_FILE) + status = read_yaml(metafile) + + if JOB_ERROR in status: + print(status[JOB_ERROR]) + backup_before_teardown(TEST_JOB_OUTPUT, + f"Backup-random-{TEST_JOB_OUTPUT}") + backup_before_teardown(TEST_JOB_QUEUE, + f"Backup-random-{TEST_JOB_QUEUE}") + backup_before_teardown(TEST_MONITOR_BASE, + f"Backup-random-{TEST_MONITOR_BASE}") + + self.assertNotIn(JOB_ERROR, status) + + result_path = os.path.join( + TEST_JOB_OUTPUT, job_dir, get_result_file(JOB_TYPE_PAPERMILL)) + self.assertTrue(os.path.exists(result_path)) + + outputs = len(os.listdir(TEST_JOB_OUTPUT)) + if outputs < good*3 + big*5 + small*5: + backup_before_teardown(TEST_JOB_OUTPUT, + f"Backup-random-{TEST_JOB_OUTPUT}") + backup_before_teardown(TEST_JOB_QUEUE, + f"Backup-random-{TEST_JOB_QUEUE}") + backup_before_teardown(TEST_MONITOR_BASE, + f"Backup-random-{TEST_MONITOR_BASE}") + + self.assertTrue(outputs >= good*3 + big*5 + small*5) + + results = len(os.listdir( + os.path.join(TEST_MONITOR_BASE, "foam_ct_data_pore_analysis"))) + + self.assertEqual(results, good+big+small) + + # Test some actual scientific analysis, in an unpredicatable loop + def testScientificAnalysisMassiveRandomLoop(self)->None: + pattern_regenerate_random = FileEventPattern( + "pattern_regenerate_random", + os.path.join("foam_ct_data_discarded", "*"), + "recipe_generator", + "discarded", + parameters={ + "dest_dir": os.path.join("{BASE}", "foam_ct_data"), + "utils_path": os.path.join("{BASE}", "idmc_utils_module.py"), + "gen_path": os.path.join("{BASE}", "generator.py"), + "test_data": os.path.join(TEST_DATA, "foam_ct_data"), + "vx": 32, + "vy": 32, + "vz": 32, + "res": 3/32, + "chance_good": 1, + "chance_small": 0, + "chance_big": 3 + }) + + patterns = { + 'pattern_check': pattern_check, + 'pattern_segment': pattern_segment, + 'pattern_analysis': pattern_analysis, + 'pattern_regenerate_random': pattern_regenerate_random + } + + recipes = { + 'recipe_check': recipe_check, + 'recipe_segment': recipe_segment, + 'recipe_analysis': recipe_analysis, + 'recipe_generator': recipe_generator + } + + runner_debug_stream = io.StringIO("") + + runner = MeowRunner( + WatchdogMonitor( + TEST_MONITOR_BASE, + patterns, + recipes, + settletime=1 + ), + PapermillHandler( + job_queue_dir=TEST_JOB_QUEUE + ), + LocalPythonConductor(), + job_queue_dir=TEST_JOB_QUEUE, + job_output_dir=TEST_JOB_OUTPUT, + 
print=runner_debug_stream, + logging=3 + ) + + good = 5 + big = 15 + small = 0 + vx = 32 + vy = 32 + vz = 32 + res = 3/vz + backup_data_dir = os.path.join(TEST_DATA, "foam_ct_data") + make_dir(backup_data_dir) + foam_data_dir = os.path.join(TEST_MONITOR_BASE, "foam_ct_data") + make_dir(foam_data_dir) + + write_file(lines_to_string(IDMC_UTILS_MODULE), + os.path.join(TEST_MONITOR_BASE, "idmc_utils_module.py")) + + gen_path = os.path.join(TEST_MONITOR_BASE, "generator.py") + write_file(lines_to_string(GENERATE_SCRIPT), gen_path) + + all_data = [1000] * good + [100] * big + [10000] * small + shuffle(all_data) + + u_spec = importlib.util.spec_from_file_location("gen", gen_path) + gen = importlib.util.module_from_spec(u_spec) + u_spec.loader.exec_module(gen) + + for i, val in enumerate(all_data): + filename = f"foam_dataset_{i}_{val}_{vx}_{vy}_{vz}.npy" + backup_file = os.path.join(backup_data_dir, filename) + if not os.path.exists(backup_file): + gen.create_foam_data_file(backup_file, val, vx, vy, vz, res) + + target_file = os.path.join(foam_data_dir, filename) + copy(backup_file, target_file) + + self.assertEqual(len(os.listdir(foam_data_dir)), good + big + small) + + runner.start() + + idle_loops = 0 + total_loops = 0 + messages = None + while idle_loops < 60 and total_loops < 1200: + sleep(1) + runner_debug_stream.seek(0) + new_messages = runner_debug_stream.readlines() + + if messages == new_messages: + idle_loops += 1 + else: + idle_loops = 0 + messages = new_messages + total_loops += 1 + + for message in messages: + print(message.replace('\n', '')) + + runner.stop() + print(f"total_loops:{total_loops}, idle_loops:{idle_loops}") + + for job_dir in os.listdir(TEST_JOB_OUTPUT): + metafile = os.path.join(TEST_JOB_OUTPUT, job_dir, META_FILE) + status = read_yaml(metafile) + + if JOB_ERROR in status: + print(status[JOB_ERROR]) + backup_before_teardown(TEST_JOB_OUTPUT, + f"Backup-massive-random-{TEST_JOB_OUTPUT}") + backup_before_teardown(TEST_JOB_QUEUE, + f"Backup-massive-random-{TEST_JOB_QUEUE}") + backup_before_teardown(TEST_MONITOR_BASE, + f"Backup-massive-random-{TEST_MONITOR_BASE}") + + self.assertNotIn(JOB_ERROR, status) + + result_path = os.path.join( + TEST_JOB_OUTPUT, job_dir, get_result_file(JOB_TYPE_PAPERMILL)) + self.assertTrue(os.path.exists(result_path)) + + outputs = len(os.listdir(TEST_JOB_OUTPUT)) + if outputs < good*3 + big*5 + small*5: + backup_before_teardown(TEST_JOB_OUTPUT, + f"Backup-massive-random-{TEST_JOB_OUTPUT}") + backup_before_teardown(TEST_JOB_QUEUE, + f"Backup-massive-random-{TEST_JOB_QUEUE}") + backup_before_teardown(TEST_MONITOR_BASE, + f"Backup-massive-random-{TEST_MONITOR_BASE}") + self.assertTrue(outputs >= good*3 + big*5 + small*5) + + results = len(os.listdir( + os.path.join(TEST_MONITOR_BASE, "foam_ct_data_pore_analysis"))) + + self.assertEqual(results, good+big+small) + # TODO test getting job cannot handle # TODO test getting event cannot handle # TODO test with several matched monitors
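# The analysis tests above all share one polling idiom for deciding when the
# runner has gone quiet: re-read the debug stream once per second and stop
# when no new output has appeared for a run of consecutive reads, or when a
# total limit is reached. A hedged sketch of that idiom as a reusable helper;
# wait_for_quiet is illustrative, not part of the codebase.
from time import sleep

def wait_for_quiet(stream, idle_limit, total_limit):
    idle_loops = total_loops = 0
    messages = None
    while idle_loops < idle_limit and total_loops < total_limit:
        sleep(1)
        stream.seek(0)
        new_messages = stream.readlines()
        if messages == new_messages:
            idle_loops += 1   # nothing new since the last read
        else:
            idle_loops = 0    # fresh output; reset the quiet counter
            messages = new_messages
        total_loops += 1
    return messages or []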