added watchdog file monitoring

This commit is contained in:
PatchOfScotland
2022-12-13 14:59:43 +01:00
parent 4041b86343
commit 380f7066e1
13 changed files with 550 additions and 65 deletions

View File

@ -1,15 +1,28 @@
import shutil
import os
import unittest
from patterns.file_event_pattern import FileEventPattern
from multiprocessing import Pipe
from core.correctness.vars import FILE_EVENTS, FILE_CREATE_EVENT, PIPE_READ, \
PIPE_WRITE, BAREBONES_NOTEBOOK
from core.functionality import create_rules
from patterns.file_event_pattern import FileEventPattern, WatchdogMonitor
from recipes import JupyterNotebookRecipe
TEST_BASE = "test_base"
class CorrectnessTests(unittest.TestCase):
def setUp(self) -> None:
return super().setUp()
super().setUp()
if not os.path.exists(TEST_BASE):
os.mkdir(TEST_BASE)
def tearDown(self) -> None:
return super().tearDown()
super().tearDown()
if os.path.exists(TEST_BASE):
shutil.rmtree(TEST_BASE)
def testFileEventPatternCreationMinimum(self)->None:
FileEventPattern("name", "path", "recipe", "file")
@ -79,3 +92,68 @@ class CorrectnessTests(unittest.TestCase):
fep = FileEventPattern(
"name", "path", "recipe", "file", outputs=outputs)
self.assertEqual(fep.outputs, outputs)
def testFileEventPatternEventMask(self)->None:
fep = FileEventPattern("name", "path", "recipe", "file")
self.assertEqual(fep.event_mask, FILE_EVENTS)
with self.assertRaises(TypeError):
fep = FileEventPattern("name", "path", "recipe", "file",
event_mask=FILE_CREATE_EVENT)
with self.assertRaises(ValueError):
fep = FileEventPattern("name", "path", "recipe", "file",
event_mask=["nope"])
with self.assertRaises(ValueError):
fep = FileEventPattern("name", "path", "recipe", "file",
event_mask=[FILE_CREATE_EVENT, "nope"])
self.assertEqual(fep.event_mask, FILE_EVENTS)
def testWatchdogMonitorMinimum(self)->None:
to_monitor = Pipe()
from_monitor = Pipe()
WatchdogMonitor(TEST_BASE, {}, from_monitor[PIPE_WRITE],
to_monitor[PIPE_READ])
def testWatchdogMonitorEventIdentificaion(self)->None:
to_monitor = Pipe()
from_monitor = Pipe()
pattern_one = FileEventPattern(
"pattern_one", "A", "recipe_one", "file_one")
recipe = JupyterNotebookRecipe(
"recipe_one", BAREBONES_NOTEBOOK)
patterns = {
pattern_one.name: pattern_one,
}
recipes = {
recipe.name: recipe,
}
rules = create_rules(patterns, recipes)
wm = WatchdogMonitor(TEST_BASE, rules, from_monitor[PIPE_WRITE],
to_monitor[PIPE_READ])
wm.start()
open(os.path.join(TEST_BASE, "A"), "w")
if from_monitor[PIPE_READ].poll(3):
message = from_monitor[PIPE_READ].recv()
self.assertIsNotNone(message)
event, rule = message
self.assertIsNotNone(event)
self.assertIsNotNone(rule)
self.assertEqual(event.src_path, os.path.join(TEST_BASE, "A"))
open(os.path.join(TEST_BASE, "B"), "w")
if from_monitor[PIPE_READ].poll(3):
new_message = from_monitor[PIPE_READ].recv()
else:
new_message = None
self.assertIsNone(new_message)
wm.stop()