Files
meow_base/patterns/network_event_pattern.py
2023-05-22 10:57:54 +02:00

140 lines
5.0 KiB
Python

import socket
import sys
from typing import Any, Dict, List, Optional

from meow_base.functionality.validation import valid_string, valid_dict
from meow_base.core.vars import VALID_RECIPE_NAME_CHARS, VALID_VARIABLE_NAME_CHARS
from meow_base.core.base_recipe import BaseRecipe
from meow_base.core.base_monitor import BaseMonitor
from meow_base.core.base_pattern import BasePattern
from meow_base.functionality.meow import create_event
from meow_base.functionality.debug import setup_debugging, print_debug
from meow_base.core.meow import EVENT_KEYS
# network events
# Event type tag passed to create_event for events built by this module.
EVENT_TYPE_NETWORK = "network"
# Keys of the network-specific entries added to an event's extras.
NETWORK_BASE = "monitor_base"
NETWORK_HASH = "file_hash"
# Maps each network-specific key to the type `str`, merged with every entry
# of the generic EVENT_KEYS mapping imported from meow_base.core.meow.
NETWORK_EVENT_KEYS = {
NETWORK_BASE: str,
NETWORK_HASH: str,
**EVENT_KEYS
}
def create_network_event(temp_path:str, rule:Any, base:str, time:float,
        hash:str, extras:Optional[Dict[Any,Any]]=None)->Dict[Any,Any]:
    """Function to create a MEOW network event dictionary.

    Delegates to create_event with the EVENT_TYPE_NETWORK type, forwarding
    'temp_path', 'rule' and 'time' unchanged. The extras handed on are a new
    dictionary made of the caller's 'extras' plus the NETWORK_HASH ('hash')
    and NETWORK_BASE ('base') entries; on a key clash the network entries
    win. The caller's 'extras' dictionary is never modified.

    'extras' may be omitted or None, both meaning "no additional entries".
    Returns whatever create_event returns."""
    # A None sentinel is used instead of a `{}` default so that no dictionary
    # object is shared between calls.
    caller_extras = {} if extras is None else extras
    network_extras = {
        **caller_extras,
        NETWORK_HASH: hash,
        NETWORK_BASE: base,
    }
    return create_event(
        EVENT_TYPE_NETWORK,
        temp_path,
        rule,
        time,
        extras=network_extras
    )
class Connector():
    """Thin wrapper around a single TCP (AF_INET / SOCK_STREAM) socket.

    An instance is single-use: both send() and receive() close the underlying
    socket when they finish, whether they succeed or raise."""

    def __init__(self, host: str, port: int) -> None:
        """Store the address to use and create the (unconnected) socket.

        'host' is the host part of the AF_INET address handed to
        connect()/bind(); 'port' is the TCP port number."""
        self.host = host
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Only used for testing
    def send(self, message: bytes) -> None:
        """Connect to (host, port), send all of 'message', then close.

        Any OSError raised by connect() or sendall() propagates to the
        caller; the socket is closed in that case too."""
        # The context manager guarantees the close that previously only
        # happened on the success path.
        with self.socket:
            self.socket.connect((self.host, self.port))
            self.socket.sendall(message)

    def receive(self, path: str, buff_size: int = 2048) -> None:
        """Accept one connection on (host, port) and write its data to 'path'.

        Blocks until a peer connects, then reads chunks of at most
        'buff_size' bytes until the peer closes the connection, writing each
        chunk to 'path' (opened in binary write mode, so an existing file is
        overwritten). Any OSError raised by bind(), listen(), accept() or
        recv() propagates; the listening socket is closed in every case."""
        # Closing the listening socket here fixes a leak: it was previously
        # left open after the transfer and after any failure.
        with self.socket:
            self.socket.bind((self.host, self.port))
            self.socket.listen()
            conn, _ = self.socket.accept()
            with conn, open(path, "wb") as file_pointer:
                while True:
                    data = conn.recv(buff_size)
                    # An empty read means the peer has closed its end.
                    if not data:
                        break
                    file_pointer.write(data)
class NetworkEventPattern(BasePattern):
    """MEOW pattern that identifies a network port to monitor."""
    # The port to monitor
    triggering_port:int

    def __init__(self, name: str, triggering_port:int, recipe: str,
            parameters: Optional[Dict[str, Any]] = None,
            outputs: Optional[Dict[str, Any]] = None,
            sweep: Optional[Dict[str, Any]] = None):
        """Initialise the base pattern, then validate and store the port.

        'parameters', 'outputs' and 'sweep' may be omitted or None, in which
        case a fresh empty dictionary is passed to the BasePattern
        constructor. Raises ValueError if 'triggering_port' is not an int in
        the range 1-65535; anything raised by the BasePattern constructor
        propagates unchanged."""
        # None sentinels replace the former `{}` defaults so that patterns
        # built without these arguments never share one dictionary object.
        super().__init__(
            name,
            recipe,
            {} if parameters is None else parameters,
            {} if outputs is None else outputs,
            {} if sweep is None else sweep
        )
        self._is_valid_port(triggering_port)
        self.triggering_port = triggering_port

    def _is_valid_port(self, port:int)->None:
        """Validation check for 'triggering_port' variable from main
        constructor. Raises ValueError if 'port' is not an int, or is
        outside the range 1-65535."""
        # bool is a subclass of int, so True/False would otherwise pass the
        # type check and True would be accepted as port 1.
        if isinstance(port, bool) or not isinstance(port, int):
            raise ValueError (
                f"Port '{port}' is not of type int."
            )
        if not (0 < port < 65536):
            raise ValueError (
                f"Port '{port}' is not valid."
            )

    def _is_valid_recipe(self, recipe:str)->None:
        """Validation check for 'recipe' variable from main constructor.
        Called within parent BasePattern constructor."""
        valid_string(recipe, VALID_RECIPE_NAME_CHARS)

    def _is_valid_parameters(self, parameters:Dict[str,Any])->None:
        """Validation check for 'parameters' variable from main constructor.
        Called within parent BasePattern constructor. Checks the dictionary
        itself with valid_dict, then every key with valid_string against
        VALID_VARIABLE_NAME_CHARS."""
        valid_dict(parameters, str, Any, strict=False, min_length=0)
        for k in parameters:
            valid_string(k, VALID_VARIABLE_NAME_CHARS)

    def _is_valid_output(self, outputs:Dict[str,str])->None:
        """Validation check for 'output' variable from main constructor.
        Called within parent BasePattern constructor. Checks the dictionary
        itself with valid_dict, then every key with valid_string against
        VALID_VARIABLE_NAME_CHARS."""
        valid_dict(outputs, str, str, strict=False, min_length=0)
        for k in outputs:
            valid_string(k, VALID_VARIABLE_NAME_CHARS)
class NetworkMonitor(BaseMonitor):
    """Monitor built from NetworkEventPatterns; records the set of ports
    that its patterns name."""

    def __init__(self, patterns: Dict[str, NetworkEventPattern],
            recipes: Dict[str, BaseRecipe], autostart=False,
            name:str="", print:Any=sys.stdout, logging:int=0) -> None:
        """Initialise the base monitor, configure debugging output and
        collect the distinct triggering ports of 'patterns'. If 'autostart'
        is truthy, start() is called before the constructor returns."""
        super().__init__(patterns, recipes, name=name)
        debug_config = setup_debugging(print, logging)
        self._print_target, self.debug_level = debug_config
        # A set, so a port shared by several patterns is only listed once.
        self.ports = {
            net_pattern.triggering_port for net_pattern in patterns.values()
        }
        if autostart:
            self.start()

    def start(self)->None:
        """Function to start the monitor as an ongoing process/thread.
        Currently a placeholder that does nothing."""

    def stop(self)->None:
        """Function to stop the monitor as an ongoing process/thread.
        Currently a placeholder that does nothing."""

    def _is_valid_recipes(self, recipes:Dict[str,BaseRecipe])->None:
        """Validation check for 'recipes' variable from main constructor:
        delegates to valid_dict, expecting str keys and BaseRecipe values,
        with an empty dictionary allowed."""
        valid_dict(recipes, str, BaseRecipe, min_length=0, strict=False)

    def _get_valid_pattern_types(self)->List[type]:
        """Return the pattern types this monitor accepts."""
        accepted_patterns = [NetworkEventPattern]
        return accepted_patterns

    def _get_valid_recipe_types(self)->List[type]:
        """Return the recipe types this monitor accepts."""
        accepted_recipes = [BaseRecipe]
        return accepted_recipes