🔪 Killing workers
This commit is contained in:
@ -1,6 +1,6 @@
|
|||||||
module GenServer
|
module GenServer
|
||||||
( Chan,
|
( Chan,
|
||||||
Server,
|
Server(..),
|
||||||
receive,
|
receive,
|
||||||
send,
|
send,
|
||||||
sendTo,
|
sendTo,
|
||||||
|
@ -28,15 +28,12 @@ import Control.Concurrent
|
|||||||
( forkIO,
|
( forkIO,
|
||||||
killThread,
|
killThread,
|
||||||
threadDelay,
|
threadDelay,
|
||||||
newChan,
|
|
||||||
readChan,
|
|
||||||
ThreadId
|
ThreadId
|
||||||
)
|
)
|
||||||
import Control.Exception (SomeException, catch)
|
import Control.Exception (SomeException, catch)
|
||||||
import Control.Monad (ap, forever, liftM, void, filterM, when)
|
import Control.Monad (ap, forever, liftM, void, filterM, when)
|
||||||
import GenServer
|
import GenServer
|
||||||
import System.Clock.Seconds (Clock (Monotonic), Seconds, getTime)
|
import System.Clock.Seconds (Clock (Monotonic), Seconds, getTime)
|
||||||
import GHC.RTS.Flags (DebugFlags(scheduler))
|
|
||||||
|
|
||||||
-- First some general utility functions.
|
-- First some general utility functions.
|
||||||
|
|
||||||
@ -103,6 +100,8 @@ type WorkerName = String
|
|||||||
data WorkerMsg
|
data WorkerMsg
|
||||||
= -- | New job time
|
= -- | New job time
|
||||||
MsgStartJob (IO ()) JobId (ReplyChan ThreadId)
|
MsgStartJob (IO ()) JobId (ReplyChan ThreadId)
|
||||||
|
| -- | Remove worker
|
||||||
|
MsgKill
|
||||||
|
|
||||||
-- Messages sent to SPC.
|
-- Messages sent to SPC.
|
||||||
data SPCMsg
|
data SPCMsg
|
||||||
@ -124,6 +123,8 @@ data SPCMsg
|
|||||||
MsgJobDone JobId
|
MsgJobDone JobId
|
||||||
| -- | Crashed
|
| -- | Crashed
|
||||||
MsgJobCrashed JobId
|
MsgJobCrashed JobId
|
||||||
|
| -- | Remove worker (workplace accident)
|
||||||
|
MsgRemoveWorker WorkerName
|
||||||
|
|
||||||
-- | A handle to the SPC instance.
|
-- | A handle to the SPC instance.
|
||||||
data SPC = SPC (Server SPCMsg)
|
data SPC = SPC (Server SPCMsg)
|
||||||
@ -167,12 +168,6 @@ get = SPCM $ \state -> pure (state, state)
|
|||||||
put :: SPCState -> SPCM ()
|
put :: SPCState -> SPCM ()
|
||||||
put state = SPCM $ \_ -> pure ((), state)
|
put state = SPCM $ \_ -> pure ((), state)
|
||||||
|
|
||||||
-- | Modify the state.
|
|
||||||
modify :: (SPCState -> SPCState) -> SPCM ()
|
|
||||||
modify f = do
|
|
||||||
state <- get
|
|
||||||
put $ f state
|
|
||||||
|
|
||||||
-- | Lift an 'IO' action into 'SPCM'.
|
-- | Lift an 'IO' action into 'SPCM'.
|
||||||
io :: IO a -> SPCM a
|
io :: IO a -> SPCM a
|
||||||
io m = SPCM $ \state -> do
|
io m = SPCM $ \state -> do
|
||||||
@ -295,6 +290,21 @@ handleMsg c = do
|
|||||||
Just (_, _, _) -> do
|
Just (_, _, _) -> do
|
||||||
jobDone jobid DoneCrashed
|
jobDone jobid DoneCrashed
|
||||||
Nothing -> pure ()
|
Nothing -> pure ()
|
||||||
|
MsgRemoveWorker workerName -> do
|
||||||
|
state <- get
|
||||||
|
case (lookup workerName $ spcWorkers state) of
|
||||||
|
Just (Worker (Server threadId _)) -> do
|
||||||
|
jobs <- pure $ map (\(jobid, (name,_,t)) -> (name,(jobid,t))) $ spcJobsRunning state
|
||||||
|
case (lookup workerName jobs) of
|
||||||
|
Just (jobid,t) -> do
|
||||||
|
io $ killThread t
|
||||||
|
jobDone jobid DoneCancelled
|
||||||
|
Nothing -> pure ()
|
||||||
|
|
||||||
|
state2 <- get
|
||||||
|
put $ state2 {spcWorkers = removeAssoc workerName $ spcWorkers state2}
|
||||||
|
io $ killThread threadId
|
||||||
|
_ -> pure ()
|
||||||
_ -> pure ()
|
_ -> pure ()
|
||||||
|
|
||||||
startSPC :: IO SPC
|
startSPC :: IO SPC
|
||||||
@ -367,17 +377,17 @@ workerAdd (SPC c) name = do
|
|||||||
if exists
|
if exists
|
||||||
then pure $ Left "Worker with given name already exist"
|
then pure $ Left "Worker with given name already exist"
|
||||||
else do
|
else do
|
||||||
worker <- workerSpawn c
|
worker <- workerSpawn name c
|
||||||
sendTo c $ MsgAddWorker name worker
|
sendTo c $ MsgAddWorker name worker
|
||||||
pure $ Right worker
|
pure $ Right worker
|
||||||
|
|
||||||
workerSpawn :: (Server SPCMsg) -> IO Worker
|
workerSpawn :: WorkerName -> (Server SPCMsg) -> IO Worker
|
||||||
workerSpawn c = do
|
workerSpawn name c = do
|
||||||
w <- spawn $ workerLoop c
|
w <- spawn $ workerLoop name c
|
||||||
pure $ Worker w
|
pure $ Worker w
|
||||||
|
|
||||||
workerLoop :: (Server SPCMsg) -> Chan WorkerMsg -> IO ()
|
workerLoop :: WorkerName -> (Server SPCMsg) -> Chan WorkerMsg -> IO ()
|
||||||
workerLoop c m = forever $ do
|
workerLoop name c m = forever $ do
|
||||||
msg <- receive m
|
msg <- receive m
|
||||||
case msg of
|
case msg of
|
||||||
-- stuff happening here
|
-- stuff happening here
|
||||||
@ -391,8 +401,9 @@ workerLoop c m = forever $ do
|
|||||||
sendTo c $ MsgJobCrashed jobid
|
sendTo c $ MsgJobCrashed jobid
|
||||||
doJob `catch` onException
|
doJob `catch` onException
|
||||||
reply rsvp t
|
reply rsvp t
|
||||||
|
MsgKill -> sendTo c $ MsgRemoveWorker name
|
||||||
|
|
||||||
-- | Shut down a running worker. No effect if the worker is already
|
-- | Shut down a running worker. No effect if the worker is already
|
||||||
-- terminated.
|
-- terminated.
|
||||||
workerStop :: Worker -> IO ()
|
workerStop :: Worker -> IO ()
|
||||||
workerStop = undefined
|
workerStop (Worker w) = sendTo w MsgKill
|
||||||
|
@ -1,12 +1,10 @@
|
|||||||
module SPC_Tests (tests) where
|
module SPC_Tests (tests) where
|
||||||
|
|
||||||
import Control.Concurrent (threadDelay)
|
import Control.Concurrent (threadDelay)
|
||||||
import Control.Monad (forM, forM_, replicateM)
|
|
||||||
import Data.IORef
|
import Data.IORef
|
||||||
import SPC
|
import SPC
|
||||||
import GenServer
|
|
||||||
import Test.Tasty (TestTree, localOption, mkTimeout, testGroup)
|
import Test.Tasty (TestTree, localOption, mkTimeout, testGroup)
|
||||||
import Test.Tasty.HUnit (assertFailure, testCase, (@?=))
|
import Test.Tasty.HUnit (testCase, (@?=))
|
||||||
import Data.Either (isRight)
|
import Data.Either (isRight)
|
||||||
|
|
||||||
tests :: TestTree
|
tests :: TestTree
|
||||||
@ -50,7 +48,7 @@ tests =
|
|||||||
x <- readIORef ref
|
x <- readIORef ref
|
||||||
x @?= True,
|
x @?= True,
|
||||||
testCase "Running two jobs" $ do
|
testCase "Running two jobs" $ do
|
||||||
ref <- newIORef 0
|
ref <- newIORef (0::Int)
|
||||||
spc <- startSPC
|
spc <- startSPC
|
||||||
|
|
||||||
w <- workerAdd spc "K-2SO"
|
w <- workerAdd spc "K-2SO"
|
||||||
@ -146,5 +144,23 @@ tests =
|
|||||||
r2 <- jobWait spc j2
|
r2 <- jobWait spc j2
|
||||||
r2 @?= Done
|
r2 @?= Done
|
||||||
x <- readIORef ref
|
x <- readIORef ref
|
||||||
x @?= True
|
x @?= True,
|
||||||
|
testCase "Remove worker" $ do
|
||||||
|
spc <- startSPC
|
||||||
|
w1 <- workerAdd spc "D-O"
|
||||||
|
isRight w1 @?= True
|
||||||
|
case w1 of
|
||||||
|
(Right worker) -> do
|
||||||
|
w2 <- workerAdd spc "D-O" -- Can't make another worker with same name yet
|
||||||
|
isRight w2 @?= False
|
||||||
|
|
||||||
|
j <- jobAdd spc $ Job (threadDelay 1000000) 1
|
||||||
|
workerStop worker
|
||||||
|
threadDelay 100
|
||||||
|
r <- jobStatus spc j
|
||||||
|
r @?= JobDone DoneCancelled
|
||||||
|
|
||||||
|
w3 <- workerAdd spc "D-O" -- But we can make one now
|
||||||
|
isRight w3 @?= True
|
||||||
|
_ -> False @?= True
|
||||||
]
|
]
|
||||||
|
Reference in New Issue
Block a user