also refactored core.meow into seperate files in hope that it'll help solve circular imports
This commit is contained in:
@ -6,13 +6,17 @@ Author(s): David Marchant
|
||||
|
||||
from datetime import datetime
|
||||
from os.path import basename, dirname, relpath, splitext
|
||||
from typing import Any, Dict
|
||||
from typing import Any, Dict, Union, List
|
||||
|
||||
from core.base_pattern import BasePattern
|
||||
from core.base_recipe import BaseRecipe
|
||||
from core.base_rule import BaseRule
|
||||
from core.correctness.validation import check_type, valid_dict, valid_list
|
||||
from core.correctness.vars import EVENT_PATH, EVENT_RULE, EVENT_TYPE, \
|
||||
EVENT_TYPE_WATCHDOG, JOB_CREATE_TIME, JOB_EVENT, JOB_ID, JOB_PATTERN, \
|
||||
JOB_RECIPE, JOB_REQUIREMENTS, JOB_RULE, JOB_STATUS, JOB_TYPE, \
|
||||
STATUS_QUEUED, WATCHDOG_BASE, WATCHDOG_HASH
|
||||
from functionality.naming import generate_job_id
|
||||
from functionality.naming import generate_job_id, generate_rule_id
|
||||
|
||||
|
||||
# mig trigger keyword replacements
|
||||
@ -101,3 +105,84 @@ def create_job(job_type:str, event:Dict[str,Any], extras:Dict[Any,Any]={}
|
||||
|
||||
return {**extras, **job_dict}
|
||||
|
||||
def create_rules(patterns:Union[Dict[str,BasePattern],List[BasePattern]],
|
||||
recipes:Union[Dict[str,BaseRecipe],List[BaseRecipe]],
|
||||
new_rules:List[BaseRule]=[])->Dict[str,BaseRule]:
|
||||
"""Function to create any valid rules from a given collection of patterns
|
||||
and recipes. All inbuilt rule types are considered, with additional
|
||||
definitions provided through the 'new_rules' variable. Note that any
|
||||
provided pattern and recipe dictionaries must be keyed with the
|
||||
corresponding pattern and recipe names."""
|
||||
# Validation of inputs
|
||||
check_type(patterns, Dict, alt_types=[List], hint="create_rules.patterns")
|
||||
check_type(recipes, Dict, alt_types=[List], hint="create_rules.recipes")
|
||||
valid_list(new_rules, BaseRule, min_length=0)
|
||||
|
||||
# Convert a pattern list to a dictionary
|
||||
if isinstance(patterns, list):
|
||||
valid_list(patterns, BasePattern, min_length=0)
|
||||
patterns = {pattern.name:pattern for pattern in patterns}
|
||||
else:
|
||||
# Validate the pattern dictionary
|
||||
valid_dict(patterns, str, BasePattern, strict=False, min_length=0)
|
||||
for k, v in patterns.items():
|
||||
if k != v.name:
|
||||
raise KeyError(
|
||||
f"Key '{k}' indexes unexpected Pattern '{v.name}' "
|
||||
"Pattern dictionaries must be keyed with the name of the "
|
||||
"Pattern.")
|
||||
|
||||
# Convert a recipe list into a dictionary
|
||||
if isinstance(recipes, list):
|
||||
valid_list(recipes, BaseRecipe, min_length=0)
|
||||
recipes = {recipe.name:recipe for recipe in recipes}
|
||||
else:
|
||||
# Validate the recipe dictionary
|
||||
valid_dict(recipes, str, BaseRecipe, strict=False, min_length=0)
|
||||
for k, v in recipes.items():
|
||||
if k != v.name:
|
||||
raise KeyError(
|
||||
f"Key '{k}' indexes unexpected Recipe '{v.name}' "
|
||||
"Recipe dictionaries must be keyed with the name of the "
|
||||
"Recipe.")
|
||||
|
||||
# Try to create a rule for each rule in turn
|
||||
generated_rules = {}
|
||||
for pattern in patterns.values():
|
||||
if pattern.recipe in recipes:
|
||||
try:
|
||||
rule = create_rule(pattern, recipes[pattern.recipe])
|
||||
generated_rules[rule.name] = rule
|
||||
except TypeError:
|
||||
pass
|
||||
return generated_rules
|
||||
|
||||
def create_rule(pattern:BasePattern, recipe:BaseRecipe,
|
||||
new_rules:List[BaseRule]=[])->BaseRule:
|
||||
"""Function to create a valid rule from a given pattern and recipe. All
|
||||
inbuilt rule types are considered, with additional definitions provided
|
||||
through the 'new_rules' variable."""
|
||||
check_type(pattern, BasePattern, hint="create_rule.pattern")
|
||||
check_type(recipe, BaseRecipe, hint="create_rule.recipe")
|
||||
valid_list(new_rules, BaseRule, min_length=0, hint="create_rule.new_rules")
|
||||
|
||||
# TODO fix me
|
||||
# Imported here to avoid circular imports at top of file
|
||||
import rules
|
||||
all_rules = {(r.pattern_type, r.recipe_type):r for r in BaseRule.__subclasses__()}
|
||||
|
||||
# Add in new rules
|
||||
for rule in new_rules:
|
||||
all_rules[(rule.pattern_type, rule.recipe_type)] = rule
|
||||
|
||||
# Find appropriate rule type from pattern and recipe types
|
||||
key = (type(pattern).__name__, type(recipe).__name__)
|
||||
if (key) in all_rules:
|
||||
return all_rules[key](
|
||||
generate_rule_id(),
|
||||
pattern,
|
||||
recipe
|
||||
)
|
||||
# Raise error if not valid rule type can be found
|
||||
raise TypeError(f"No valid rule for Pattern '{pattern}' and Recipe "
|
||||
f"'{recipe}' could be found.")
|
||||
|
Reference in New Issue
Block a user