390 lines
15 KiB
Python
390 lines
15 KiB
Python
|
|
"""
|
|
This file contains the base MEOW monitor definition. This should be inherited
|
|
from for all monitor instances.
|
|
|
|
Author(s): David Marchant
|
|
"""
|
|
|
|
from copy import deepcopy
|
|
from threading import Lock
|
|
from typing import Union, Dict, List
|
|
|
|
from meow_base.core.base_pattern import BasePattern
|
|
from meow_base.core.base_recipe import BaseRecipe
|
|
from meow_base.core.rule import Rule
|
|
from meow_base.core.vars import VALID_CHANNELS, \
|
|
VALID_MONITOR_NAME_CHARS, get_drt_imp_msg
|
|
from meow_base.functionality.validation import check_implementation, \
|
|
valid_string, check_type, check_types, valid_dict_multiple_types
|
|
from meow_base.functionality.meow import create_rules, create_rule
|
|
from meow_base.functionality.naming import generate_monitor_id
|
|
|
|
|
|
class BaseMonitor:
    """Base class for all MEOW monitors. It cannot be instantiated directly
    and must be inherited from. It holds the patterns and recipes a monitor
    has been given, and the rules derived from pairing them. Access to each
    collection is guarded by its own lock. When more than one lock is needed
    they are always taken in the order patterns, recipes, rules."""

    # An identifier for a monitor within the runner. Can be manually set in
    # the constructor, or autogenerated if no name provided.
    name: str
    # A collection of patterns
    _patterns: Dict[str, BasePattern]
    # A collection of recipes
    _recipes: Dict[str, BaseRecipe]
    # A collection of rules derived from _patterns and _recipes
    _rules: Dict[str, Rule]
    # A channel for sending messages to the runner event queue. Note that this
    # is not initialised within the constructor, but within the runner when the
    # monitor is passed to it unless the monitor is running independently of a
    # runner.
    to_runner_event: VALID_CHANNELS
    # A lock to solve race conditions on '_patterns'
    _patterns_lock: Lock
    # A lock to solve race conditions on '_recipes'
    _recipes_lock: Lock
    # A lock to solve race conditions on '_rules'
    _rules_lock: Lock

    def __init__(self, patterns: Dict[str, BasePattern],
            recipes: Dict[str, BaseRecipe], name: str="") -> None:
        """BaseMonitor Constructor. This will check that any class inheriting
        from it implements its validation functions. It will then call these on
        the input parameters.

        'patterns' and 'recipes' are dicts keyed by name. Both are deep-copied
        before being stored. If 'name' is empty then one is generated."""
        check_implementation(type(self).start, BaseMonitor)
        check_implementation(type(self).stop, BaseMonitor)
        check_implementation(type(self)._get_valid_pattern_types, BaseMonitor)
        self._is_valid_patterns(patterns)
        check_implementation(type(self)._get_valid_recipe_types, BaseMonitor)
        self._is_valid_recipes(recipes)
        # Ensure that patterns and recipes cannot be trivially modified from
        # outside the monitor, as this will cause internal consistency issues
        self._patterns = deepcopy(patterns)
        self._recipes = deepcopy(recipes)
        # Rules are built from the internal copies rather than the caller's
        # objects, so that the rules do not alias externally mutable state
        # either.
        self._rules = create_rules(self._patterns, self._recipes)
        if not name:
            name = generate_monitor_id()
        self._is_valid_name(name)
        self.name = name
        self._patterns_lock = Lock()
        self._recipes_lock = Lock()
        self._rules_lock = Lock()

    def __new__(cls, *args, **kwargs):
        """A check that this base class is not instantiated itself, only
        inherited from. Raises a TypeError if BaseMonitor itself is being
        instantiated."""
        if cls is BaseMonitor:
            msg = get_drt_imp_msg(BaseMonitor)
            raise TypeError(msg)
        return object.__new__(cls)

    def _is_valid_name(self, name: str) -> None:
        """Validation check for 'name' variable from main constructor. Is
        automatically called during initialisation. This does not need to be
        overridden by child classes."""
        valid_string(name, VALID_MONITOR_NAME_CHARS)

    def _is_valid_patterns(self, patterns: Dict[str, BasePattern]) -> None:
        """Validation check for 'patterns' variable from main constructor.
        Keys must be strings and values must be one of the types given by
        '_get_valid_pattern_types'. An empty dict is accepted."""
        valid_dict_multiple_types(
            patterns,
            str,
            self._get_valid_pattern_types(),
            min_length=0,
            strict=False
        )

    def _get_valid_pattern_types(self) -> List[type]:
        """Validation check used throughout monitor to check that only
        compatible patterns are used. Must be implemented by any child
        class."""
        raise NotImplementedError

    def _is_valid_recipes(self, recipes: Dict[str, BaseRecipe]) -> None:
        """Validation check for 'recipes' variable from main constructor.
        Keys must be strings and values must be one of the types given by
        '_get_valid_recipe_types'. An empty dict is accepted."""
        valid_dict_multiple_types(
            recipes,
            str,
            self._get_valid_recipe_types(),
            min_length=0,
            strict=False
        )

    def _get_valid_recipe_types(self) -> List[type]:
        """Validation check used throughout monitor to check that only
        compatible recipes are used. Must be implemented by any child
        class."""
        raise NotImplementedError

    def _identify_new_rules(self,
            new_pattern: Union[BasePattern, None]=None,
            new_recipe: Union[BaseRecipe, None]=None) -> None:
        """Function to determine if a new rule can be created given a new
        pattern or recipe, in light of other existing patterns or recipes in
        the monitor.

        Note that if 'new_pattern' is given but is no longer registered, the
        function returns immediately and 'new_recipe' is not considered."""
        if new_pattern is not None:
            # Both locks are held for the whole check so that the pattern and
            # recipe collections cannot change while a rule is being made.
            with self._patterns_lock, self._recipes_lock:
                # Check in case pattern has been deleted since function called
                if new_pattern.name not in self._patterns:
                    return
                # If pattern specifies recipe that already exists, make a rule
                if new_pattern.recipe in self._recipes:
                    self._create_new_rule(
                        new_pattern,
                        self._recipes[new_pattern.recipe],
                    )

        if new_recipe is not None:
            with self._patterns_lock, self._recipes_lock:
                # Check in case recipe has been deleted since function called
                if new_recipe.name not in self._recipes:
                    return
                # If recipe is specified by existing pattern, make a rule
                for pattern in self._patterns.values():
                    if pattern.recipe == new_recipe.name:
                        self._create_new_rule(
                            pattern,
                            new_recipe,
                        )

    def _identify_lost_rules(self, lost_pattern: str=None,
            lost_recipe: str=None) -> None:
        """Function to remove rules that should be deleted in response to a
        pattern or recipe having been deleted. 'lost_pattern' and
        'lost_recipe' are the names of the deleted pattern and recipe
        respectively, and either or both may be given."""
        with self._rules_lock:
            # Identify any offending rules. Names are collected first as the
            # dict cannot be altered whilst it is being iterated over.
            to_delete = [
                name for name, rule in self._rules.items()
                if (lost_pattern and rule.pattern.name == lost_pattern)
                or (lost_recipe and rule.recipe.name == lost_recipe)
            ]
            # Now delete them
            for name in to_delete:
                self._rules.pop(name, None)

    def _create_new_rule(self, pattern: BasePattern,
            recipe: BaseRecipe) -> None:
        """Function to create a new rule from a given pattern and recipe. This
        will only be called to create rules at runtime, as rules are
        automatically created at initialisation using the same 'create_rule'
        function called here. Raises a KeyError if the name of the created
        rule is already in use."""
        rule = create_rule(pattern, recipe)
        with self._rules_lock:
            if rule.name in self._rules:
                raise KeyError("Cannot create Rule with name of "
                    f"'{rule.name}' as already in use")
            self._rules[rule.name] = rule

        # Called after the rules lock has been released
        self._apply_retroactive_rule(rule)

    def _apply_retroactive_rule(self, rule: Rule) -> None:
        """Function to determine if a rule should be applied to any existing
        definitions, if possible. May be implemented by inherited classes."""
        pass

    def _apply_retroactive_rules(self) -> None:
        """Function to determine if any rules should be applied to any existing
        definitions, if possible. May be implemented by inherited classes."""
        pass

    def send_event_to_runner(self, msg):
        """Function to send a message to the runner through the
        'to_runner_event' channel. Note that this channel is not set by the
        constructor, so must have been assigned before this is called."""
        self.to_runner_event.send(msg)

    def start(self) -> None:
        """Function to start the monitor as an ongoing process/thread. Must be
        implemented by any child process. Depending on the nature of the
        monitor, this may wish to directly call apply_retroactive_rules before
        starting."""
        pass

    def stop(self) -> None:
        """Function to stop the monitor as an ongoing process/thread. Must be
        implemented by any child process"""
        pass

    def add_pattern(self, pattern: BasePattern) -> None:
        """Function to add a pattern to the current definitions. Any rules
        that can be possibly created from that pattern will be automatically
        created. Raises a KeyError if a pattern of the same name is already
        present."""
        check_types(
            pattern,
            self._get_valid_pattern_types(),
            hint="add_pattern.pattern"
        )

        with self._patterns_lock:
            if pattern.name in self._patterns:
                raise KeyError(f"An entry for Pattern '{pattern.name}' "
                    "already exists. Do you intend to update instead?")
            self._patterns[pattern.name] = pattern

        self._identify_new_rules(new_pattern=pattern)

    def update_pattern(self, pattern: BasePattern) -> None:
        """Function to update a pattern in the current definitions. Any rules
        created from that pattern will be automatically updated. This is
        carried out as a removal followed by an addition, so a KeyError is
        raised if no pattern of that name is currently present."""
        check_types(
            pattern,
            self._get_valid_pattern_types(),
            hint="update_pattern.pattern"
        )

        self.remove_pattern(pattern.name)
        self.add_pattern(pattern)

    def remove_pattern(self, pattern: Union[str, BasePattern]) -> None:
        """Function to remove a pattern from the current definitions. Any rules
        that will be no longer valid will be automatically removed. Accepts
        either the pattern itself or its name. Raises a KeyError if the
        pattern is not present."""
        check_type(
            pattern,
            str,
            alt_types=[BasePattern],
            hint="remove_pattern.pattern"
        )
        lookup_key = pattern
        if isinstance(lookup_key, BasePattern):
            lookup_key = pattern.name

        with self._patterns_lock:
            if lookup_key not in self._patterns:
                raise KeyError(f"Cannot remove Pattern '{lookup_key}' as it "
                    "does not already exist")
            self._patterns.pop(lookup_key)

        self._identify_lost_rules(lost_pattern=lookup_key)

    def get_patterns(self) -> Dict[str, BasePattern]:
        """Function to get a dict of the currently defined patterns of the
        monitor. Note that the result is deep-copied, and so can be manipulated
        without directly manipulating the internals of the monitor."""
        with self._patterns_lock:
            return deepcopy(self._patterns)

    def add_recipe(self, recipe: BaseRecipe) -> None:
        """Function to add a recipe to the current definitions. Any rules
        that can be possibly created from that recipe will be automatically
        created. Raises a KeyError if a recipe of the same name is already
        present."""
        check_type(recipe, BaseRecipe, hint="add_recipe.recipe")

        with self._recipes_lock:
            if recipe.name in self._recipes:
                raise KeyError(f"An entry for Recipe '{recipe.name}' already "
                    "exists. Do you intend to update instead?")
            self._recipes[recipe.name] = recipe

        self._identify_new_rules(new_recipe=recipe)

    def update_recipe(self, recipe: BaseRecipe) -> None:
        """Function to update a recipe in the current definitions. Any rules
        created from that recipe will be automatically updated. This is
        carried out as a removal followed by an addition, so a KeyError is
        raised if no recipe of that name is currently present."""
        check_type(recipe, BaseRecipe, hint="update_recipe.recipe")
        self.remove_recipe(recipe.name)
        self.add_recipe(recipe)

    def remove_recipe(self, recipe: Union[str, BaseRecipe]) -> None:
        """Function to remove a recipe from the current definitions. Any rules
        that will be no longer valid will be automatically removed. Accepts
        either the recipe itself or its name. Raises a KeyError if the recipe
        is not present."""
        check_type(
            recipe,
            str,
            alt_types=[BaseRecipe],
            hint="remove_recipe.recipe"
        )
        lookup_key = recipe
        if isinstance(lookup_key, BaseRecipe):
            lookup_key = recipe.name

        with self._recipes_lock:
            # Check that recipe has not already been deleted
            if lookup_key not in self._recipes:
                raise KeyError(f"Cannot remove Recipe '{lookup_key}' as it "
                    "does not already exist")
            self._recipes.pop(lookup_key)

        self._identify_lost_rules(lost_recipe=lookup_key)

    def get_recipes(self) -> Dict[str, BaseRecipe]:
        """Function to get a dict of the currently defined recipes of the
        monitor. Note that the result is deep-copied, and so can be manipulated
        without directly manipulating the internals of the monitor."""
        with self._recipes_lock:
            return deepcopy(self._recipes)

    def get_rules(self) -> Dict[str, Rule]:
        """Function to get a dict of the currently defined rules of the
        monitor. Note that the result is deep-copied, and so can be manipulated
        without directly manipulating the internals of the monitor."""
        with self._rules_lock:
            return deepcopy(self._rules)
|
|
|
|
|