import sys import socket import threading import tempfile import hashlib from os import unlink from http.server import HTTPServer from time import time from typing import Any, Dict, List from http_parser.parser import HttpParser from meow_base.functionality.validation import valid_string, valid_dict from meow_base.core.vars import VALID_RECIPE_NAME_CHARS, VALID_VARIABLE_NAME_CHARS, DEBUG_INFO from meow_base.core.base_recipe import BaseRecipe from meow_base.core.base_monitor import BaseMonitor from meow_base.core.base_pattern import BasePattern from meow_base.functionality.meow import create_event from meow_base.functionality.debug import setup_debugging, print_debug from meow_base.core.meow import EVENT_KEYS from meow_base.patterns.file_event_pattern import WATCHDOG_BASE, WATCHDOG_HASH # network events EVENT_TYPE_NETWORK = "network" TRIGGERING_PORT = "triggering_port" NETWORK_EVENT_KEYS = { TRIGGERING_PORT: int, WATCHDOG_HASH: str, WATCHDOG_BASE: str, **EVENT_KEYS } def create_network_event(temp_path:str, rule:Any, time:float, port: int, file_hash: str, extras:Dict[Any,Any]={})->Dict[Any,Any]: """Function to create a MEOW event dictionary.""" return create_event( EVENT_TYPE_NETWORK, temp_path, rule, time, extras={ TRIGGERING_PORT: port, WATCHDOG_HASH: file_hash, WATCHDOG_BASE: "", **extras } ) class NetworkEventPattern(BasePattern): # The port to monitor triggering_port:int def __init__(self, name: str, triggering_port:int, recipe: str, parameters: Dict[str, Any] = {}, outputs: Dict[str, Any] = {}, sweep: Dict[str, Any] = {}): super().__init__(name, recipe, parameters, outputs, sweep) self._is_valid_port(triggering_port) self.triggering_port = triggering_port def _is_valid_port(self, port:int)->None: if not isinstance(port, int): raise ValueError ( f"Port '{port}' is not of type int." ) elif not (1023 < port < 49152): raise ValueError ( f"Port '{port}' is not valid." ) def _is_valid_recipe(self, recipe:str)->None: """Validation check for 'recipe' variable from main constructor. Called within parent BasePattern constructor.""" valid_string(recipe, VALID_RECIPE_NAME_CHARS) def _is_valid_parameters(self, parameters:Dict[str,Any])->None: """Validation check for 'parameters' variable from main constructor. Called within parent BasePattern constructor.""" valid_dict(parameters, str, Any, strict=False, min_length=0) for k in parameters.keys(): valid_string(k, VALID_VARIABLE_NAME_CHARS) def _is_valid_output(self, outputs:Dict[str,str])->None: """Validation check for 'output' variable from main constructor. Called within parent BasePattern constructor.""" valid_dict(outputs, str, str, strict=False, min_length=0) for k in outputs.keys(): valid_string(k, VALID_VARIABLE_NAME_CHARS) class NetworkMonitor(BaseMonitor): def __init__(self, patterns: Dict[str, NetworkEventPattern], recipes: Dict[str, BaseRecipe], autostart=False, name:str="", print:Any=sys.stdout, logging:int=0) -> None: super().__init__(patterns, recipes, name=name) self._print_target, self.debug_level = setup_debugging(print, logging) self.ports = set() self.listeners = [] if not hasattr(self, "listener_type"): self.listener_type = Listener if autostart: self.start() def start(self)->None: """Function to start the monitor as an ongoing process/thread. Must be implemented by any child process. Depending on the nature of the monitor, this may wish to directly call apply_retroactive_rules before starting.""" self.temp_files = [] self.ports = set( rule.pattern.triggering_port for rule in self._rules.values() ) self.listeners = [ self.listener_type("127.0.0.1",i,2048,self) for i in self.ports ] for listener in self.listeners: listener.start() def match(self, event)->None: """Function to determine if a given event matches the current rules.""" self._rules_lock.acquire() try: self.temp_files.append(event["tmp file"]) for rule in self._rules.values(): # Match event port against rule ports hit = event["triggering port"] == rule.pattern.triggering_port # If matched, the create a watchdog event if hit: meow_event = create_network_event( event["tmp file"], rule, event["time stamp"], event["triggering port"], event["file hash"] ) print_debug(self._print_target, self.debug_level, f"Event at {event['triggering port']} hit rule {rule.name}", DEBUG_INFO) # Send the event to the runner self.send_event_to_runner(meow_event) except Exception as e: self._rules_lock.release() raise e self._rules_lock.release() def _delete_temp_files(self): for file in self.temp_files: unlink(file) self.temp_files = [] def stop(self)->None: """Function to stop the monitor as an ongoing process/thread. Must be implemented by any child process""" for listener in self.listeners: listener.stop() self._delete_temp_files() def _is_valid_recipes(self, recipes:Dict[str,BaseRecipe])->None: """Validation check for 'recipes' variable from main constructor. Is automatically called during initialisation.""" valid_dict(recipes, str, BaseRecipe, min_length=0, strict=False) def _get_valid_pattern_types(self)->List[type]: return [NetworkEventPattern] def _get_valid_recipe_types(self)->List[type]: return [BaseRecipe] class Listener(): def __init__(self, host: int, port: int, buff_size: int, monitor:NetworkMonitor) -> None: self.host = host self.port = port self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.settimeout(0.5) self._stopped = False self.buff_size = buff_size self.monitor = monitor def start(self): self._handle_thread = threading.Thread( target=self.main_loop ) self._handle_thread.start() def main_loop(self): self.socket.bind((self.host, self.port)) self.socket.listen(1) while not self._stopped: try: conn, _ = self.socket.accept() except socket.timeout: pass except: raise else: threading.Thread( target=self.handle_event, args=(conn,time(),) ).start() self.socket.close() def receive_data(self,conn): with conn: with tempfile.NamedTemporaryFile("wb", delete=False) as tmp: while True: data = conn.recv(self.buff_size) if not data: break tmp.write(data) tmp_name = tmp.name return tmp_name def handle_event(self, conn, time_stamp): tmp_name = self.receive_data(conn) with open(tmp_name, "rb") as file_pointer: file_hash = hashlib.sha256(file_pointer.read()).hexdigest() event = { "triggering port": self.port, "tmp file": tmp_name, "time stamp": time_stamp, "file hash": file_hash } self.monitor.match(event) def stop(self): self._stopped = True class HTTPMonitor(NetworkMonitor): def __init__(self, patterns: Dict[str, NetworkEventPattern], recipes: Dict[str, BaseRecipe], autostart=False, name: str = "", print: Any = sys.stdout, logging: int = 0) -> None: self.listener_type = HTTPListener() super().__init__(patterns, recipes, autostart, name, print, logging) class HTTPListener(Listener): def receive_data(self,conn): parser = HttpParser() with conn: with tempfile.NamedTemporaryFile("wb", delete=False) as tmp: while True: data = conn.recv(self.buff_size) if not data: break received = len(data) parsed = parser.execute(data, received) assert parsed == received if parser.is_partial_body(): tmp.write(parser.recv_body()) if parser.is_message_complete(): break tmp_name = tmp.name return tmp_name