import socket from multiprocessing import Pipe from threading import Thread from time import time, sleep from meow_base.patterns.network_event_pattern import NetworkMonitor, \ NetworkEventPattern from meow_base.recipes.jupyter_notebook_recipe import JupyterNotebookRecipe from meow_base.tests.shared import BAREBONES_NOTEBOOK def send(port): sender = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sender.connect(("127.0.0.1", port)) sender.sendall(b'test') sender.close() def get_all(from_monitor_reader, event_count): for _ in range(event_count): if from_monitor_reader.poll(3): event = from_monitor_reader.recv() else: raise Exception("Did not receive all events") def test_network(monitor_count: int, patterns_per_monitor: int, events_per_pattern: int, start_port: int): monitors = [] port = start_port recipe = JupyterNotebookRecipe( "recipe_one", BAREBONES_NOTEBOOK) recipes = { recipe.name: recipe } from_monitor_reader, from_monitor_writer = Pipe() for _ in range(monitor_count): patterns = {} for p in range(patterns_per_monitor): pattern_name = f"pattern_{p}" patterns[pattern_name] = NetworkEventPattern( pattern_name, port, recipe.name ) port += 1 monitor = NetworkMonitor(patterns, recipes) monitor.to_runner_event = from_monitor_writer monitors.append(monitor) monitor.start() event_count = monitor_count*patterns_per_monitor*events_per_pattern receiver = Thread(target=get_all, args=(from_monitor_reader, event_count,)) receiver.start() start_time = time() for p in range(start_port, port): for _ in range(events_per_pattern): send(p) # Thread(target=send, args=(p,)).start() receiver.join() duration = time() - start_time for monitor in monitors: monitor.stop() return duration def main(): monitors = 1 patterns = 1000 events = 1 n = 50 durations = [] for i in range(n): print(" ", i, end=" \r") durations.append(test_network(monitors,patterns,events,1024)) sleep(0.5) print(f"({monitors}, {patterns}, {events}) min: {min(durations)}, max: {max(durations)}, avg: {sum(durations)/n}") if __name__ == "__main__": main()