diff --git a/core/base_handler.py b/core/base_handler.py index 6ce0ce2..1430999 100644 --- a/core/base_handler.py +++ b/core/base_handler.py @@ -1,6 +1,6 @@ """ -This file contains the base MEOW handler defintion. This should be inherited +This file contains the base MEOW handler defintion. This should be inherited from for all handler instances. Author(s): David Marchant @@ -27,28 +27,28 @@ from meow_base.functionality.meow import create_job_metadata_dict, \ from meow_base.functionality.naming import generate_handler_id class BaseHandler: - # An identifier for a handler within the runner. Can be manually set in + # An identifier for a handler within the runner. Can be manually set in # the constructor, or autogenerated if no name provided. name:str - # A channel for sending messages to the runner event queue. Note that this - # will be overridden by a MeowRunner, if a handler instance is passed to - # it, and so does not need to be initialised within the handler itself, + # A channel for sending messages to the runner event queue. Note that this + # will be overridden by a MeowRunner, if a handler instance is passed to + # it, and so does not need to be initialised within the handler itself, # unless the handler is running independently of a runner. to_runner_event: VALID_CHANNELS - # A channel for sending messages to the runner job queue. Note that this - # will be overridden by a MeowRunner, if a handler instance is passed to - # it, and so does not need to be initialised within the handler itself, + # A channel for sending messages to the runner job queue. Note that this + # will be overridden by a MeowRunner, if a handler instance is passed to + # it, and so does not need to be initialised within the handler itself, # unless the handler is running independently of a runner. - to_runner_job: VALID_CHANNELS - # Directory where queued jobs are initially written to. Note that this - # will be overridden by a MeowRunner, if a handler instance is passed to + to_runner_job: VALID_CHANNELS + # Directory where queued jobs are initially written to. Note that this + # will be overridden by a MeowRunner, if a handler instance is passed to # it, and so does not need to be initialised within the handler itself. job_queue_dir:str - # A count, for how long a handler will wait if told that there are no + # A count, for how long a handler will wait if told that there are no # events in the runner, before polling again. Default is 5 seconds. pause_time: int def __init__(self, name:str='', pause_time:int=5)->None: - """BaseHandler Constructor. This will check that any class inheriting + """BaseHandler Constructor. This will check that any class inheriting from it implements its validation functions.""" check_implementation(type(self).valid_handle_criteria, BaseHandler) check_implementation(type(self).get_created_job_type, BaseHandler) @@ -61,7 +61,7 @@ class BaseHandler: self.pause_time = pause_time def __new__(cls, *args, **kwargs): - """A check that this base class is not instantiated itself, only + """A check that this base class is not instantiated itself, only inherited from""" if cls is BaseHandler: msg = get_drt_imp_msg(BaseHandler) @@ -69,14 +69,14 @@ class BaseHandler: return object.__new__(cls) def _is_valid_name(self, name:str)->None: - """Validation check for 'name' variable from main constructor. Is - automatically called during initialisation. This does not need to be + """Validation check for 'name' variable from main constructor. Is + automatically called during initialisation. This does not need to be overridden by child classes.""" valid_string(name, VALID_HANDLER_NAME_CHARS) def _is_valid_pause_time(self, pause_time:int)->None: - """Validation check for 'pause_time' variable from main constructor. Is - automatically called during initialisation. This does not need to be + """Validation check for 'pause_time' variable from main constructor. Is + automatically called during initialisation. This does not need to be overridden by child classes.""" valid_natural(pause_time, hint="BaseHandler.pause_time") @@ -91,16 +91,16 @@ class BaseHandler: self.to_runner_job.send(job_id) def start(self)->None: - """Function to start the handler as an ongoing thread, as defined by - the main_loop function. Together, these will execute any code in a - implemented handlers handle function sequentially, but concurrently to - any other handlers running or other runner operations. This is intended - as a naive mmultiprocessing implementation, and any more in depth - parallelisation of execution must be implemented by a user by + """Function to start the handler as an ongoing thread, as defined by + the main_loop function. Together, these will execute any code in a + implemented handlers handle function sequentially, but concurrently to + any other handlers running or other runner operations. This is intended + as a naive mmultiprocessing implementation, and any more in depth + parallelisation of execution must be implemented by a user by overriding this function, and the stop function.""" - self._stop_event = Event() + self._stop_event = Event() self._handle_thread = Thread( - target=self.main_loop, + target=self.main_loop, args=(self._stop_event,), daemon=True, name="handler_thread" @@ -108,21 +108,21 @@ class BaseHandler: self._handle_thread.start() def stop(self)->None: - """Function to stop the handler as an ongoing thread. May be overidden + """Function to stop the handler as an ongoing thread. May be overidden by any child class. This function should also be overriden if the start function has been.""" self._stop_event.set() self._handle_thread.join() - + def main_loop(self, stop_event)->None: - """Function defining an ongoing thread, as started by the start + """Function defining an ongoing thread, as started by the start function and stoped by the stop function. """ while not stop_event.is_set(): reply = self.prompt_runner_for_event() - # If we have recieved 'None' then we have already timed out so skip + # If we have recieved 'None' then we have already timed out so skip # this loop and start again if reply is None: continue @@ -138,17 +138,18 @@ class BaseHandler: self.handle(reply) except Exception as e: # TODO some error reporting here - pass + if not isinstance(e, TypeError): + raise e def valid_handle_criteria(self, event:Dict[str,Any])->Tuple[bool,str]: - """Function to determine given an event defintion, if this handler can + """Function to determine given an event defintion, if this handler can process it or not. Must be implemented by any child process.""" pass def handle(self, event:Dict[str,Any])->None: - """Function to handle a given event. May be overridden by any child - process. Note that once any handling has occured, the - send_job_to_runner function should be called to inform the runner of + """Function to handle a given event. May be overridden by any child + process. Note that once any handling has occured, the + send_job_to_runner function should be called to inform the runner of any resultant jobs.""" rule = event[EVENT_RULE] @@ -158,7 +159,7 @@ class BaseHandler: yaml_dict[var] = val for var, val in rule.pattern.outputs.items(): yaml_dict[var] = val - yaml_dict[rule.pattern.triggering_file] = event[EVENT_PATH] + # yaml_dict[rule.pattern.triggering_file] = event[EVENT_PATH] # If no parameter sweeps, then one job will suffice if not rule.pattern.sweep: @@ -172,7 +173,7 @@ class BaseHandler: self.setup_job(event, yaml_dict) def setup_job(self, event:Dict[str,Any], params_dict:Dict[str,Any])->None: - """Function to set up new job dict and send it to the runner to be + """Function to set up new job dict and send it to the runner to be executed.""" # Get base job metadata @@ -206,7 +207,7 @@ class BaseHandler: # TODO make me not tmp variables and update job dict validation "tmp recipe command": recipe_command, "tmp script command": script_command - }, + }, meta_file ) @@ -216,11 +217,11 @@ class BaseHandler: def get_created_job_type(self)->str: pass # Must implemented - def create_job_metadata_dict(self, event:Dict[str,Any], + def create_job_metadata_dict(self, event:Dict[str,Any], params_dict:Dict[str,Any])->Dict[str,Any]: return create_job_metadata_dict( - self.get_created_job_type(), - event, + self.get_created_job_type(), + event, extras={ JOB_PARAMETERS:params_dict } @@ -253,7 +254,7 @@ class BaseHandler: "# Check hash of input file to avoid race conditions", "actual_hash=$(sha256sum $event_path | cut -c -64)", "echo actual_hash: $actual_hash", - "if [ $given_hash != $actual_hash ]; then", + "if [ \"$given_hash\" != \"$actual_hash\" ]; then", " echo Job was skipped as triggering file has been modified since scheduling", " exit 134", "fi", diff --git a/core/runner.py b/core/runner.py index 9857823..9fff752 100644 --- a/core/runner.py +++ b/core/runner.py @@ -156,7 +156,7 @@ class MeowRunner: f"event for handler {component.name}. {e}", DEBUG_INFO ) - + if valid: self.event_queue.remove(event) connection.send(event) @@ -205,7 +205,7 @@ class MeowRunner: job = threadsafe_read_status(metafile) except Exception as e: print_debug( - self._print_target, + self._print_target, self.debug_level, "Could not load necessary job definitions " f"for job at '{job_dir}'. {e}", @@ -216,7 +216,7 @@ class MeowRunner: valid, _ = component.valid_execute_criteria(job) except Exception as e: print_debug( - self._print_target, + self._print_target, self.debug_level, "Could not determine validity of " f"job for conductor {component.name}. {e}", @@ -256,12 +256,12 @@ class MeowRunner: args=[]) self._mon_han_worker.daemon = True self._mon_han_worker.start() - print_debug(self._print_target, self.debug_level, + print_debug(self._print_target, self.debug_level, "Starting MeowRunner event handling...", DEBUG_INFO) else: msg = "Repeated calls to start MeowRunner event handling have " \ "no effect." - print_debug(self._print_target, self.debug_level, + print_debug(self._print_target, self.debug_level, msg, DEBUG_WARNING) raise RuntimeWarning(msg) @@ -273,12 +273,12 @@ class MeowRunner: args=[]) self._han_con_worker.daemon = True self._han_con_worker.start() - print_debug(self._print_target, self.debug_level, + print_debug(self._print_target, self.debug_level, "Starting MeowRunner job conducting...", DEBUG_INFO) else: msg = "Repeated calls to start MeowRunner job conducting have " \ "no effect." - print_debug(self._print_target, self.debug_level, + print_debug(self._print_target, self.debug_level, msg, DEBUG_WARNING) raise RuntimeWarning(msg) @@ -302,26 +302,26 @@ class MeowRunner: # If we've started the monitor/handler interaction thread, then stop it if self._mon_han_worker is None: msg = "Cannot stop event handling thread that is not started." - print_debug(self._print_target, self.debug_level, + print_debug(self._print_target, self.debug_level, msg, DEBUG_WARNING) raise RuntimeWarning(msg) else: self._stop_mon_han_pipe[1].send(1) self._mon_han_worker.join() - print_debug(self._print_target, self.debug_level, + print_debug(self._print_target, self.debug_level, "Event handler thread stopped", DEBUG_INFO) # If we've started the handler/conductor interaction thread, then stop # it if self._han_con_worker is None: msg = "Cannot stop job conducting thread that is not started." - print_debug(self._print_target, self.debug_level, + print_debug(self._print_target, self.debug_level, msg, DEBUG_WARNING) raise RuntimeWarning(msg) else: self._stop_han_con_pipe[1].send(1) self._han_con_worker.join() - print_debug(self._print_target, self.debug_level, + print_debug(self._print_target, self.debug_level, "Job conductor thread stopped", DEBUG_INFO) def get_monitor_by_name(self, queried_name:str)->BaseMonitor: diff --git a/example_workflow/job_output/job_FQQUQMTGtqUw/job.sh b/example_workflow/job_output/job_FQQUQMTGtqUw/job.sh new file mode 100755 index 0000000..56534c0 --- /dev/null +++ b/example_workflow/job_output/job_FQQUQMTGtqUw/job.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +# Get job params +given_hash=$(grep 'file_hash: *' $(dirname $0)/job.yml | tail -n1 | cut -c 14-) +event_path=$(grep 'event_path: *' $(dirname $0)/job.yml | tail -n1 | cut -c 15-) + +echo event_path: $event_path +echo given_hash: $given_hash + +# Check hash of input file to avoid race conditions +actual_hash=$(sha256sum $event_path | cut -c -64) +echo actual_hash: $actual_hash +if [ "$given_hash" != "$actual_hash" ]; then + echo Job was skipped as triggering file has been modified since scheduling + exit 134 +fi + +# Call actual job script +python3 job_queue/job_FQQUQMTGtqUw/recipe.py >>job_queue/job_FQQUQMTGtqUw/output.log 2>&1 + +exit $? \ No newline at end of file diff --git a/example_workflow/job_output/job_FQQUQMTGtqUw/job.yml b/example_workflow/job_output/job_FQQUQMTGtqUw/job.yml new file mode 100644 index 0000000..f4292ab --- /dev/null +++ b/example_workflow/job_output/job_FQQUQMTGtqUw/job.yml @@ -0,0 +1,38 @@ +create: 2023-06-07 11:54:48.117191 +end: 2023-06-07 11:54:53.231092 +error: Job execution returned non-zero. +event: + event_path: /tmp/tmp3q4q94ee + event_rule: !!python/object:meow_base.core.rule.Rule + name: rule_xjGHQxaaray + pattern: !!python/object:meow_base.patterns.network_event_pattern.NetworkEventPattern + name: echo_pattern + outputs: {} + parameters: {} + recipe: echo_recipe + sweep: {} + triggering_port: 8080 + recipe: !!python/object:meow_base.recipes.python_recipe.PythonRecipe + name: echo_recipe + parameters: {} + recipe: + - path = {PATH} + - print(path) + requirements: &id001 {} + event_time: 1686131685.3071685 + event_type: network + file_hash: f2ca1bb6c7e907d06dafe4687e579fce76b37e4e93b7605022da52e6ccc26fd2 + monitor_base: '' + triggering_port: 8080 +id: job_FQQUQMTGtqUw +job_type: python +parameters: {} +pattern: echo_pattern +recipe: echo_recipe +requirements: *id001 +rule: rule_xjGHQxaaray +start: 2023-06-07 11:54:53.168581 +status: failed +tmp recipe command: python3 job_queue/job_FQQUQMTGtqUw/recipe.py >>job_queue/job_FQQUQMTGtqUw/output.log + 2>&1 +tmp script command: ./job.sh diff --git a/example_workflow/job_output/job_FQQUQMTGtqUw/job.yml.lock b/example_workflow/job_output/job_FQQUQMTGtqUw/job.yml.lock new file mode 100644 index 0000000..e69de29 diff --git a/example_workflow/job_output/job_FQQUQMTGtqUw/recipe.py b/example_workflow/job_output/job_FQQUQMTGtqUw/recipe.py new file mode 100755 index 0000000..9adc435 --- /dev/null +++ b/example_workflow/job_output/job_FQQUQMTGtqUw/recipe.py @@ -0,0 +1,2 @@ +path = {PATH} +print(path) \ No newline at end of file diff --git a/example_workflow/network_workflow.py b/example_workflow/network_workflow.py new file mode 100644 index 0000000..5834c6d --- /dev/null +++ b/example_workflow/network_workflow.py @@ -0,0 +1,53 @@ +from time import sleep + +from meow_base.core.runner import MeowRunner +from meow_base.patterns.network_event_pattern import NetworkMonitor, NetworkEventPattern +from meow_base.recipes.python_recipe import PythonRecipe, PythonHandler +from meow_base.conductors.local_python_conductor import LocalPythonConductor + +PORTS = [8080,8181] + +def main(): + runners = [] + for i, port in enumerate(PORTS): + other_port = PORTS[(i+1)%2] + + # Gets the script ready + script = [ + "import socket", + "sender = socket.socket(socket.AF_INET, socket.SOCK_STREAM)", + f"sender.connect(('127.0.0.1', {other_port}))", + "sender.sendall(b'test')", + "sender.close()" + ] + + # Initialize the network monitor + patterns = { + "echo_pattern":NetworkEventPattern( + "echo_pattern", + port, + "echo_recipe" + ) + } + recipes = {"echo_recipe":PythonRecipe("echo_recipe", script)} + monitors = [NetworkMonitor(patterns, recipes)] + + + # Initialize the handler and conductor + handlers = [PythonHandler()] + conductors = [LocalPythonConductor()] + + # Start the runner + runner = MeowRunner(monitors, handlers, conductors) + + runner.start() + runners.append(runner) + + sleep(120) + + for runner in runners: + runner.stop() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/patterns/network_event_pattern.py b/patterns/network_event_pattern.py index 589b68e..5e4567a 100644 --- a/patterns/network_event_pattern.py +++ b/patterns/network_event_pattern.py @@ -2,6 +2,7 @@ import sys import socket import threading import tempfile +import hashlib from os import unlink from time import time @@ -16,10 +17,11 @@ from meow_base.core.base_pattern import BasePattern from meow_base.functionality.meow import create_event from meow_base.functionality.debug import setup_debugging, print_debug from meow_base.core.meow import EVENT_KEYS +from meow_base.patterns.file_event_pattern import WATCHDOG_BASE, WATCHDOG_HASH # network events EVENT_TYPE_NETWORK = "network" -TRIGGERING_PORT = "triggering port" +TRIGGERING_PORT = "triggering_port" NETWORK_EVENT_KEYS = { TRIGGERING_PORT: int, @@ -27,7 +29,8 @@ NETWORK_EVENT_KEYS = { } def create_network_event(temp_path:str, rule:Any, time:float, - port: int, extras:Dict[Any,Any]={})->Dict[Any,Any]: + port: int, file_hash: str, + extras:Dict[Any,Any]={})->Dict[Any,Any]: """Function to create a MEOW event dictionary.""" return create_event( EVENT_TYPE_NETWORK, @@ -36,6 +39,8 @@ def create_network_event(temp_path:str, rule:Any, time:float, time, extras={ TRIGGERING_PORT: port, + WATCHDOG_HASH: file_hash, + WATCHDOG_BASE: "", **extras } ) @@ -120,7 +125,8 @@ class NetworkMonitor(BaseMonitor): event["tmp file"], rule, event["time stamp"], - event["triggering port"] + event["triggering port"], + event["file hash"] ) print_debug(self._print_target, self.debug_level, f"Event at {event['triggering port']} hit rule {rule.name}", @@ -206,10 +212,14 @@ class Listener(): tmp_name = tmp.name + with open(tmp_name, "rb") as file_pointer: + file_hash = hashlib.sha256(file_pointer.read()).hexdigest() + event = { "triggering port": self.port, "tmp file": tmp_name, - "time stamp": time_stamp + "time stamp": time_stamp, + "file hash": file_hash } self.monitor.match(event) diff --git a/performance_test/performance_test.py b/performance_test/performance_test.py index 273c1e5..51828dd 100644 --- a/performance_test/performance_test.py +++ b/performance_test/performance_test.py @@ -2,6 +2,7 @@ import socket from multiprocessing import Pipe from threading import Thread from time import time, sleep +from numpy import std, floor, log10 from meow_base.patterns.network_event_pattern import NetworkMonitor, \ NetworkEventPattern @@ -65,6 +66,7 @@ def test_network(monitor_count: int, patterns_per_monitor: int, start_time = time() + for p in range(start_port, port): for _ in range(events_per_pattern): send(p) @@ -79,13 +81,18 @@ def test_network(monitor_count: int, patterns_per_monitor: int, return duration +def sigfigs(num): + if num < 10: + return round(num, -int(floor(log10(abs(num))-1))) + else: + return int(num) def main(): - monitors = 1000 + monitors = 1 patterns = 1 - events = 1 + events = 1000 - n = 50 + n = 100 durations = [] for i in range(n): @@ -93,7 +100,10 @@ def main(): durations.append(test_network(monitors,patterns,events,1024)) sleep(0.5) - print(f"({monitors}, {patterns}, {events}) min: {min(durations)}, max: {max(durations)}, avg: {sum(durations)/n}") + print(f"({monitors}, {patterns}, {events}) min: {min(durations)}, max: {max(durations)}, avg: {sum(durations)/n}, std: {std(durations)}") + + # print(f"{sigfigs(min(durations)*1000)}ms & {sigfigs((min(durations)*1000)/events)}ms & {sigfigs(max(durations)*1000)}ms & {sigfigs((max(durations)*1000)/events)}ms & {sigfigs((sum(durations)/n)*1000)}ms & {sigfigs(((sum(durations)/n)*1000)/events)}ms & {sigfigs(std(durations)*1000)}ms") + print(f"{sigfigs(min(durations)*1000)}ms & {sigfigs(max(durations)*1000)}ms & {sigfigs((sum(durations)/n)*1000)}ms & {sigfigs(std(durations)*1000)}ms") if __name__ == "__main__": main() diff --git a/recipes/python_recipe.py b/recipes/python_recipe.py index 4dd6a44..14d00ff 100644 --- a/recipes/python_recipe.py +++ b/recipes/python_recipe.py @@ -87,8 +87,8 @@ class PythonHandler(BaseHandler): msg = "" if type(event[EVENT_RULE].recipe) != PythonRecipe: msg = "Recipe is not a PythonRecipe. " - if event[EVENT_TYPE] != EVENT_TYPE_WATCHDOG: - msg += f"Event type is not {EVENT_TYPE_WATCHDOG}." + # if event[EVENT_TYPE] != EVENT_TYPE_WATCHDOG: + # msg += f"Event type is not {EVENT_TYPE_WATCHDOG}." if msg: return False, msg else: