from typing import Any, Dict, Optional

from meow_base.functionality.validation import valid_string, valid_dict
from meow_base.core.vars import VALID_RECIPE_NAME_CHARS, VALID_VARIABLE_NAME_CHARS
from meow_base.core.base_recipe import BaseRecipe
from meow_base.core.base_monitor import BaseMonitor
from meow_base.core.base_pattern import BasePattern


class NetworkEventPattern(BasePattern):
    """Pattern that additionally records a network port to monitor."""

    # The port to monitor
    triggering_port: int

    def __init__(
        self,
        name: str,
        triggering_port: int,
        recipe: str,
        parameters: Optional[Dict[str, Any]] = None,
        outputs: Optional[Dict[str, Any]] = None,
        sweep: Optional[Dict[str, Any]] = None,
    ):
        """Build the pattern and validate its triggering port.

        Args:
            name: Passed through to the BasePattern constructor.
            triggering_port: Port number; must be an int in 1..65535.
            recipe: Recipe name, passed through to the BasePattern
                constructor.
            parameters: Optional parameter mapping. Omitted or None means
                an empty dict.
            outputs: Optional output mapping. Omitted or None means an
                empty dict.
            sweep: Optional sweep mapping. Omitted or None means an empty
                dict.

        Raises:
            ValueError: If 'triggering_port' is not an int or is outside
                the valid range.
        """
        # Default to None rather than {} so that a single dict object is
        # not shared between every instance created without these
        # arguments. The parent still receives a dict, as before.
        super().__init__(
            name,
            recipe,
            {} if parameters is None else parameters,
            {} if outputs is None else outputs,
            {} if sweep is None else sweep,
        )
        self._is_valid_port(triggering_port)
        self.triggering_port = triggering_port

    def _is_valid_port(self, port: int) -> None:
        """Validation check for 'triggering_port' variable from main
        constructor.

        Raises:
            ValueError: If 'port' is not an int (bools are rejected too),
                or is not strictly between 0 and 65536.
        """
        # bool is a subclass of int, so isinstance alone would let
        # True through as port 1; reject it explicitly.
        if isinstance(port, bool) or not isinstance(port, int):
            raise ValueError(
                f"Port '{port}' is not of type int."
            )
        if not (0 < port < 65536):
            raise ValueError(
                f"Port '{port}' is not valid."
            )

    def _is_valid_recipe(self, recipe: str) -> None:
        """Validation check for 'recipe' variable from main constructor.
        Called within parent BasePattern constructor."""
        valid_string(recipe, VALID_RECIPE_NAME_CHARS)

    def _is_valid_parameters(self, parameters: Dict[str, Any]) -> None:
        """Validation check for 'parameters' variable from main constructor.
        Called within parent BasePattern constructor."""
        valid_dict(parameters, str, Any, strict=False, min_length=0)
        # Every key must also be a valid variable name.
        for key in parameters:
            valid_string(key, VALID_VARIABLE_NAME_CHARS)

    def _is_valid_output(self, outputs: Dict[str, str]) -> None:
        """Validation check for 'output' variable from main constructor.
        Called within parent BasePattern constructor."""
        valid_dict(outputs, str, str, strict=False, min_length=0)
        # Every key must also be a valid variable name.
        for key in outputs:
            valid_string(key, VALID_VARIABLE_NAME_CHARS)


class NetworkMonitor(BaseMonitor):
    """Monitor that tracks the triggering ports of its patterns."""

    def __init__(
        self,
        patterns: Dict[str, NetworkEventPattern],
        recipes: Dict[str, BaseRecipe],
    ) -> None:
        """Initialise the base monitor and collect the distinct
        'triggering_port' values of the given patterns into 'self.ports'.

        Args:
            patterns: Mapping of NetworkEventPattern objects, passed
                through to the BaseMonitor constructor.
            recipes: Mapping of recipes, passed through to the
                BaseMonitor constructor.
        """
        super().__init__(patterns, recipes)
        self.ports = {
            pattern.triggering_port for pattern in patterns.values()
        }