279 lines
9.2 KiB
Python
279 lines
9.2 KiB
Python
|
|
import inspect
|
|
import sys
|
|
|
|
from typing import Any, Union
|
|
|
|
from core.correctness.vars import VALID_RECIPE_NAME_CHARS, \
|
|
VALID_PATTERN_NAME_CHARS, VALID_RULE_NAME_CHARS, VALID_CHANNELS, \
|
|
get_drt_imp_msg
|
|
from core.correctness.validation import valid_string, check_type, \
|
|
check_implementation, valid_list, valid_dict
|
|
from core.functionality import generate_id
|
|
|
|
|
|
class BaseRecipe:
|
|
name:str
|
|
recipe:Any
|
|
parameters:dict[str, Any]
|
|
requirements:dict[str, Any]
|
|
def __init__(self, name:str, recipe:Any, parameters:dict[str,Any]={},
|
|
requirements:dict[str,Any]={}):
|
|
check_implementation(type(self)._is_valid_recipe, BaseRecipe)
|
|
check_implementation(type(self)._is_valid_parameters, BaseRecipe)
|
|
check_implementation(type(self)._is_valid_requirements, BaseRecipe)
|
|
self._is_valid_name(name)
|
|
self.name = name
|
|
self._is_valid_recipe(recipe)
|
|
self.recipe = recipe
|
|
self._is_valid_parameters(parameters)
|
|
self.parameters = parameters
|
|
self._is_valid_requirements(requirements)
|
|
self.requirements = requirements
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
if cls is BaseRecipe:
|
|
msg = get_drt_imp_msg(BaseRecipe)
|
|
raise TypeError(msg)
|
|
return object.__new__(cls)
|
|
|
|
def _is_valid_name(self, name:str)->None:
|
|
valid_string(name, VALID_RECIPE_NAME_CHARS)
|
|
|
|
def _is_valid_recipe(self, recipe:Any)->None:
|
|
pass
|
|
|
|
def _is_valid_parameters(self, parameters:Any)->None:
|
|
pass
|
|
|
|
def _is_valid_requirements(self, requirements:Any)->None:
|
|
pass
|
|
|
|
|
|
class BasePattern:
|
|
name:str
|
|
recipe:str
|
|
parameters:dict[str,Any]
|
|
outputs:dict[str,Any]
|
|
def __init__(self, name:str, recipe:str, parameters:dict[str,Any]={},
|
|
outputs:dict[str,Any]={}):
|
|
check_implementation(type(self)._is_valid_recipe, BasePattern)
|
|
check_implementation(type(self)._is_valid_parameters, BasePattern)
|
|
check_implementation(type(self)._is_valid_output, BasePattern)
|
|
self._is_valid_name(name)
|
|
self.name = name
|
|
self._is_valid_recipe(recipe)
|
|
self.recipe = recipe
|
|
self._is_valid_parameters(parameters)
|
|
self.parameters = parameters
|
|
self._is_valid_output(outputs)
|
|
self.outputs = outputs
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
if cls is BasePattern:
|
|
msg = get_drt_imp_msg(BasePattern)
|
|
raise TypeError(msg)
|
|
return object.__new__(cls)
|
|
|
|
def _is_valid_name(self, name:str)->None:
|
|
valid_string(name, VALID_PATTERN_NAME_CHARS)
|
|
|
|
def _is_valid_recipe(self, recipe:Any)->None:
|
|
pass
|
|
|
|
def _is_valid_parameters(self, parameters:Any)->None:
|
|
pass
|
|
|
|
def _is_valid_output(self, outputs:Any)->None:
|
|
pass
|
|
|
|
|
|
class BaseRule:
|
|
name:str
|
|
pattern:BasePattern
|
|
recipe:BaseRecipe
|
|
pattern_type:str=""
|
|
recipe_type:str=""
|
|
def __init__(self, name:str, pattern:BasePattern, recipe:BaseRecipe):
|
|
check_implementation(type(self)._is_valid_pattern, BaseRule)
|
|
check_implementation(type(self)._is_valid_recipe, BaseRule)
|
|
self._is_valid_name(name)
|
|
self.name = name
|
|
self._is_valid_pattern(pattern)
|
|
self.pattern = pattern
|
|
self._is_valid_recipe(recipe)
|
|
self.recipe = recipe
|
|
self.__check_types_set()
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
if cls is BaseRule:
|
|
msg = get_drt_imp_msg(BaseRule)
|
|
raise TypeError(msg)
|
|
return object.__new__(cls)
|
|
|
|
def _is_valid_name(self, name:str)->None:
|
|
valid_string(name, VALID_RULE_NAME_CHARS)
|
|
|
|
def _is_valid_pattern(self, pattern:Any)->None:
|
|
pass
|
|
|
|
def _is_valid_recipe(self, recipe:Any)->None:
|
|
pass
|
|
|
|
def __check_types_set(self)->None:
|
|
if self.pattern_type == "":
|
|
raise AttributeError(f"Rule Class '{self.__class__.__name__}' "
|
|
"does not set a pattern_type.")
|
|
if self.recipe_type == "":
|
|
raise AttributeError(f"Rule Class '{self.__class__.__name__}' "
|
|
"does not set a recipe_type.")
|
|
|
|
|
|
class BaseMonitor:
|
|
_patterns: dict[str, BasePattern]
|
|
_recipes: dict[str, BaseRecipe]
|
|
_rules: dict[str, BaseRule]
|
|
to_runner: VALID_CHANNELS
|
|
def __init__(self, patterns:dict[str,BasePattern], recipes:dict[str,BaseRecipe])->None:
|
|
check_implementation(type(self).start, BaseMonitor)
|
|
check_implementation(type(self).stop, BaseMonitor)
|
|
check_implementation(type(self)._is_valid_patterns, BaseMonitor)
|
|
self._is_valid_patterns(patterns)
|
|
check_implementation(type(self)._is_valid_recipes, BaseMonitor)
|
|
self._is_valid_recipes(recipes)
|
|
check_implementation(type(self).add_pattern, BaseMonitor)
|
|
check_implementation(type(self).update_pattern, BaseMonitor)
|
|
check_implementation(type(self).remove_pattern, BaseMonitor)
|
|
check_implementation(type(self).get_patterns, BaseMonitor)
|
|
check_implementation(type(self).add_recipe, BaseMonitor)
|
|
check_implementation(type(self).update_recipe, BaseMonitor)
|
|
check_implementation(type(self).remove_recipe, BaseMonitor)
|
|
check_implementation(type(self).get_recipes, BaseMonitor)
|
|
check_implementation(type(self).get_rules, BaseMonitor)
|
|
self._patterns = patterns
|
|
self._recipes = recipes
|
|
self._rules = create_rules(patterns, recipes)
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
if cls is BaseMonitor:
|
|
msg = get_drt_imp_msg(BaseMonitor)
|
|
raise TypeError(msg)
|
|
return object.__new__(cls)
|
|
|
|
def _is_valid_patterns(self, patterns:dict[str,BasePattern])->None:
|
|
pass
|
|
|
|
def _is_valid_recipes(self, recipes:dict[str,BaseRecipe])->None:
|
|
pass
|
|
|
|
def start(self)->None:
|
|
pass
|
|
|
|
def stop(self)->None:
|
|
pass
|
|
|
|
def add_pattern(self, pattern:BasePattern)->None:
|
|
pass
|
|
|
|
def update_pattern(self, pattern:BasePattern)->None:
|
|
pass
|
|
|
|
def remove_pattern(self, pattern:Union[str,BasePattern])->None:
|
|
pass
|
|
|
|
def get_patterns(self)->None:
|
|
pass
|
|
|
|
def add_recipe(self, recipe:BaseRecipe)->None:
|
|
pass
|
|
|
|
def update_recipe(self, recipe:BaseRecipe)->None:
|
|
pass
|
|
|
|
def remove_recipe(self, recipe:Union[str,BaseRecipe])->None:
|
|
pass
|
|
|
|
def get_recipes(self)->None:
|
|
pass
|
|
|
|
def get_rules(self)->None:
|
|
pass
|
|
|
|
|
|
class BaseHandler:
|
|
def __init__(self) -> None:
|
|
check_implementation(type(self).handle, BaseHandler)
|
|
check_implementation(type(self).valid_event_types, BaseHandler)
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
if cls is BaseHandler:
|
|
msg = get_drt_imp_msg(BaseHandler)
|
|
raise TypeError(msg)
|
|
return object.__new__(cls)
|
|
|
|
def valid_event_types(self)->list[str]:
|
|
pass
|
|
|
|
def handle(self, event:Any)->None:
|
|
pass
|
|
|
|
def create_rules(patterns:Union[dict[str,BasePattern],list[BasePattern]],
|
|
recipes:Union[dict[str,BaseRecipe],list[BaseRecipe]],
|
|
new_rules:list[BaseRule]=[])->dict[str,BaseRule]:
|
|
check_type(patterns, dict, alt_types=[list])
|
|
check_type(recipes, dict, alt_types=[list])
|
|
valid_list(new_rules, BaseRule, min_length=0)
|
|
|
|
if isinstance(patterns, list):
|
|
valid_list(patterns, BasePattern, min_length=0)
|
|
patterns = {pattern.name:pattern for pattern in patterns}
|
|
else:
|
|
valid_dict(patterns, str, BasePattern, strict=False, min_length=0)
|
|
for k, v in patterns.items():
|
|
if k != v.name:
|
|
raise KeyError(
|
|
f"Key '{k}' indexes unexpected Pattern '{v.name}' "
|
|
"Pattern dictionaries must be keyed with the name of the "
|
|
"Pattern.")
|
|
|
|
if isinstance(recipes, list):
|
|
valid_list(recipes, BaseRecipe, min_length=0)
|
|
recipes = {recipe.name:recipe for recipe in recipes}
|
|
else:
|
|
valid_dict(recipes, str, BaseRecipe, strict=False, min_length=0)
|
|
for k, v in recipes.items():
|
|
if k != v.name:
|
|
raise KeyError(
|
|
f"Key '{k}' indexes unexpected Recipe '{v.name}' "
|
|
"Recipe dictionaries must be keyed with the name of the "
|
|
"Recipe.")
|
|
|
|
generated_rules = {}
|
|
for pattern in patterns.values():
|
|
if pattern.recipe in recipes:
|
|
rule = create_rule(pattern, recipes[pattern.recipe])
|
|
generated_rules[rule.name] = rule
|
|
return generated_rules
|
|
|
|
def create_rule(pattern:BasePattern, recipe:BaseRecipe,
|
|
new_rules:list[BaseRule]=[])->BaseRule:
|
|
check_type(pattern, BasePattern)
|
|
check_type(recipe, BaseRecipe)
|
|
valid_list(new_rules, BaseRule, min_length=0)
|
|
|
|
# Imported here to avoid circular imports at top of file
|
|
import rules
|
|
all_rules ={(r.pattern_type, r.recipe_type):r for r in [r[1] \
|
|
for r in inspect.getmembers(sys.modules["rules"], inspect.isclass) \
|
|
if (issubclass(r[1], BaseRule))]}
|
|
|
|
key = (type(pattern).__name__, type(recipe).__name__)
|
|
if (key) in all_rules:
|
|
return all_rules[key](
|
|
generate_id(prefix="Rule_"),
|
|
pattern,
|
|
recipe
|
|
)
|
|
raise TypeError(f"No valid rule for Pattern '{pattern}' and Recipe "
|
|
f"'{recipe}' could be found.")
|