From ee4739e9f229a433f5d25a5148819cc88c4f81ad Mon Sep 17 00:00:00 2001 From: NikolajDanger Date: Mon, 22 May 2023 10:57:54 +0200 Subject: [PATCH] Introduction of connectors --- patterns/network_event_pattern.py | 96 +++++++++++++- tests/test_patterns.py | 211 +++++++++++++++++++++++++++++- 2 files changed, 302 insertions(+), 5 deletions(-) diff --git a/patterns/network_event_pattern.py b/patterns/network_event_pattern.py index 96db373..bc1f988 100644 --- a/patterns/network_event_pattern.py +++ b/patterns/network_event_pattern.py @@ -1,11 +1,69 @@ -from typing import Any, Dict +import sys +import socket + +from typing import Any, Dict, List from meow_base.functionality.validation import valid_string, valid_dict from meow_base.core.vars import VALID_RECIPE_NAME_CHARS, VALID_VARIABLE_NAME_CHARS from meow_base.core.base_recipe import BaseRecipe from meow_base.core.base_monitor import BaseMonitor from meow_base.core.base_pattern import BasePattern +from meow_base.functionality.meow import create_event +from meow_base.functionality.debug import setup_debugging, print_debug +from meow_base.core.meow import EVENT_KEYS +# watchdog events +EVENT_TYPE_NETWORK = "network" +NETWORK_BASE = "monitor_base" +NETWORK_HASH = "file_hash" + +NETWORK_EVENT_KEYS = { + NETWORK_BASE: str, + NETWORK_HASH: str, + **EVENT_KEYS +} + +def create_network_event(temp_path:str, rule:Any, base:str, time:float, + hash:str, extras:Dict[Any,Any]={})->Dict[Any,Any]: + """Function to create a MEOW event dictionary.""" + return create_event( + EVENT_TYPE_NETWORK, + temp_path, + rule, + time, + extras={ + **extras, + **{ + NETWORK_HASH: hash, + NETWORK_BASE: base + } + } + ) + +class Connector(): + def __init__(self, host: int, port: int) -> None: + self.host = host + self.port = port + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # Only used for testing + def send(self, message: bytes) -> None: + self.socket.connect((self.host, self.port)) + self.socket.sendall(message) + self.socket.close() + + def receive(self, path:str, buff_size:int = 2048): + self.socket.bind((self.host, self.port)) + self.socket.listen() + conn, _ = self.socket.accept() + + with conn: + with open(path, "wb") as file_pointer: + while True: + data = conn.recv(buff_size) + if not data: + break + file_pointer.write(data) class NetworkEventPattern(BasePattern): # The port to monitor @@ -46,6 +104,36 @@ class NetworkEventPattern(BasePattern): valid_string(k, VALID_VARIABLE_NAME_CHARS) class NetworkMonitor(BaseMonitor): - def __init__(self, patterns: Dict[str, NetworkEventPattern], recipes: Dict[str, BaseRecipe]) -> None: - super().__init__(patterns, recipes) - self.ports = set(pattern.triggering_port for pattern in patterns.values()) + def __init__(self, patterns: Dict[str, NetworkEventPattern], + recipes: Dict[str, BaseRecipe], autostart=False, + name:str="", print:Any=sys.stdout, logging:int=0) -> None: + super().__init__(patterns, recipes, name=name) + self._print_target, self.debug_level = setup_debugging(print, logging) + self.ports = set( + pattern.triggering_port for pattern in patterns.values() + ) + if autostart: + self.start() + + def start(self)->None: + """Function to start the monitor as an ongoing process/thread. Must be + implemented by any child process. Depending on the nature of the + monitor, this may wish to directly call apply_retroactive_rules before + starting.""" + pass + + def stop(self)->None: + """Function to stop the monitor as an ongoing process/thread. Must be + implemented by any child process""" + pass + + def _is_valid_recipes(self, recipes:Dict[str,BaseRecipe])->None: + """Validation check for 'recipes' variable from main constructor. Is + automatically called during initialisation.""" + valid_dict(recipes, str, BaseRecipe, min_length=0, strict=False) + + def _get_valid_pattern_types(self)->List[type]: + return [NetworkEventPattern] + + def _get_valid_recipe_types(self)->List[type]: + return [BaseRecipe] diff --git a/tests/test_patterns.py b/tests/test_patterns.py index 96f8bbc..88ece01 100644 --- a/tests/test_patterns.py +++ b/tests/test_patterns.py @@ -1,6 +1,8 @@ import io import os import unittest +import threading +import time from datetime import datetime from multiprocessing import Pipe @@ -14,7 +16,9 @@ from meow_base.functionality.meow import create_rule from meow_base.patterns.file_event_pattern import FileEventPattern, \ WatchdogMonitor, _DEFAULT_MASK, WATCHDOG_HASH, WATCHDOG_BASE, \ EVENT_TYPE_WATCHDOG, WATCHDOG_EVENT_KEYS, create_watchdog_event -from patterns.network_event_pattern import NetworkEventPattern +from meow_base.patterns.network_event_pattern import NetworkEventPattern, \ + NetworkMonitor, NETWORK_HASH, NETWORK_BASE, EVENT_TYPE_NETWORK, \ + NETWORK_EVENT_KEYS, create_network_event, Connector from meow_base.recipes.jupyter_notebook_recipe import JupyterNotebookRecipe from meow_base.recipes.python_recipe import PythonRecipe from shared import BAREBONES_NOTEBOOK, TEST_MONITOR_BASE, \ @@ -1078,3 +1082,208 @@ class NetworkEventPatternTests(unittest.TestCase): fep = NetworkEventPattern("name", 9000, "recipe", outputs=outputs) self.assertEqual(fep.outputs, outputs) +class NetworkMonitorTests(unittest.TestCase): + def setUp(self)->None: + super().setUp() + setup() + + def tearDown(self)->None: + super().tearDown() + teardown() + + # Test creation of network event dict + def testCreateNetworkEvent(self)->None: + pattern = NetworkEventPattern( + "pattern", + 8181, + "recipe_one", + parameters={ + "extra":"A line from a test Pattern", + "outfile":"result_path" + }) + recipe = JupyterNotebookRecipe( + "recipe_one", APPENDING_NOTEBOOK) + + rule = create_rule(pattern, recipe) + + with self.assertRaises(TypeError): + event = create_network_event("path", rule) + + event = create_network_event( + "path", rule, "base", time(), "hash") + + self.assertEqual(type(event), dict) + self.assertEqual(len(event.keys()), len(NETWORK_EVENT_KEYS)) + for key, value in NETWORK_EVENT_KEYS.items(): + self.assertTrue(key in event.keys()) + self.assertIsInstance(event[key], value) + self.assertEqual(event[EVENT_TYPE], EVENT_TYPE_NETWORK) + self.assertEqual(event[EVENT_PATH], "path") + self.assertEqual(event[EVENT_RULE], rule) + self.assertEqual(event[NETWORK_BASE], "base") + self.assertEqual(event[NETWORK_HASH], "hash") + + event = create_network_event( + "path2", + rule, + "base", + time(), + "hash", + extras={"a":1} + ) + + self.assertEqual(type(event), dict) + self.assertTrue(EVENT_TYPE in event.keys()) + self.assertTrue(EVENT_PATH in event.keys()) + self.assertTrue(EVENT_RULE in event.keys()) + self.assertTrue(NETWORK_BASE in event.keys()) + self.assertTrue(NETWORK_HASH in event.keys()) + self.assertEqual(len(event.keys()), len(NETWORK_EVENT_KEYS)+1) + for key, value in NETWORK_EVENT_KEYS.items(): + self.assertTrue(key in event.keys()) + self.assertIsInstance(event[key], value) + self.assertEqual(event[EVENT_TYPE], EVENT_TYPE_NETWORK) + self.assertEqual(event[EVENT_PATH], "path2") + self.assertEqual(event[EVENT_RULE], rule) + self.assertEqual(event["a"], 1) + self.assertEqual(event[NETWORK_BASE], "base") + self.assertEqual(event[NETWORK_HASH], "hash") + + + # Test NetworkMonitor created + def testNetworkMonitorMinimum(self)->None: + from_monitor = Pipe() + NetworkMonitor({}, {}, from_monitor[1]) + + # Test NetworkMonitor naming + def testNetworkMonitorNaming(self)->None: + test_name = "test_name" + monitor = NetworkMonitor({}, {}, name=test_name) + self.assertEqual(monitor.name, test_name) + + monitor = NetworkMonitor({}, {}) + self.assertTrue(monitor.name.startswith("monitor_")) + + # Test Connector data transfer when packet is smaller than buffer + def testConnector(self)->None: + message = b'test data' + localhost = "127.0.0.1" + port = 8181 + temp_path = f"{TEST_MONITOR_BASE}/result" + + receiver = Connector(localhost, port) + sender = Connector(localhost, port) + + receiver_thread = threading.Thread( + target=receiver.receive, + args=(temp_path,) + ) + + sender_thread = threading.Thread( + target=sender.send, + args=(message,) + ) + + receiver_thread.start() + sender_thread.start() + + sender_thread.join(5) + receiver_thread.join(5) + + self.assertFalse(sender_thread.is_alive()) + self.assertFalse(receiver_thread.is_alive()) + with open(temp_path,"rb") as file_pointer: + result_data = file_pointer.read() + + self.assertEqual(message, result_data) + + # Test Connector data transfer when packet is larger than buffer + def testConnectorBiggerThanBuffer(self)->None: + message = b'test data' + buffer_size = 4 + localhost = "127.0.0.1" + port = 8181 + temp_path = f"{TEST_MONITOR_BASE}/result" + + receiver = Connector(localhost, port) + sender = Connector(localhost, port) + + receiver_thread = threading.Thread( + target=receiver.receive, + args=(temp_path,buffer_size,) + ) + + sender_thread = threading.Thread( + target=sender.send, + args=(message,) + ) + + receiver_thread.start() + sender_thread.start() + + sender_thread.join(5) + receiver_thread.join(5) + + self.assertFalse(sender_thread.is_alive()) + self.assertFalse(receiver_thread.is_alive()) + with open(temp_path,"rb") as file_pointer: + result_data = file_pointer.read() + + self.assertEqual(message, result_data) + + # Test NetworkMonitor identifies expected network events + def testNetworkMonitorEventIdentification(self)->None: + port = 8181 + + from_monitor_reader, from_monitor_writer = Pipe() + + pattern_one = NetworkEventPattern( + "pattern_one", port, "recipe_one") + recipe = JupyterNotebookRecipe( + "recipe_one", BAREBONES_NOTEBOOK) + + patterns = { + pattern_one.name: pattern_one, + } + recipes = { + recipe.name: recipe, + } + + wm = NetworkMonitor(patterns, recipes) + wm.to_runner_event = from_monitor_writer + + rules = wm.get_rules() + + self.assertEqual(len(rules), 1) + rule = rules[list(rules.keys())[0]] + + # wm.start() + + # # open(os.path.join(TEST_MONITOR_BASE, "A"), "w") + + # if from_monitor_reader.poll(3): + # message = from_monitor_reader.recv() + + # self.assertIsNotNone(message) + # event = message + # self.assertIsNotNone(event) + # self.assertEqual(type(event), dict) + # self.assertTrue(EVENT_TYPE in event.keys()) + # self.assertTrue(EVENT_PATH in event.keys()) + # self.assertTrue(WATCHDOG_BASE in event.keys()) + # self.assertTrue(EVENT_RULE in event.keys()) + # self.assertEqual(event[EVENT_TYPE], EVENT_TYPE_WATCHDOG) + # self.assertEqual(event[EVENT_PATH], + # os.path.join(TEST_MONITOR_BASE, "A")) + # self.assertEqual(event[WATCHDOG_BASE], TEST_MONITOR_BASE) + # self.assertEqual(event[EVENT_RULE].name, rule.name) + + # # open(os.path.join(TEST_MONITOR_BASE, "B"), "w") + + # if from_monitor_reader.poll(3): + # new_message = from_monitor_reader.recv() + # else: + # new_message = None + # self.assertIsNone(new_message) + + # wm.stop()