added support for directory event matching

This commit is contained in:
PatchOfScotland
2023-03-31 13:51:14 +02:00
parent 14dec78756
commit 5952b02be4
10 changed files with 253 additions and 50 deletions

View File

@ -4,15 +4,18 @@ import os
import unittest
from multiprocessing import Pipe
from time import sleep
from meow_base.core.correctness.vars import FILE_CREATE_EVENT, EVENT_TYPE, \
EVENT_RULE, WATCHDOG_BASE, EVENT_TYPE_WATCHDOG, EVENT_PATH, SWEEP_START, \
SWEEP_JUMP, SWEEP_STOP
SWEEP_JUMP, SWEEP_STOP, DIR_EVENTS
from meow_base.functionality.file_io import make_dir
from meow_base.patterns.file_event_pattern import FileEventPattern, \
WatchdogMonitor, _DEFAULT_MASK
from meow_base.recipes.jupyter_notebook_recipe import JupyterNotebookRecipe
from shared import BAREBONES_NOTEBOOK, TEST_MONITOR_BASE, setup, teardown
from meow_base.recipes.python_recipe import PythonRecipe
from shared import BAREBONES_NOTEBOOK, TEST_MONITOR_BASE, \
COUNTING_PYTHON_SCRIPT, setup, teardown
def patterns_equal(tester, pattern_one, pattern_two):
@ -322,6 +325,77 @@ class WatchdogMonitorTests(unittest.TestCase):
wm.stop()
# Test WatchdogMonitor identifies directory content updates
def testMonitorDirectoryMonitoring(self)->None:
pattern_one = FileEventPattern(
"pattern_one",
os.path.join("top"),
"recipe_one",
"dir_to_count",
parameters={},
event_mask=DIR_EVENTS
)
recipe = PythonRecipe(
"recipe_one", COUNTING_PYTHON_SCRIPT)
patterns = {
pattern_one.name: pattern_one,
}
recipes = {
recipe.name: recipe,
}
wm = WatchdogMonitor(
TEST_MONITOR_BASE,
patterns,
recipes,
settletime=3
)
rules = wm.get_rules()
rule = rules[list(rules.keys())[0]]
from_monitor_reader, from_monitor_writer = Pipe()
wm.to_runner = from_monitor_writer
wm.start()
start_dir = os.path.join(TEST_MONITOR_BASE, "top")
contents = 10
make_dir(start_dir)
for i in range(contents):
with open(os.path.join(start_dir, f"{i}.txt"), "w") as f:
f.write("-")
sleep(1)
self.assertTrue(start_dir)
for i in range(contents):
self.assertTrue(os.path.exists(
os.path.join(start_dir, f"{i}.txt"))
)
messages = []
while True:
if from_monitor_reader.poll(5):
messages.append(from_monitor_reader.recv())
else:
break
self.assertTrue(len(messages), 1)
message = messages[0]
self.assertEqual(type(message), dict)
self.assertIn(EVENT_TYPE, message)
self.assertEqual(message[EVENT_TYPE], EVENT_TYPE_WATCHDOG)
self.assertIn(WATCHDOG_BASE, message)
self.assertEqual(message[WATCHDOG_BASE], TEST_MONITOR_BASE)
self.assertIn(EVENT_PATH, message)
self.assertEqual(message[EVENT_PATH], start_dir)
self.assertIn(EVENT_RULE, message)
self.assertEqual(message[EVENT_RULE].name, rule.name)
wm.stop()
# Test WatchdogMonitor identifies fake events for retroactive patterns
def testMonitoringRetroActive(self)->None:
pattern_one = FileEventPattern(
@ -389,6 +463,76 @@ class WatchdogMonitorTests(unittest.TestCase):
wm.stop()
# Test WatchdogMonitor identifies events for retroacive directory patterns
def testMonitorRetroActiveDirectory(self)->None:
contents = 10
start_dir = os.path.join(TEST_MONITOR_BASE, "top")
make_dir(start_dir)
for i in range(contents):
with open(os.path.join(start_dir, f"{i}.txt"), "w") as f:
f.write("-")
sleep(1)
self.assertTrue(start_dir)
for i in range(contents):
self.assertTrue(os.path.exists(
os.path.join(start_dir, f"{i}.txt"))
)
pattern_one = FileEventPattern(
"pattern_one",
os.path.join("top"),
"recipe_one",
"dir_to_count",
parameters={},
event_mask=DIR_EVENTS
)
recipe = PythonRecipe(
"recipe_one", COUNTING_PYTHON_SCRIPT)
patterns = {
pattern_one.name: pattern_one,
}
recipes = {
recipe.name: recipe,
}
wm = WatchdogMonitor(
TEST_MONITOR_BASE,
patterns,
recipes,
settletime=3
)
rules = wm.get_rules()
rule = rules[list(rules.keys())[0]]
from_monitor_reader, from_monitor_writer = Pipe()
wm.to_runner = from_monitor_writer
wm.start()
messages = []
while True:
if from_monitor_reader.poll(5):
messages.append(from_monitor_reader.recv())
else:
break
self.assertTrue(len(messages), 1)
message = messages[0]
self.assertEqual(type(message), dict)
self.assertIn(EVENT_TYPE, message)
self.assertEqual(message[EVENT_TYPE], EVENT_TYPE_WATCHDOG)
self.assertIn(WATCHDOG_BASE, message)
self.assertEqual(message[WATCHDOG_BASE], TEST_MONITOR_BASE)
self.assertIn(EVENT_PATH, message)
self.assertEqual(message[EVENT_PATH], start_dir)
self.assertIn(EVENT_RULE, message)
self.assertEqual(message[EVENT_RULE].name, rule.name)
wm.stop()
# Test WatchdogMonitor get_patterns function
def testMonitorGetPatterns(self)->None:
pattern_one = FileEventPattern(
@ -790,4 +934,3 @@ class WatchdogMonitorTests(unittest.TestCase):
self.assertIsInstance(rules, dict)
self.assertEqual(len(rules), 2)