rewored rules to only invoke base rule, and added bash jobs

This commit is contained in:
PatchOfScotland
2023-03-30 11:33:15 +02:00
parent 747f2c316c
commit 311c98f7f2
20 changed files with 1046 additions and 373 deletions

View File

@ -10,7 +10,7 @@ from typing import Any, Dict, Union, List
from meow_base.core.base_pattern import BasePattern
from meow_base.core.base_recipe import BaseRecipe
from meow_base.core.base_rule import BaseRule
from meow_base.core.rule import Rule
from meow_base.core.correctness.validation import check_type, valid_dict, \
valid_list
from meow_base.core.correctness.vars import EVENT_PATH, EVENT_RULE, \
@ -18,7 +18,7 @@ from meow_base.core.correctness.vars import EVENT_PATH, EVENT_RULE, \
JOB_PATTERN, JOB_RECIPE, JOB_REQUIREMENTS, JOB_RULE, JOB_STATUS, \
JOB_TYPE, STATUS_QUEUED, WATCHDOG_BASE, WATCHDOG_HASH, SWEEP_JUMP, \
SWEEP_START, SWEEP_STOP
from meow_base.functionality.naming import generate_job_id, generate_rule_id
from meow_base.functionality.naming import generate_job_id
# mig trigger keyword replacements
KEYWORD_PATH = "{PATH}"
@ -147,8 +147,7 @@ def create_job(job_type:str, event:Dict[str,Any], extras:Dict[Any,Any]={}
return {**extras, **job_dict}
def create_rules(patterns:Union[Dict[str,BasePattern],List[BasePattern]],
recipes:Union[Dict[str,BaseRecipe],List[BaseRecipe]],
new_rules:List[BaseRule]=[])->Dict[str,BaseRule]:
recipes:Union[Dict[str,BaseRecipe],List[BaseRecipe]])->Dict[str,Rule]:
"""Function to create any valid rules from a given collection of patterns
and recipes. All inbuilt rule types are considered, with additional
definitions provided through the 'new_rules' variable. Note that any
@ -157,7 +156,6 @@ def create_rules(patterns:Union[Dict[str,BasePattern],List[BasePattern]],
# Validation of inputs
check_type(patterns, Dict, alt_types=[List], hint="create_rules.patterns")
check_type(recipes, Dict, alt_types=[List], hint="create_rules.recipes")
valid_list(new_rules, BaseRule, min_length=0)
# Convert a pattern list to a dictionary
if isinstance(patterns, list):
@ -198,34 +196,14 @@ def create_rules(patterns:Union[Dict[str,BasePattern],List[BasePattern]],
pass
return generated_rules
def create_rule(pattern:BasePattern, recipe:BaseRecipe,
new_rules:List[BaseRule]=[])->BaseRule:
def create_rule(pattern:BasePattern, recipe:BaseRecipe)->Rule:
"""Function to create a valid rule from a given pattern and recipe. All
inbuilt rule types are considered, with additional definitions provided
through the 'new_rules' variable."""
check_type(pattern, BasePattern, hint="create_rule.pattern")
check_type(recipe, BaseRecipe, hint="create_rule.recipe")
valid_list(new_rules, BaseRule, min_length=0, hint="create_rule.new_rules")
# TODO fix me
# Imported here to avoid circular imports at top of file
import meow_base.rules
all_rules = {
(r.pattern_type, r.recipe_type):r for r in BaseRule.__subclasses__()
}
# Add in new rules
for rule in new_rules:
all_rules[(rule.pattern_type, rule.recipe_type)] = rule
# Find appropriate rule type from pattern and recipe types
key = (type(pattern).__name__, type(recipe).__name__)
if (key) in all_rules:
return all_rules[key](
generate_rule_id(),
pattern,
recipe
)
# Raise error if not valid rule type can be found
raise TypeError(f"No valid rule for Pattern '{pattern}' and Recipe "
f"'{recipe}' could be found.")
return Rule(
pattern,
recipe
)