diff --git a/benchmarking/mrme.py b/benchmarking/mrme.py
deleted file mode 100644
index 513af7b..0000000
--- a/benchmarking/mrme.py
+++ /dev/null
@@ -1,35 +0,0 @@
-
-from meow_base.patterns import FileEventPattern
-from meow_base.recipes import get_recipe_from_notebook
-
-from shared import run_test, MRME
-
-def multiple_rules_multiple_events(job_count:int, REPEATS:int, job_counter:int,
-        requested_jobs:int, runtime_start:float):
-    patterns = {}
-    for i in range(job_count):
-        pattern = FileEventPattern(
-            f"pattern_{i}",
-            f"testing/file_{i}.txt",
-            "recipe_one",
-            "input"
-        )
-        patterns[pattern.name] = pattern
-
-    recipe = get_recipe_from_notebook("recipe_one", "test.ipynb")
-
-    recipes = {
-        recipe.name: recipe
-    }
-
-    run_test(
-        patterns,
-        recipes,
-        job_count,
-        job_count,
-        REPEATS,
-        job_counter,
-        requested_jobs,
-        runtime_start,
-        signature=MRME
-    )
diff --git a/benchmarking/mrse.py b/benchmarking/mrse.py
deleted file mode 100644
index d6f34fa..0000000
--- a/benchmarking/mrse.py
+++ /dev/null
@@ -1,35 +0,0 @@
-
-from meow_base.patterns import FileEventPattern
-from meow_base.recipes import get_recipe_from_notebook
-
-from shared import run_test, MRSE
-
-def multiple_rules_single_event(job_count:int, REPEATS:int, job_counter:int,
-        requested_jobs:int, runtime_start:float):
-    patterns = {}
-    for i in range(job_count):
-        pattern = FileEventPattern(
-            f"pattern_{i}",
-            f"testing/*",
-            "recipe_one",
-            "input"
-        )
-        patterns[pattern.name] = pattern
-
-    recipe = get_recipe_from_notebook("recipe_one", "test.ipynb")
-
-    recipes = {
-        recipe.name: recipe
-    }
-
-    run_test(
-        patterns,
-        recipes,
-        1,
-        job_count,
-        REPEATS,
-        job_counter,
-        requested_jobs,
-        runtime_start,
-        signature=MRSE
-    )
diff --git a/benchmarking/run_all.py b/benchmarking/run_all.py
deleted file mode 100644
index 3b343f6..0000000
--- a/benchmarking/run_all.py
+++ /dev/null
@@ -1,201 +0,0 @@
-
-import matplotlib.pyplot as pyplot
-import numpy
-import sys
-import time
-import os
-
-from typing import List
-
-from shared import JOBS_COUNTS, REPEATS, TESTS, MRME, MRSE, SRME, SRSEP, \
-    SRSES, RESULTS_DIR, BASE
-from mrme import multiple_rules_multiple_events
-from mrse import multiple_rules_single_event
-from srme import single_rule_multiple_events
-from srsep import single_rule_single_event_parallel
-from srsps import single_rule_single_event_sequential
-
-from meow_base.core.correctness.vars import DEFAULT_JOB_OUTPUT_DIR, \
-    DEFAULT_JOB_QUEUE_DIR
-from meow_base.functionality.file_io import rmtree
-
-LINE_KEYS = {
-    SRSES: ('x','#a1467e'),
-    SRME: ('.','#896cff'),
-    MRME: ('d','#5983b0'),
-    MRSE: ('P','#ff6cbe'),
-    SRSEP: ('*','#3faf46'),
-}
-
-
-def run_tests():
-    rmtree(RESULTS_DIR)
-
-    requested_jobs=0
-    for job_count in JOBS_COUNTS:
-        requested_jobs += job_count * REPEATS * len(TESTS)
-    print(f"requested_jobs: {requested_jobs}")
-
-    runtime_start=time.time()
-
-    job_counter=0
-    for job_count in JOBS_COUNTS:
-        for test in TESTS:
-            if test == MRME:
-                multiple_rules_multiple_events(job_count, REPEATS, job_counter, requested_jobs, runtime_start)
-                job_counter += job_count * REPEATS
-
-            elif test == MRSE:
-                multiple_rules_single_event(job_count, REPEATS, job_counter, requested_jobs, runtime_start)
-                job_counter += job_count * REPEATS
-
-            elif test == SRME:
-                single_rule_multiple_events(job_count, REPEATS, job_counter, requested_jobs, runtime_start)
-                job_counter += job_count * REPEATS
-
-            elif test == SRSEP:
-                single_rule_single_event_parallel(job_count, REPEATS, job_counter, requested_jobs, runtime_start)
-                job_counter += job_count * REPEATS
-
-            elif test == SRSES:
-                single_rule_single_event_sequential(job_count, REPEATS, job_counter, requested_jobs, runtime_start)
-                job_counter += job_count * REPEATS
-
-    print(f"All tests completed in: {str(time.time()-runtime_start)}")
-
-def get_meow_graph(results_dir:str):
-    lines = []
-
-    for run_type in os.listdir(results_dir):
-        #if run_type == 'single_Pattern_single_file_sequential':
-        #    continue
-
-#        lines.append((f'scheduling {run_type}', [], 'solid'))
-        lines.append((run_type, [], 'solid'))
-        run_path = os.path.join(results_dir, run_type)
-
-        for job_count in os.listdir(run_path):
-            results_path = os.path.join(run_path, job_count, "results.txt")
-            with open(results_path, 'r') as f_in:
-                data = f_in.readlines()
-
-            scheduling_duration = 0
-            for line in data:
-                if "Average schedule time: " in line:
-                    scheduling_duration = float(line.replace(
-                        "Average schedule time: ", ''))
-
-            lines[-1][1].append((job_count, scheduling_duration))
-        lines[-1][1].sort(key=lambda y: float(y[0]))
-
-    return lines
-
-def make_plot(lines:List, graph_path:str, title:str, logged:bool):
-    w = 10
-    h = 4
-    linecount = 0
-    columns = 1
-
-    pyplot.figure(figsize=(w, h))
-    for l in range(len(lines)):
-        x_values = numpy.asarray([float(i[0]) for i in lines[l][1]])
-        y_values = numpy.asarray([float(i[1]) for i in lines[l][1]])
-
-        # Remove this check to always display lines
-        if lines[l][2] == 'solid':
-            pyplot.plot(x_values, y_values, label=lines[l][0], linestyle=lines[l][2], marker=LINE_KEYS[lines[l][0]][0], color=LINE_KEYS[lines[l][0]][1])
-            linecount += 1
-
-    columns = int(linecount/3) + 1
-
-    pyplot.xlabel("Number of jobs scheduled")
-    pyplot.ylabel("Time taken (seconds)")
-    pyplot.title(title)
-
-    handles, labels = pyplot.gca().get_legend_handles_labels()
-    # legend_order = [2, 4, 0, 1, 3]
-    # pyplot.legend([handles[i] for i in legend_order], [labels[i] for i in legend_order])
-
-    pyplot.legend(ncol=columns, prop={'size': 12})
-    if logged:
-        pyplot.yscale('log')
-
-    x_ticks = []
-    for tick in x_values:
-        label = int(tick)
-        if tick <= 100 and tick % 20 == 0:
-            label = f"\n{int(tick)}"
-        x_ticks.append(label)
-
-    pyplot.xticks(x_values, x_ticks)
-
-    pyplot.savefig(graph_path, format='pdf', bbox_inches='tight')
-
-def make_both_plots(lines, path, title, log=True):
-    make_plot(lines, path, title, False)
-    if log:
-        logged_path = path[:path.index(".pdf")] + "_logged" \
-            + path[path.index(".pdf"):]
-        make_plot(lines, logged_path, title, True)
-
-
-def make_graphs():
-    lines = get_meow_graph(RESULTS_DIR)
-
-    make_both_plots(
-        lines,
-        "result.pdf",
-        "MiG scheduling overheads on the Threadripper"
-    )
-
-    average_lines = []
-    all_delta_lines = []
-    no_spsfs_delta_lines = []
-    for line_signature, line_values, lines_style in lines:
-        if lines_style == 'solid':
-            averages = [(i, v/float(i)) for i, v in line_values]
-            average_lines.append((line_signature, averages, lines_style))
-
-            if line_signature not in [
-                    "total single_Pattern_single_file_sequential",
-                    "scheduling single_Pattern_single_file_sequential_jobs",
-                    "SPSFS"]:
-                deltas = []
-                for i in range(len(line_values)-1):
-                    deltas.append((line_values[i+1][0],
-                        (averages[i+1][1]-averages[i][1]) \
-                        / (float(averages[i+1][0])-float(averages[i][0]))))
-                no_spsfs_delta_lines.append((line_signature, deltas, lines_style))
-            deltas = []
-            for i in range(len(line_values)-1):
-                deltas.append((line_values[i+1][0],
-                    (averages[i+1][1]-averages[i][1]) \
-                    / (float(averages[i+1][0])-float(averages[i][0]))))
-            all_delta_lines.append((line_signature, deltas, lines_style))
-
-
-    make_both_plots(
-        average_lines,
-        "result_averaged.pdf",
-        "Per-job MiG scheduling overheads on the Threadripper"
-    )
-
-    make_both_plots(
-        all_delta_lines,
-        "result_deltas.pdf",
-        "Difference in per-job MiG scheduling overheads on the Threadripper",
-        log=False
-    )
-
-if __name__ == '__main__':
-    try:
-        run_tests()
-        make_graphs()
-        rmtree(DEFAULT_JOB_QUEUE_DIR)
-        rmtree(DEFAULT_JOB_OUTPUT_DIR)
-        rmtree(BASE)
-    except KeyboardInterrupt as ki:
-        try:
-            sys.exit(1)
-        except SystemExit:
-            os._exit(1)
diff --git a/benchmarking/sequential.ipynb b/benchmarking/sequential.ipynb
deleted file mode 100644
index 7e02a29..0000000
--- a/benchmarking/sequential.ipynb
+++ /dev/null
@@ -1,69 +0,0 @@
-{
- "cells": [
-  {
-   "cell_type": "code",
-   "execution_count": 22,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "INPUT_FILE = 'file_0.txt'\n",
-    "MAX_COUNT = 100"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 23,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "with open(INPUT_FILE, 'r') as f:\n",
-    "    data = int(f.read())"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "print(\"read in: \"+ str(data))\n",
-    "print(\"writing out: \"+ str(data+1))"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 24,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "if data+1 < MAX_COUNT:\n",
-    "    with open(INPUT_FILE.replace(str(data), str(data+1)), 'w') as f:\n",
-    "        f.write(str(data+1))"
-   ]
-  }
- ],
- "metadata": {
-  "interpreter": {
-   "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
-  },
-  "kernelspec": {
-   "display_name": "Python 3",
-   "language": "python",
-   "name": "python3"
-  },
-  "language_info": {
-   "codemirror_mode": {
-    "name": "ipython",
-    "version": 3
-   },
-   "file_extension": ".py",
-   "mimetype": "text/x-python",
-   "name": "python",
-   "nbconvert_exporter": "python",
-   "pygments_lexer": "ipython3",
-   "version": "3.9.5"
-  }
- },
- "nbformat": 4,
- "nbformat_minor": 4
-}
diff --git a/benchmarking/shared.py b/benchmarking/shared.py
deleted file mode 100644
index afbe9fe..0000000
--- a/benchmarking/shared.py
+++ /dev/null
@@ -1,242 +0,0 @@
-
-import datetime
-import io
-import os
-import pathlib
-import time
-import yaml
-
-from typing import Any, Dict, Tuple, List
-
-from meow_base.core.correctness.vars import DEFAULT_JOB_OUTPUT_DIR, \
-    DEFAULT_JOB_QUEUE_DIR
-from meow_base.core.base_pattern import BasePattern
-from meow_base.core.base_recipe import BaseRecipe
-from meow_base.core.runner import MeowRunner
-from meow_base.patterns.file_event_pattern import WatchdogMonitor
-from meow_base.recipes.jupyter_notebook_recipe import PapermillHandler
-from meow_base.conductors import LocalPythonConductor
-from meow_base.functionality.file_io import rmtree
-
-RESULTS_DIR = "results"
-BASE = "benchmark_base"
-REPEATS = 10
-JOBS_COUNTS = [
-    10, 20, 30, 40, 50,
-    60, 70, 80, 90, 100,
-    125, 150, 175, 200,
-    250, 300, 400, 500
-]
-
-SRME = "single_rule_multiple_events"
-MRSE = "multiple_rules_single_event"
-SRSEP = "single_rule_single_event_parallel"
-MRME = "multiple_rules_multiple_events"
-SRSES = "single_rule_single_event_sequential"
-
-TESTS = [
-    SRME,
-    MRSE,
-    SRSEP,
-    MRME,
-    # This test will take approx 90% of total time
-    SRSES
-]
-
-
-class DummyConductor(LocalPythonConductor):
-    def valid_execute_criteria(self, job:Dict[str,Any])->Tuple[bool,str]:
-        return False, ">:("
-
-
-def datetime_to_timestamp(date_time_obj:datetime):
-    return time.mktime(date_time_obj.timetuple()) \
-        + float(date_time_obj.microsecond)/1000000
-
-def generate(file_count:int, file_path:str, file_type:str=".txt"):
-    first_filename = ''
-    start = time.time()
-    for i in range(int(file_count)):
-        filename = file_path + str(i) + file_type
-        if not first_filename:
-            first_filename = filename
-        with open(filename, 'w') as f:
-            f.write('0')
-    return first_filename, time.time() - start
-
-def cleanup(jobs:List[str], file_out:str, base_time:float, gen_time:float,
-        execution:bool=False):
-    if not jobs:
-        return
-
-    job_timestamps = []
-    for job in jobs:
-        if execution:
-            with open(f"{DEFAULT_JOB_OUTPUT_DIR}/{job}/job.yml", 'r') as f_in:
-                data = yaml.load(f_in, Loader=yaml.Loader)
-        else:
-            with open(f"{DEFAULT_JOB_QUEUE_DIR}/{job}/job.yml", 'r') as f_in:
-                data = yaml.load(f_in, Loader=yaml.Loader)
-        create_datetime = data['create']
-        create_timestamp = datetime_to_timestamp(create_datetime)
-        job_timestamps.append((create_timestamp, create_datetime))
-
-    job_timestamps.sort(key=lambda y: int(y[0]))
-
-    first = job_timestamps[0]
-    last = job_timestamps[-1]
-
-    #dt = datetime.datetime.fromtimestamp(os.path.getctime(base_time), datetime.timezone(datetime.timedelta(hours=0)))
-    dt = datetime.datetime.fromtimestamp(os.path.getctime(base_time))
-
-#    if execution:
-#        queue_times = []
-#        execution_times = []
-#        for j in jobs:
-#            mrsl_dict = load(os.path.join(mrsl_dir, j))#
-#
-#            queue_times.append(time.mktime(mrsl_dict['EXECUTING_TIMESTAMP']) - time.mktime(mrsl_dict['QUEUED_TIMESTAMP']))
-#            execution_times.append(time.mktime(mrsl_dict['FINISHED_TIMESTAMP']) - time.mktime(mrsl_dict['EXECUTING_TIMESTAMP']))
-    pathlib.Path(os.path.dirname(file_out)).mkdir(parents=True, exist_ok=True)
-    with open(file_out, 'w') as f_out:
-        f_out.write("Job count: "+ str(len(jobs)) +"\n")
-        f_out.write("Generation time: "+ str(round(gen_time, 5)) +"\n")
-        f_out.write("First trigger: "+ str(dt) +"\n")
-        f_out.write("First scheduling datetime: "+ str(first[1]) +"\n")
-        f_out.write("Last scheduling datetime: "+ str(last[1]) +"\n")
-        f_out.write("First scheduling unixtime: "+ str(first[0]) +"\n")
-        f_out.write("First scheduling unixtime: "+ str(last[0]) +"\n")
-        f_out.write("Scheduling difference (seconds): "+ str(round(last[0] - first[0], 3)) +"\n")
-        f_out.write("Initial scheduling delay (seconds): "+ str(round(first[0] - os.path.getctime(base_time), 3)) +"\n")
-        total_time = round(last[0] - os.path.getctime(base_time), 3)
-        f_out.write("Total scheduling delay (seconds): "+ str(total_time) +"\n")
-
-#    if execution:
-#        f_out.write("Average execution time (seconds): "+ str(round(mean(execution_times), 3)) +"\n")
-#        f_out.write("Max execution time (seconds): "+ str(round(max(execution_times), 3)) +"\n")
-#        f_out.write("Min execution time (seconds): "+ str(round(min(execution_times), 3)) +"\n")
-
-#        f_out.write("Average queueing delay (seconds): "+ str(round(mean(queue_times), 3)) +"\n")
-#        f_out.write("Max queueing delay (seconds): "+ str(round(max(queue_times), 3)) +"\n")
-#        f_out.write("Min queueing delay (seconds): "+ str(round(min(queue_times), 3)) +"\n")
-
-#        queue_times.remove(max(queue_times))
-#        f_out.write("Average excluded queueing delay (seconds): "+ str(round(mean(queue_times), 3)) +"\n")
-
-    return total_time
-
-def mean(l:List):
-    return sum(l)/len(l)
-
-def collate_results(base_dir:str):
-    scheduling_delays = []
-
-    for run in os.listdir(base_dir):
-        if run != 'results.txt':
-            with open(os.path.join(base_dir, run, "results.txt"), 'r') as f:
-                d = f.readlines()
-
-            for l in d:
-                if "Total scheduling delay (seconds): " in l:
-                    scheduling_delays.append(float(l.replace(
-                        "Total scheduling delay (seconds): ", '')))
-
-    with open(os.path.join(base_dir, 'results.txt'), 'w') as f:
-        f.write(f"Average schedule time: {round(mean(scheduling_delays), 3)}\n")
-        f.write(f"Scheduling times: {scheduling_delays}")
-
-def run_test(patterns:Dict[str,BasePattern], recipes:Dict[str,BaseRecipe],
-        files_count:int, expected_job_count:int, repeats:int, job_counter:int,
-        requested_jobs:int, runtime_start:float, signature:str="",
-        execution:bool=False, print_logging:bool=False):
-    if not os.path.exists(RESULTS_DIR):
-        os.mkdir(RESULTS_DIR)
-
-    # Does not work. left here as reminder
-    if execution:
-        os.system("export LC_ALL=C.UTF-8")
-        os.system("export LANG=C.UTF-8")
-
-    for run in range(repeats):
-        # Ensure complete cleanup from previous run
-        for f in [BASE, DEFAULT_JOB_QUEUE_DIR, DEFAULT_JOB_OUTPUT_DIR]:
-            if os.path.exists(f):
-                rmtree(f)
-
-        file_base = os.path.join(BASE, 'testing')
-        pathlib.Path(file_base).mkdir(parents=True, exist_ok=True)
-
-        runner_debug_stream = io.StringIO("")
-
-        if execution:
-            runner = MeowRunner(
-                WatchdogMonitor(BASE, patterns, recipes, settletime=1),
-                PapermillHandler(),
-                LocalPythonConductor(),
-                print=runner_debug_stream,
-                logging=3
-            )
-        else:
-            runner = MeowRunner(
-                WatchdogMonitor(BASE, patterns, recipes, settletime=1),
-                PapermillHandler(),
-                DummyConductor(),
-                print=runner_debug_stream,
-                logging=3
-            )
-
-        runner.start()
-
-        # Generate triggering files
-        first_filename, generation_duration = \
-            generate(files_count, file_base +"/file_")
-
-        idle_loops = 0
-        total_loops = 0
-        messages = 0
-        total_time = expected_job_count * 3
-        if execution:
-            total_time = expected_job_count * 5
-        while idle_loops < 10 and total_loops < total_time:
-            time.sleep(1)
-            runner_debug_stream.seek(0)
-            new_messages = len(runner_debug_stream.readlines())
-
-            if messages == new_messages:
-                idle_loops += 1
-            else:
-                idle_loops = 0
-                messages = new_messages
-            total_loops += 1
-
-        runner.stop()
-
-        if execution:
-            jobs = os.listdir(DEFAULT_JOB_OUTPUT_DIR)
-        else:
-            jobs = os.listdir(DEFAULT_JOB_QUEUE_DIR)
-
-        results_path = os.path.join(
-            RESULTS_DIR,
-            signature,
-            str(expected_job_count),
-            str(run),
-            "results.txt"
-        )
-
-        cleanup(
-            jobs,
-            results_path,
-            first_filename,
-            generation_duration,
-            execution=execution
-        )
-
-        print(f"Completed scheduling run {str(run + 1)} of {str(len(jobs))}"
-            f"/{str(expected_job_count)} jobs for '{signature}' "
-            f"{job_counter + expected_job_count*(run+1)}/{requested_jobs} "
-            f"({str(round(time.time()-runtime_start, 3))}s)")
-
-    collate_results(
-        os.path.join(RESULTS_DIR, signature, str(expected_job_count))
-    )
diff --git a/benchmarking/srme.py b/benchmarking/srme.py
deleted file mode 100644
index 2309260..0000000
--- a/benchmarking/srme.py
+++ /dev/null
@@ -1,34 +0,0 @@
-
-from meow_base.patterns import FileEventPattern
-from meow_base.recipes import get_recipe_from_notebook
-
-from shared import run_test, SRME
-
-def single_rule_multiple_events(job_count:int, REPEATS:int, job_counter:int,
-        requested_jobs:int, runtime_start:float):
-    patterns = {}
-    pattern = FileEventPattern(
-        f"pattern_one",
-        f"testing/*",
-        "recipe_one",
-        "input"
-    )
-    patterns[pattern.name] = pattern
-
-    recipe = get_recipe_from_notebook("recipe_one", "test.ipynb")
-
-    recipes = {
-        recipe.name: recipe
-    }
-
-    run_test(
-        patterns,
-        recipes,
-        job_count,
-        job_count,
-        REPEATS,
-        job_counter,
-        requested_jobs,
-        runtime_start,
-        signature=SRME
-    )
diff --git a/benchmarking/srsep.py b/benchmarking/srsep.py
deleted file mode 100644
index 77a17a9..0000000
--- a/benchmarking/srsep.py
+++ /dev/null
@@ -1,36 +0,0 @@
-
-from meow_base.patterns import FileEventPattern
-from meow_base.recipes import get_recipe_from_notebook
-
-from meow_base.functionality.meow import create_parameter_sweep
-from shared import run_test, SRSEP
-
-def single_rule_single_event_parallel(job_count:int, REPEATS:int,
-        job_counter:int, requested_jobs:int, runtime_start:float):
-    patterns = {}
-    pattern = FileEventPattern(
-        f"pattern_one",
-        f"testing/*",
-        "recipe_one",
-        "input",
-        sweep=create_parameter_sweep("var", 1, job_count, 1)
-    )
-    patterns[pattern.name] = pattern
-
-    recipe = get_recipe_from_notebook("recipe_one", "test.ipynb")
-
-    recipes = {
-        recipe.name: recipe
-    }
-
-    run_test(
-        patterns,
-        recipes,
-        1,
-        job_count,
-        REPEATS,
-        job_counter,
-        requested_jobs,
-        runtime_start,
-        signature=SRSEP
-    )
\ No newline at end of file
diff --git a/benchmarking/srsps.py b/benchmarking/srsps.py
deleted file mode 100644
index 5ff4e72..0000000
--- a/benchmarking/srsps.py
+++ /dev/null
@@ -1,39 +0,0 @@
-
-from meow_base.patterns import FileEventPattern
-from meow_base.recipes import get_recipe_from_notebook
-
-from shared import run_test, SRSES
-
-def single_rule_single_event_sequential(job_count:int, REPEATS:int,
-        job_counter:int, requested_jobs:int, runtime_start:float):
-    patterns = {}
-    pattern = FileEventPattern(
-        f"pattern_one",
-        f"testing/*",
-        "recipe_two",
-        "INPUT_FILE",
-        parameters={
-            "MAX_COUNT":job_count
-        }
-    )
-    patterns[pattern.name] = pattern
-
-    recipe = get_recipe_from_notebook("recipe_two", "sequential.ipynb")
-
-    recipes = {
-        recipe.name: recipe
-    }
-
-    run_test(
-        patterns,
-        recipes,
-        1,
-        job_count,
-        REPEATS,
-        job_counter,
-        requested_jobs,
-        runtime_start,
-        signature=SRSES,
-        execution=True,
-        print_logging=False
-    )
diff --git a/benchmarking/test.ipynb b/benchmarking/test.ipynb
deleted file mode 100644
index a7abefa..0000000
--- a/benchmarking/test.ipynb
+++ /dev/null
@@ -1,47 +0,0 @@
-{
- "cells": [
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [
-    {
-     "ename": "",
-     "evalue": "",
-     "output_type": "error",
-     "traceback": [
-      "\u001b[1;31mFailed to start the Kernel. \n",
-      "Failed to start the Kernel 'Python 3.6.9 64-bit'. \n",
-      "View Jupyter log for further details. Kernel has not been started"
-     ]
-    }
-   ],
-   "source": [
-    "print('this is some outpug ')"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": []
-  }
- ],
- "metadata": {
-  "interpreter": {
-   "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
-  },
-  "kernelspec": {
-   "display_name": "Python 3.6.9 64-bit",
-   "language": "python",
-   "name": "python3"
-  },
-  "language_info": {
-   "name": "python",
-   "version": "3.6.9"
-  }
- },
- "nbformat": 4,
- "nbformat_minor": 4
-}
diff --git a/patterns/file_event_pattern.py b/patterns/file_event_pattern.py
index 21f0899..32b4716 100644
--- a/patterns/file_event_pattern.py
+++ b/patterns/file_event_pattern.py
@@ -559,6 +559,14 @@ class WatchdogEventHandler(PatternMatchingEventHandler):
             else:
                 self._recent_jobs[event.src_path] = \
                     [event.time_stamp, {event.event_type}]
+
+            # If we have a closed event then short-cut the wait and send event
+            # immediately
+            if event.event_type == FILE_CLOSED_EVENT:
+                self.monitor.match(event)
+                self._recent_jobs_lock.release()
+                return
+
         except Exception as ex:
             self._recent_jobs_lock.release()
             raise Exception(ex)
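
Note on the final hunk: WatchdogEventHandler buffers rapid filesystem events per path and normally waits out a settle time before matching, so a burst of writes to one file yields a single MEOW event; a FILE_CLOSED_EVENT implies the writer is finished, so the patch matches immediately instead of waiting. Below is a minimal, self-contained sketch of that debounce-with-short-cut idea. It is an illustration only, not meow_base's implementation: the DebounceHandler class and plain match callback are hypothetical, while the _recent_jobs buffer shape and FILE_CLOSED_EVENT short-cut mirror the diff.

    import threading
    import time

    # Mirrors the constant the hunk checks against (assumed value).
    FILE_CLOSED_EVENT = "closed"

    class DebounceHandler:
        """Buffer rapid per-path events; flush early on a close event."""

        def __init__(self, match, settletime=1.0):
            self.match = match            # callback, like monitor.match in the diff
            self.settletime = settletime  # normal wait before a buffered match
            self._recent_jobs = {}        # src_path -> [first_timestamp, {event_types}]
            self._recent_jobs_lock = threading.Lock()

        def handle(self, src_path, event_type):
            with self._recent_jobs_lock:
                # Accumulate event types seen for this path, as the diff does.
                if src_path in self._recent_jobs:
                    self._recent_jobs[src_path][1].add(event_type)
                else:
                    self._recent_jobs[src_path] = [time.time(), {event_type}]

                # The short-cut from the hunk: a closed file will receive no
                # more writes, so match now rather than waiting out settletime.
                if event_type == FILE_CLOSED_EVENT:
                    timestamp, event_types = self._recent_jobs.pop(src_path)
                    self.match(src_path, event_types)

    # Example: the close event flushes the buffered types immediately.
    handler = DebounceHandler(match=lambda path, types: print(path, sorted(types)))
    handler.handle("testing/file_0.txt", "modified")
    handler.handle("testing/file_0.txt", FILE_CLOSED_EVENT)

In the actual patch the lock is acquired manually inside a try/except, which is why the added lines release it explicitly before returning; the sketch uses a with-block so the release is implicit.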