refactored correctness dir away from core sub dir into either functionality or core as appropriate
This commit is contained in:
64
core/meow.py
Normal file
64
core/meow.py
Normal file
@ -0,0 +1,64 @@
|
||||
|
||||
"""
|
||||
This file contains certain meow specific defintions, most notably the
|
||||
dictionaries passed around within the runner.
|
||||
|
||||
Author(s): David Marchant
|
||||
"""
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, Type
|
||||
|
||||
from meow_base.core.rule import Rule
|
||||
from meow_base.functionality.validation import check_type
|
||||
from meow_base.core.vars import EVENT_TYPE, EVENT_PATH, \
|
||||
JOB_EVENT, JOB_TYPE, JOB_ID, JOB_PATTERN, JOB_RECIPE, JOB_RULE, \
|
||||
JOB_STATUS, JOB_CREATE_TIME, EVENT_RULE, WATCHDOG_BASE, WATCHDOG_HASH
|
||||
|
||||
# Required keys in event dict
|
||||
EVENT_KEYS = {
|
||||
EVENT_TYPE: str,
|
||||
EVENT_PATH: str,
|
||||
EVENT_RULE: Rule
|
||||
}
|
||||
|
||||
WATCHDOG_EVENT_KEYS = {
|
||||
WATCHDOG_BASE: str,
|
||||
WATCHDOG_HASH: str,
|
||||
**EVENT_KEYS
|
||||
}
|
||||
|
||||
# Required keys in job dict
|
||||
JOB_KEYS = {
|
||||
JOB_TYPE: str,
|
||||
JOB_EVENT: Dict,
|
||||
JOB_ID: str,
|
||||
JOB_PATTERN: Any,
|
||||
JOB_RECIPE: Any,
|
||||
JOB_RULE: str,
|
||||
JOB_STATUS: str,
|
||||
JOB_CREATE_TIME: datetime,
|
||||
}
|
||||
|
||||
|
||||
def valid_meow_dict(meow_dict:Dict[str,Any], msg:str,
|
||||
keys:Dict[str,Type])->None:
|
||||
"""Check given dictionary expresses a meow construct. This won't do much
|
||||
directly, but is called by more specific validation functions."""
|
||||
check_type(meow_dict, Dict)
|
||||
# Check we have all the required keys, and they are all of the expected
|
||||
# type
|
||||
for key, value_type in keys.items():
|
||||
if not key in meow_dict.keys():
|
||||
raise KeyError(f"{msg} require key '{key}'")
|
||||
check_type(meow_dict[key], value_type)
|
||||
|
||||
def valid_event(event:Dict[str,Any])->None:
|
||||
"""Check that a given dict expresses a meow event."""
|
||||
valid_meow_dict(event, "Event", EVENT_KEYS)
|
||||
|
||||
def valid_job(job:Dict[str,Any])->None:
|
||||
"""Check that a given dict expresses a meow job."""
|
||||
valid_meow_dict(job, "Job", JOB_KEYS)
|
||||
|
||||
def valid_watchdog_event(event:Dict[str,Any])->None:
|
||||
valid_meow_dict(event, "Watchdog event", WATCHDOG_EVENT_KEYS)
|
Reference in New Issue
Block a user