refactored monitor handler interaction to better allow differing event types in same system

This commit is contained in:
PatchOfScotland
2023-01-13 18:04:50 +01:00
parent e9519d718f
commit d9004394c1
11 changed files with 357 additions and 355 deletions

View File

@ -17,7 +17,8 @@ from core.correctness.validation import check_type, valid_string, \
valid_dict, valid_path, valid_list, valid_existing_dir_path, \
setup_debugging
from core.correctness.vars import VALID_VARIABLE_NAME_CHARS, VALID_CHANNELS, \
SHA256, DEBUG_ERROR, DEBUG_WARNING, DEBUG_INFO
SHA256, DEBUG_ERROR, DEBUG_WARNING, DEBUG_INFO, WATCHDOG_TYPE, \
WATCHDOG_SRC, WATCHDOG_BASE, WATCHDOG_RULE
from core.functionality import wait, get_file_hash, generate_id, make_dir, \
write_yaml, write_notebook, get_file_hash, parameterize_jupyter_notebook, \
print_debug
@ -98,9 +99,9 @@ class PapermillHandler(BaseHandler):
_jobs:list[str]
_jobs_lock:threading.Lock
_print_target:Any
def __init__(self, inputs:list[VALID_CHANNELS], handler_base:str,
output_dir:str, print:Any=sys.stdout, logging:int=0)->None:
super().__init__(inputs)
def __init__(self, handler_base:str, output_dir:str, print:Any=sys.stdout,
logging:int=0)->None:
super().__init__()
self._is_valid_handler_base(handler_base)
self.handler_base = handler_base
self._is_valid_output_dir(output_dir)
@ -113,61 +114,62 @@ class PapermillHandler(BaseHandler):
print_debug(self._print_target, self.debug_level,
"Created new PapermillHandler instance", DEBUG_INFO)
def run(self)->None:
all_inputs = self.inputs + [self._stop_pipe[0]]
while True:
ready = wait(all_inputs)
# def run(self)->None:
# all_inputs = self.inputs + [self._stop_pipe[0]]
# while True:
# ready = wait(all_inputs)
#
# if self._stop_pipe[0] in ready:
# return
# else:
# for input in self.inputs:
# if input in ready:
# message = input.recv()
# event = message
# self.handle(event)
#
# def start(self)->None:
# if self._worker is None:
# self._worker = threading.Thread(
# target=self.run,
# args=[])
# self._worker.daemon = True
# self._worker.start()
# print_debug(self._print_target, self.debug_level,
# "Starting PapermillHandler run...", DEBUG_INFO)
# else:
# msg = "Repeated calls to start have no effect."
# print_debug(self._print_target, self.debug_level,
# msg, DEBUG_WARNING)
# raise RuntimeWarning(msg)
#
# def stop(self)->None:
# if self._worker is None:
# msg = "Cannot stop thread that is not started."
# print_debug(self._print_target, self.debug_level,
# msg, DEBUG_WARNING)
# raise RuntimeWarning(msg)
# else:
# self._stop_pipe[1].send(1)
# self._worker.join()
# print_debug(self._print_target, self.debug_level,
# "Worker thread stopped", DEBUG_INFO)
if self._stop_pipe[0] in ready:
return
else:
for input in self.inputs:
if input in ready:
message = input.recv()
event, rule = message
self.handle(event, rule)
def start(self)->None:
if self._worker is None:
self._worker = threading.Thread(
target=self.run,
args=[])
self._worker.daemon = True
self._worker.start()
print_debug(self._print_target, self.debug_level,
"Starting PapermillHandler run...", DEBUG_INFO)
else:
msg = "Repeated calls to start have no effect."
print_debug(self._print_target, self.debug_level,
msg, DEBUG_WARNING)
raise RuntimeWarning(msg)
def stop(self)->None:
if self._worker is None:
msg = "Cannot stop thread that is not started."
print_debug(self._print_target, self.debug_level,
msg, DEBUG_WARNING)
raise RuntimeWarning(msg)
else:
self._stop_pipe[1].send(1)
self._worker.join()
print_debug(self._print_target, self.debug_level,
"Worker thread stopped", DEBUG_INFO)
def handle(self, event:FileSystemEvent, rule:BaseRule)->None:
def handle(self, event:dict[Any,Any])->None:
# TODO finish implementation and test
print_debug(self._print_target, self.debug_level,
f"Handling event {event.src_path}", DEBUG_INFO)
f"Handling event {event[WATCHDOG_SRC]}", DEBUG_INFO)
file_hash = get_file_hash(event.src_path, SHA256)
file_hash = get_file_hash(event[WATCHDOG_SRC], SHA256)
rule = event[WATCHDOG_RULE]
yaml_dict = {}
for var, val in rule.pattern.parameters.items():
yaml_dict[var] = val
for var, val in rule.pattern.outputs.items():
yaml_dict[var] = val
yaml_dict[rule.pattern.triggering_file] = event.src_path
yaml_dict[rule.pattern.triggering_file] = event[WATCHDOG_SRC]
if not rule.pattern.sweep:
waiting_for_threaded_resources = True
@ -175,7 +177,7 @@ class PapermillHandler(BaseHandler):
try:
worker = threading.Thread(
target=self.execute_job,
args=[event, rule, yaml_dict, file_hash])
args=[event, yaml_dict, file_hash])
worker.daemon = True
worker.start()
waiting_for_threaded_resources = False
@ -184,7 +186,6 @@ class PapermillHandler(BaseHandler):
else:
for var, val in rule.pattern.sweep.items():
values = []
par_val = rule.pattern.sweep[SWEEP_START]
while par_val <= rule.pattern.sweep[SWEEP_STOP]:
values.append(par_val)
@ -197,7 +198,7 @@ class PapermillHandler(BaseHandler):
try:
worker = threading.Thread(
target=self.execute_job,
args=[event, rule, yaml_dict, file_hash])
args=[event, yaml_dict, file_hash])
worker.daemon = True
worker.start()
waiting_for_threaded_resources = False
@ -223,6 +224,9 @@ class PapermillHandler(BaseHandler):
self._jobs_lock.release()
return jobs_deepcopy
def valid_event_types(self)->list[str]:
return [WATCHDOG_TYPE]
def _is_valid_inputs(self, inputs:list[VALID_CHANNELS])->None:
valid_list(inputs, VALID_CHANNELS)
@ -232,23 +236,23 @@ class PapermillHandler(BaseHandler):
def _is_valid_output_dir(self, output_dir)->None:
valid_existing_dir_path(output_dir, allow_base=True)
def execute_job(self, event:FileSystemEvent, rule:BaseRule,
def execute_job(self, event:FileSystemEvent,
yaml_dict:dict[str,Any], triggerfile_hash:str)->None:
job_dict = {
JOB_ID: generate_id(prefix="job_", existing_ids=self.get_jobs()),
JOB_PATTERN: rule.pattern,
JOB_RECIPE: rule.recipe,
JOB_RULE: rule.name,
JOB_PATH: event.src_path,
JOB_PATTERN: event[WATCHDOG_RULE].pattern,
JOB_RECIPE: event[WATCHDOG_RULE].recipe,
JOB_RULE: event[WATCHDOG_RULE].name,
JOB_PATH: event[WATCHDOG_SRC],
JOB_HASH: triggerfile_hash,
JOB_STATUS: STATUS_QUEUED,
JOB_CREATE_TIME: datetime.now(),
JOB_REQUIREMENTS: rule.recipe.requirements
JOB_REQUIREMENTS: event[WATCHDOG_RULE].recipe.requirements
}
print_debug(self._print_target, self.debug_level,
f"Creating job for event at {event.src_path} with ID "
f"Creating job for event at {event[WATCHDOG_SRC]} with ID "
f"{job_dict[JOB_ID]}", DEBUG_INFO)
self.add_job(job_dict[JOB_ID])
@ -256,8 +260,8 @@ class PapermillHandler(BaseHandler):
yaml_dict = self.replace_keywords(
yaml_dict,
job_dict[JOB_ID],
event.src_path,
event.monitor_base
event[WATCHDOG_SRC],
event[WATCHDOG_BASE]
)
job_dir = os.path.join(self.handler_base, job_dict[JOB_ID])
@ -267,7 +271,7 @@ class PapermillHandler(BaseHandler):
write_yaml(job_dict, meta_file)
base_file = os.path.join(job_dir, BASE_FILE)
write_notebook(rule.recipe.recipe, base_file)
write_notebook(event[WATCHDOG_RULE].recipe.recipe, base_file)
param_file = os.path.join(job_dir, PARAMS_FILE)
write_yaml(yaml_dict, param_file)
@ -298,7 +302,7 @@ class PapermillHandler(BaseHandler):
try:
job_notebook = parameterize_jupyter_notebook(
rule.recipe.recipe, yaml_dict
event[WATCHDOG_RULE].recipe.recipe, yaml_dict
)
write_notebook(job_notebook, job_file)
except Exception: