Files
meow_base/core/meow.py

57 lines
1.6 KiB
Python

"""
This file contains certain meow specific defintions, most notably the
dictionaries passed around within the runner.
Author(s): David Marchant
"""
from datetime import datetime
from typing import Any, Dict, Type
from meow_base.core.rule import Rule
from meow_base.functionality.validation import check_type
from meow_base.core.vars import EVENT_TYPE, EVENT_PATH, \
JOB_EVENT, JOB_TYPE, JOB_ID, JOB_PATTERN, JOB_RECIPE, JOB_RULE, \
JOB_STATUS, JOB_CREATE_TIME, EVENT_RULE, EVENT_TIME
# Required keys in event dict
EVENT_KEYS = {
EVENT_TYPE: str,
EVENT_PATH: str,
EVENT_TIME: float,
EVENT_RULE: Rule
}
# Required keys in job dict
JOB_KEYS = {
JOB_TYPE: str,
JOB_EVENT: Dict,
JOB_ID: str,
JOB_PATTERN: Any,
JOB_RECIPE: Any,
JOB_RULE: str,
JOB_STATUS: str,
JOB_CREATE_TIME: datetime,
}
def valid_meow_dict(meow_dict:Dict[str,Any], msg:str,
keys:Dict[str,Type])->None:
"""Check given dictionary expresses a meow construct. This won't do much
directly, but is called by more specific validation functions."""
check_type(meow_dict, Dict)
# Check we have all the required keys, and they are all of the expected
# type
for key, value_type in keys.items():
if not key in meow_dict.keys():
raise KeyError(f"{msg} require key '{key}'")
check_type(meow_dict[key], value_type)
def valid_event(event:Dict[str,Any])->None:
"""Check that a given dict expresses a meow event."""
valid_meow_dict(event, "Event", EVENT_KEYS)
def valid_job(job:Dict[str,Any])->None:
"""Check that a given dict expresses a meow job."""
valid_meow_dict(job, "Job", JOB_KEYS)