moved benchmarking to its own meow_benchmarks package
@ -1,35 +0,0 @@
from meow_base.patterns import FileEventPattern
from meow_base.recipes import get_recipe_from_notebook

from shared import run_test, MRME


def multiple_rules_multiple_events(job_count:int, REPEATS:int, job_counter:int,
        requested_jobs:int, runtime_start:float):
    # One pattern per expected job, each watching its own trigger file.
    patterns = {}
    for i in range(job_count):
        pattern = FileEventPattern(
            f"pattern_{i}",
            f"testing/file_{i}.txt",
            "recipe_one",
            "input"
        )
        patterns[pattern.name] = pattern

    recipe = get_recipe_from_notebook("recipe_one", "test.ipynb")

    recipes = {
        recipe.name: recipe
    }

    # job_count files are generated, and job_count jobs are expected.
    run_test(
        patterns,
        recipes,
        job_count,
        job_count,
        REPEATS,
        job_counter,
        requested_jobs,
        runtime_start,
        signature=MRME
    )
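For orientation, a minimal standalone invocation of this benchmark might look like the following sketch; the argument values are illustrative placeholders rather than the shared configuration:

    import time

    from mrme import multiple_rules_multiple_events

    # Hypothetical one-off run: 10 jobs, a single repeat, fresh counters.
    multiple_rules_multiple_events(
        job_count=10,
        REPEATS=1,
        job_counter=0,
        requested_jobs=10,
        runtime_start=time.time()
    )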
@ -1,35 +0,0 @@
from meow_base.patterns import FileEventPattern
from meow_base.recipes import get_recipe_from_notebook

from shared import run_test, MRSE


def multiple_rules_single_event(job_count:int, REPEATS:int, job_counter:int,
        requested_jobs:int, runtime_start:float):
    # job_count patterns, all watching the same glob.
    patterns = {}
    for i in range(job_count):
        pattern = FileEventPattern(
            f"pattern_{i}",
            "testing/*",
            "recipe_one",
            "input"
        )
        patterns[pattern.name] = pattern

    recipe = get_recipe_from_notebook("recipe_one", "test.ipynb")

    recipes = {
        recipe.name: recipe
    }

    # Only one file is generated, but it matches every rule, so
    # job_count jobs are still expected.
    run_test(
        patterns,
        recipes,
        1,
        job_count,
        REPEATS,
        job_counter,
        requested_jobs,
        runtime_start,
        signature=MRSE
    )
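The contrast with the variant above is the trigger fan-out: a single generated file raises one event, but that event matches every rule's glob. A small illustration of the matching, using fnmatch as a stand-in for the monitor's actual glob logic:

    from fnmatch import fnmatch

    # One created file...
    event_path = "testing/file_0.txt"
    # ...checked against five rules that all watch the same glob.
    triggers = ["testing/*"] * 5
    print(sum(fnmatch(event_path, t) for t in triggers))  # 5: one event, five jobs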
@ -1,201 +0,0 @@
import matplotlib.pyplot as pyplot
import numpy
import sys
import time
import os

from typing import List

from shared import JOBS_COUNTS, REPEATS, TESTS, MRME, MRSE, SRME, SRSEP, \
    SRSES, RESULTS_DIR, BASE
from mrme import multiple_rules_multiple_events
from mrse import multiple_rules_single_event
from srme import single_rule_multiple_events
from srsep import single_rule_single_event_parallel
from srsps import single_rule_single_event_sequential

from meow_base.core.correctness.vars import DEFAULT_JOB_OUTPUT_DIR, \
    DEFAULT_JOB_QUEUE_DIR
from meow_base.functionality.file_io import rmtree

# Marker and colour per test signature.
LINE_KEYS = {
    SRSES: ('x', '#a1467e'),
    SRME: ('.', '#896cff'),
    MRME: ('d', '#5983b0'),
    MRSE: ('P', '#ff6cbe'),
    SRSEP: ('*', '#3faf46'),
}


def run_tests():
    rmtree(RESULTS_DIR)

    requested_jobs = 0
    for job_count in JOBS_COUNTS:
        requested_jobs += job_count * REPEATS * len(TESTS)
    print(f"requested_jobs: {requested_jobs}")

    runtime_start = time.time()

    job_counter = 0
    for job_count in JOBS_COUNTS:
        for test in TESTS:
            if test == MRME:
                multiple_rules_multiple_events(
                    job_count, REPEATS, job_counter, requested_jobs,
                    runtime_start)
                job_counter += job_count * REPEATS

            elif test == MRSE:
                multiple_rules_single_event(
                    job_count, REPEATS, job_counter, requested_jobs,
                    runtime_start)
                job_counter += job_count * REPEATS

            elif test == SRME:
                single_rule_multiple_events(
                    job_count, REPEATS, job_counter, requested_jobs,
                    runtime_start)
                job_counter += job_count * REPEATS

            elif test == SRSEP:
                single_rule_single_event_parallel(
                    job_count, REPEATS, job_counter, requested_jobs,
                    runtime_start)
                job_counter += job_count * REPEATS

            elif test == SRSES:
                single_rule_single_event_sequential(
                    job_count, REPEATS, job_counter, requested_jobs,
                    runtime_start)
                job_counter += job_count * REPEATS

    print(f"All tests completed in: {str(time.time()-runtime_start)}")


def get_meow_graph(results_dir:str):
    lines = []

    for run_type in os.listdir(results_dir):
        #if run_type == 'single_Pattern_single_file_sequential':
        #    continue

        # lines.append((f'scheduling {run_type}', [], 'solid'))
        lines.append((run_type, [], 'solid'))
        run_path = os.path.join(results_dir, run_type)

        for job_count in os.listdir(run_path):
            results_path = os.path.join(run_path, job_count, "results.txt")
            with open(results_path, 'r') as f_in:
                data = f_in.readlines()

            scheduling_duration = 0
            for line in data:
                if "Average schedule time: " in line:
                    scheduling_duration = float(line.replace(
                        "Average schedule time: ", ''))

            lines[-1][1].append((job_count, scheduling_duration))
        lines[-1][1].sort(key=lambda y: float(y[0]))

    return lines


def make_plot(lines:List, graph_path:str, title:str, logged:bool):
    w = 10
    h = 4
    linecount = 0
    columns = 1

    pyplot.figure(figsize=(w, h))
    for l in range(len(lines)):
        x_values = numpy.asarray([float(i[0]) for i in lines[l][1]])
        y_values = numpy.asarray([float(i[1]) for i in lines[l][1]])

        # Remove this check to always display lines
        if lines[l][2] == 'solid':
            pyplot.plot(x_values, y_values, label=lines[l][0],
                linestyle=lines[l][2], marker=LINE_KEYS[lines[l][0]][0],
                color=LINE_KEYS[lines[l][0]][1])
            linecount += 1

    columns = int(linecount/3) + 1

    pyplot.xlabel("Number of jobs scheduled")
    pyplot.ylabel("Time taken (seconds)")
    pyplot.title(title)

    handles, labels = pyplot.gca().get_legend_handles_labels()
    # legend_order = [2, 4, 0, 1, 3]
    # pyplot.legend([handles[i] for i in legend_order], [labels[i] for i in legend_order])

    pyplot.legend(ncol=columns, prop={'size': 12})
    if logged:
        pyplot.yscale('log')

    # Drop every other tick label below 100 onto a second row so the
    # tightly packed ticks remain readable.
    x_ticks = []
    for tick in x_values:
        label = int(tick)
        if tick <= 100 and tick % 20 == 0:
            label = f"\n{int(tick)}"
        x_ticks.append(label)

    pyplot.xticks(x_values, x_ticks)

    pyplot.savefig(graph_path, format='pdf', bbox_inches='tight')


def make_both_plots(lines, path, title, log=True):
    make_plot(lines, path, title, False)
    if log:
        logged_path = path[:path.index(".pdf")] + "_logged" \
            + path[path.index(".pdf"):]
        make_plot(lines, logged_path, title, True)


def make_graphs():
    lines = get_meow_graph(RESULTS_DIR)

    make_both_plots(
        lines,
        "result.pdf",
        "MiG scheduling overheads on the Threadripper"
    )

    average_lines = []
    all_delta_lines = []
    no_spsfs_delta_lines = []
    for line_signature, line_values, lines_style in lines:
        if lines_style == 'solid':
            # Per-job overhead: total time divided by job count.
            averages = [(i, v/float(i)) for i, v in line_values]
            average_lines.append((line_signature, averages, lines_style))

            if line_signature not in [
                    "total single_Pattern_single_file_sequential",
                    "scheduling single_Pattern_single_file_sequential_jobs",
                    "SPSFS"]:
                deltas = []
                for i in range(len(line_values)-1):
                    deltas.append((line_values[i+1][0],
                        (averages[i+1][1]-averages[i][1]) \
                        / (float(averages[i+1][0])-float(averages[i][0]))))
                no_spsfs_delta_lines.append((line_signature, deltas, lines_style))
            deltas = []
            for i in range(len(line_values)-1):
                deltas.append((line_values[i+1][0],
                    (averages[i+1][1]-averages[i][1]) \
                    / (float(averages[i+1][0])-float(averages[i][0]))))
            all_delta_lines.append((line_signature, deltas, lines_style))

    make_both_plots(
        average_lines,
        "result_averaged.pdf",
        "Per-job MiG scheduling overheads on the Threadripper"
    )

    make_both_plots(
        all_delta_lines,
        "result_deltas.pdf",
        "Difference in per-job MiG scheduling overheads on the Threadripper",
        log=False
    )


if __name__ == '__main__':
    try:
        run_tests()
        make_graphs()
        rmtree(DEFAULT_JOB_QUEUE_DIR)
        rmtree(DEFAULT_JOB_OUTPUT_DIR)
        rmtree(BASE)
    except KeyboardInterrupt as ki:
        try:
            sys.exit(1)
        except SystemExit:
            os._exit(1)
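To make the plotting pipeline concrete, a sketch of the line structure that get_meow_graph() returns and make_plot() consumes; directory names and timings here are illustrative:

    # Hypothetical results layout:
    #   results/multiple_rules_multiple_events/10/results.txt
    #   results/multiple_rules_multiple_events/20/results.txt
    # Each results.txt holds an "Average schedule time: ..." line, giving:
    lines = [
        ("multiple_rules_multiple_events", [("10", 0.42), ("20", 0.81)], "solid"),
    ]
    # make_plot would then draw one marker per (job_count, duration) pair:
    # make_plot(lines, "sketch.pdf", "Example plot", logged=False)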
@ -1,69 +0,0 @@
{
    "cells": [
        {
            "cell_type": "code",
            "execution_count": 22,
            "metadata": {},
            "outputs": [],
            "source": [
                "INPUT_FILE = 'file_0.txt'\n",
                "MAX_COUNT = 100"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 23,
            "metadata": {},
            "outputs": [],
            "source": [
                "with open(INPUT_FILE, 'r') as f:\n",
                "    data = int(f.read())"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "print(\"read in: \"+ str(data))\n",
                "print(\"writing out: \"+ str(data+1))"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 24,
            "metadata": {},
            "outputs": [],
            "source": [
                "if data+1 < MAX_COUNT:\n",
                "    with open(INPUT_FILE.replace(str(data), str(data+1)), 'w') as f:\n",
                "        f.write(str(data+1))"
            ]
        }
    ],
    "metadata": {
        "interpreter": {
            "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
        },
        "kernelspec": {
            "display_name": "Python 3",
            "language": "python",
            "name": "python3"
        },
        "language_info": {
            "codemirror_mode": {
                "name": "ipython",
                "version": 3
            },
            "file_extension": ".py",
            "mimetype": "text/x-python",
            "name": "python",
            "nbconvert_exporter": "python",
            "pygments_lexer": "ipython3",
            "version": "3.9.5"
        }
    },
    "nbformat": 4,
    "nbformat_minor": 4
}
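This notebook implements a self-propagating counter: each job reads an integer from its trigger file and, while below MAX_COUNT, writes the incremented value to the next file in the chain, which raises the next event. The cells, flattened into plain Python for readability:

    INPUT_FILE = 'file_0.txt'
    MAX_COUNT = 100

    with open(INPUT_FILE, 'r') as f:
        data = int(f.read())

    print("read in: " + str(data))
    print("writing out: " + str(data+1))

    if data + 1 < MAX_COUNT:
        # 'file_0.txt' becomes 'file_1.txt', retriggering the pattern.
        with open(INPUT_FILE.replace(str(data), str(data+1)), 'w') as f:
            f.write(str(data+1))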
@ -1,242 +0,0 @@
import datetime
import io
import os
import pathlib
import time
import yaml

from typing import Any, Dict, Tuple, List

from meow_base.core.correctness.vars import DEFAULT_JOB_OUTPUT_DIR, \
    DEFAULT_JOB_QUEUE_DIR
from meow_base.core.base_pattern import BasePattern
from meow_base.core.base_recipe import BaseRecipe
from meow_base.core.runner import MeowRunner
from meow_base.patterns.file_event_pattern import WatchdogMonitor
from meow_base.recipes.jupyter_notebook_recipe import PapermillHandler
from meow_base.conductors import LocalPythonConductor
from meow_base.functionality.file_io import rmtree

RESULTS_DIR = "results"
BASE = "benchmark_base"
REPEATS = 10
JOBS_COUNTS = [
    10, 20, 30, 40, 50,
    60, 70, 80, 90, 100,
    125, 150, 175, 200,
    250, 300, 400, 500
]

SRME = "single_rule_multiple_events"
MRSE = "multiple_rules_single_event"
SRSEP = "single_rule_single_event_parallel"
MRME = "multiple_rules_multiple_events"
SRSES = "single_rule_single_event_sequential"

TESTS = [
    SRME,
    MRSE,
    SRSEP,
    MRME,
    # This test will take approx 90% of total time
    SRSES
]


class DummyConductor(LocalPythonConductor):
    # Refuses every job, so scheduling-only runs never execute anything.
    def valid_execute_criteria(self, job:Dict[str,Any])->Tuple[bool,str]:
        return False, ">:("


def datetime_to_timestamp(date_time_obj:datetime.datetime):
    return time.mktime(date_time_obj.timetuple()) \
        + float(date_time_obj.microsecond)/1000000


def generate(file_count:int, file_path:str, file_type:str=".txt"):
    first_filename = ''
    start = time.time()
    for i in range(int(file_count)):
        filename = file_path + str(i) + file_type
        if not first_filename:
            first_filename = filename
        with open(filename, 'w') as f:
            f.write('0')
    return first_filename, time.time() - start


def cleanup(jobs:List[str], file_out:str, base_time:str, gen_time:float,
        execution:bool=False):
    # base_time is the path of the first generated file; its creation
    # time is the baseline all scheduling delays are measured against.
    if not jobs:
        return

    job_timestamps = []
    for job in jobs:
        if execution:
            with open(f"{DEFAULT_JOB_OUTPUT_DIR}/{job}/job.yml", 'r') as f_in:
                data = yaml.load(f_in, Loader=yaml.Loader)
        else:
            with open(f"{DEFAULT_JOB_QUEUE_DIR}/{job}/job.yml", 'r') as f_in:
                data = yaml.load(f_in, Loader=yaml.Loader)
        create_datetime = data['create']
        create_timestamp = datetime_to_timestamp(create_datetime)
        job_timestamps.append((create_timestamp, create_datetime))

    job_timestamps.sort(key=lambda y: int(y[0]))

    first = job_timestamps[0]
    last = job_timestamps[-1]

    #dt = datetime.datetime.fromtimestamp(os.path.getctime(base_time), datetime.timezone(datetime.timedelta(hours=0)))
    dt = datetime.datetime.fromtimestamp(os.path.getctime(base_time))

    # if execution:
    #     queue_times = []
    #     execution_times = []
    #     for j in jobs:
    #         mrsl_dict = load(os.path.join(mrsl_dir, j))
    #
    #         queue_times.append(time.mktime(mrsl_dict['EXECUTING_TIMESTAMP']) - time.mktime(mrsl_dict['QUEUED_TIMESTAMP']))
    #         execution_times.append(time.mktime(mrsl_dict['FINISHED_TIMESTAMP']) - time.mktime(mrsl_dict['EXECUTING_TIMESTAMP']))
    pathlib.Path(os.path.dirname(file_out)).mkdir(parents=True, exist_ok=True)
    with open(file_out, 'w') as f_out:
        f_out.write("Job count: "+ str(len(jobs)) +"\n")
        f_out.write("Generation time: "+ str(round(gen_time, 5)) +"\n")
        f_out.write("First trigger: "+ str(dt) +"\n")
        f_out.write("First scheduling datetime: "+ str(first[1]) +"\n")
        f_out.write("Last scheduling datetime: "+ str(last[1]) +"\n")
        f_out.write("First scheduling unixtime: "+ str(first[0]) +"\n")
        f_out.write("Last scheduling unixtime: "+ str(last[0]) +"\n")
        f_out.write("Scheduling difference (seconds): "+ str(round(last[0] - first[0], 3)) +"\n")
        f_out.write("Initial scheduling delay (seconds): "+ str(round(first[0] - os.path.getctime(base_time), 3)) +"\n")
        total_time = round(last[0] - os.path.getctime(base_time), 3)
        f_out.write("Total scheduling delay (seconds): "+ str(total_time) +"\n")

        # if execution:
        #     f_out.write("Average execution time (seconds): "+ str(round(mean(execution_times), 3)) +"\n")
        #     f_out.write("Max execution time (seconds): "+ str(round(max(execution_times), 3)) +"\n")
        #     f_out.write("Min execution time (seconds): "+ str(round(min(execution_times), 3)) +"\n")

        #     f_out.write("Average queueing delay (seconds): "+ str(round(mean(queue_times), 3)) +"\n")
        #     f_out.write("Max queueing delay (seconds): "+ str(round(max(queue_times), 3)) +"\n")
        #     f_out.write("Min queueing delay (seconds): "+ str(round(min(queue_times), 3)) +"\n")

        #     queue_times.remove(max(queue_times))
        #     f_out.write("Average excluded queueing delay (seconds): "+ str(round(mean(queue_times), 3)) +"\n")

    return total_time


def mean(l:List):
    return sum(l)/len(l)


def collate_results(base_dir:str):
    scheduling_delays = []

    for run in os.listdir(base_dir):
        if run != 'results.txt':
            with open(os.path.join(base_dir, run, "results.txt"), 'r') as f:
                d = f.readlines()

            for l in d:
                if "Total scheduling delay (seconds): " in l:
                    scheduling_delays.append(float(l.replace(
                        "Total scheduling delay (seconds): ", '')))

    with open(os.path.join(base_dir, 'results.txt'), 'w') as f:
        f.write(f"Average schedule time: {round(mean(scheduling_delays), 3)}\n")
        f.write(f"Scheduling times: {scheduling_delays}")


def run_test(patterns:Dict[str,BasePattern], recipes:Dict[str,BaseRecipe],
        files_count:int, expected_job_count:int, repeats:int, job_counter:int,
        requested_jobs:int, runtime_start:float, signature:str="",
        execution:bool=False, print_logging:bool=False):
    if not os.path.exists(RESULTS_DIR):
        os.mkdir(RESULTS_DIR)

    # Does not work. Left here as a reminder.
    if execution:
        os.system("export LC_ALL=C.UTF-8")
        os.system("export LANG=C.UTF-8")

    for run in range(repeats):
        # Ensure complete cleanup from previous run
        for f in [BASE, DEFAULT_JOB_QUEUE_DIR, DEFAULT_JOB_OUTPUT_DIR]:
            if os.path.exists(f):
                rmtree(f)

        file_base = os.path.join(BASE, 'testing')
        pathlib.Path(file_base).mkdir(parents=True, exist_ok=True)

        runner_debug_stream = io.StringIO("")

        if execution:
            runner = MeowRunner(
                WatchdogMonitor(BASE, patterns, recipes, settletime=1),
                PapermillHandler(),
                LocalPythonConductor(),
                print=runner_debug_stream,
                logging=3
            )
        else:
            runner = MeowRunner(
                WatchdogMonitor(BASE, patterns, recipes, settletime=1),
                PapermillHandler(),
                DummyConductor(),
                print=runner_debug_stream,
                logging=3
            )

        runner.start()

        # Generate triggering files
        first_filename, generation_duration = \
            generate(files_count, file_base +"/file_")

        # Wait until the runner's debug stream has been quiet for 10
        # consecutive seconds, or until the overall timeout is reached.
        idle_loops = 0
        total_loops = 0
        messages = 0
        total_time = expected_job_count * 3
        if execution:
            total_time = expected_job_count * 5
        while idle_loops < 10 and total_loops < total_time:
            time.sleep(1)
            runner_debug_stream.seek(0)
            new_messages = len(runner_debug_stream.readlines())

            if messages == new_messages:
                idle_loops += 1
            else:
                idle_loops = 0
                messages = new_messages
            total_loops += 1

        runner.stop()

        if execution:
            jobs = os.listdir(DEFAULT_JOB_OUTPUT_DIR)
        else:
            jobs = os.listdir(DEFAULT_JOB_QUEUE_DIR)

        results_path = os.path.join(
            RESULTS_DIR,
            signature,
            str(expected_job_count),
            str(run),
            "results.txt"
        )

        cleanup(
            jobs,
            results_path,
            first_filename,
            generation_duration,
            execution=execution
        )

        print(f"Completed scheduling run {str(run + 1)} of {str(len(jobs))}"
            f"/{str(expected_job_count)} jobs for '{signature}' "
            f"{job_counter + expected_job_count*(run+1)}/{requested_jobs} "
            f"({str(round(time.time()-runtime_start, 3))}s)")

    collate_results(
        os.path.join(RESULTS_DIR, signature, str(expected_job_count))
    )
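A quick usage sketch of datetime_to_timestamp(), which converts the 'create' datetimes read from job.yml into comparable Unix times with sub-second precision:

    import datetime

    from shared import datetime_to_timestamp

    dt = datetime.datetime(2023, 3, 1, 12, 0, 0, microsecond=500000)
    # Unix time for the datetime, with the 500000 microseconds kept as 0.5s.
    print(datetime_to_timestamp(dt))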
@ -1,34 +0,0 @@
from meow_base.patterns import FileEventPattern
from meow_base.recipes import get_recipe_from_notebook

from shared import run_test, SRME


def single_rule_multiple_events(job_count:int, REPEATS:int, job_counter:int,
        requested_jobs:int, runtime_start:float):
    patterns = {}
    pattern = FileEventPattern(
        "pattern_one",
        "testing/*",
        "recipe_one",
        "input"
    )
    patterns[pattern.name] = pattern

    recipe = get_recipe_from_notebook("recipe_one", "test.ipynb")

    recipes = {
        recipe.name: recipe
    }

    run_test(
        patterns,
        recipes,
        job_count,
        job_count,
        REPEATS,
        job_counter,
        requested_jobs,
        runtime_start,
        signature=SRME
    )
@ -1,36 +0,0 @@
from meow_base.patterns import FileEventPattern
from meow_base.recipes import get_recipe_from_notebook

from meow_base.functionality.meow import create_parameter_sweep
from shared import run_test, SRSEP


def single_rule_single_event_parallel(job_count:int, REPEATS:int,
        job_counter:int, requested_jobs:int, runtime_start:float):
    patterns = {}
    # A single pattern whose parameter sweep fans one event out into
    # job_count jobs.
    pattern = FileEventPattern(
        "pattern_one",
        "testing/*",
        "recipe_one",
        "input",
        sweep=create_parameter_sweep("var", 1, job_count, 1)
    )
    patterns[pattern.name] = pattern

    recipe = get_recipe_from_notebook("recipe_one", "test.ipynb")

    recipes = {
        recipe.name: recipe
    }

    run_test(
        patterns,
        recipes,
        1,
        job_count,
        REPEATS,
        job_counter,
        requested_jobs,
        runtime_start,
        signature=SRSEP
    )
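A hedged sketch of what the sweep presumably expands to, assuming create_parameter_sweep("var", 1, job_count, 1) enumerates values from 1 to job_count in steps of 1:

    from meow_base.functionality.meow import create_parameter_sweep

    # Presumed expansion for job_count = 3: {"var": [1, 2, 3]}, so the one
    # matched event is scheduled once per value, i.e. three jobs.
    sweep = create_parameter_sweep("var", 1, 3, 1)
    print(sweep)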
@ -1,39 +0,0 @@
from meow_base.patterns import FileEventPattern
from meow_base.recipes import get_recipe_from_notebook

from shared import run_test, SRSES


def single_rule_single_event_sequential(job_count:int, REPEATS:int,
        job_counter:int, requested_jobs:int, runtime_start:float):
    patterns = {}
    pattern = FileEventPattern(
        "pattern_one",
        "testing/*",
        "recipe_two",
        "INPUT_FILE",
        parameters={
            "MAX_COUNT":job_count
        }
    )
    patterns[pattern.name] = pattern

    recipe = get_recipe_from_notebook("recipe_two", "sequential.ipynb")

    recipes = {
        recipe.name: recipe
    }

    # Jobs must actually execute here, since each job's output file is
    # what triggers the next job in the chain.
    run_test(
        patterns,
        recipes,
        1,
        job_count,
        REPEATS,
        job_counter,
        requested_jobs,
        runtime_start,
        signature=SRSES,
        execution=True,
        print_logging=False
    )
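This is the only variant run with execution=True: each completed notebook writes the file that triggers the next job, so the job_count jobs arrive strictly one after another. A simulation of the resulting trigger chain (MAX_COUNT shrunk to 5 for illustration):

    MAX_COUNT = 5  # stands in for job_count

    data = 0
    files = ["file_0.txt"]  # the single generated starting file
    while data + 1 < MAX_COUNT:
        data += 1
        files.append(f"file_{data}.txt")  # written by the previous job

    print(files)  # five trigger files, so five strictly sequential jobs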
@ -1,47 +0,0 @@
{
    "cells": [
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [
                {
                    "ename": "",
                    "evalue": "",
                    "output_type": "error",
                    "traceback": [
                        "\u001b[1;31mFailed to start the Kernel. \n",
                        "Failed to start the Kernel 'Python 3.6.9 64-bit'. \n",
                        "View Jupyter <a href='command:jupyter.viewOutput'>log</a> for further details. Kernel has not been started"
                    ]
                }
            ],
            "source": [
                "print('this is some output')"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": []
        }
    ],
    "metadata": {
        "interpreter": {
            "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
        },
        "kernelspec": {
            "display_name": "Python 3.6.9 64-bit",
            "language": "python",
            "name": "python3"
        },
        "language_info": {
            "name": "python",
            "version": "3.6.9"
        }
    },
    "nbformat": 4,
    "nbformat_minor": 4
}
@ -559,6 +559,14 @@ class WatchdogEventHandler(PatternMatchingEventHandler):
             else:
                 self._recent_jobs[event.src_path] = \
                     [event.time_stamp, {event.event_type}]
+
+            # If we have a closed event then short-cut the wait and send event
+            # immediately
+            if event.event_type == FILE_CLOSED_EVENT:
+                self.monitor.match(event)
+                self._recent_jobs_lock.release()
+                return
 
         except Exception as ex:
             self._recent_jobs_lock.release()
             raise Exception(ex)
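The added branch shortcuts the debounce window: once a file is closed it cannot change further, so the event is matched immediately instead of waiting out the settletime. A simplified sketch of the surrounding handler logic; the method name and structure here are illustrative, and the real method in file_event_pattern.py carries more bookkeeping:

    def handle_event(self, event):
        self._recent_jobs_lock.acquire()
        try:
            # ... debounce bookkeeping on self._recent_jobs ...

            # A closed file is complete, so skip the settle wait entirely.
            if event.event_type == FILE_CLOSED_EVENT:
                self.monitor.match(event)
                self._recent_jobs_lock.release()
                return
        except Exception as ex:
            self._recent_jobs_lock.release()
            raise Exception(ex)
        self._recent_jobs_lock.release()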