diff --git a/a6/src/GenServer.hs b/a6/src/GenServer.hs index e70e18c..748fc00 100644 --- a/a6/src/GenServer.hs +++ b/a6/src/GenServer.hs @@ -1,6 +1,6 @@ module GenServer ( Chan, - Server, + Server(..), receive, send, sendTo, diff --git a/a6/src/SPC.hs b/a6/src/SPC.hs index 0b663ad..1ba317f 100644 --- a/a6/src/SPC.hs +++ b/a6/src/SPC.hs @@ -28,15 +28,12 @@ import Control.Concurrent ( forkIO, killThread, threadDelay, - newChan, - readChan, ThreadId ) import Control.Exception (SomeException, catch) import Control.Monad (ap, forever, liftM, void, filterM, when) import GenServer import System.Clock.Seconds (Clock (Monotonic), Seconds, getTime) -import GHC.RTS.Flags (DebugFlags(scheduler)) -- First some general utility functions. @@ -103,6 +100,8 @@ type WorkerName = String data WorkerMsg = -- | New job time MsgStartJob (IO ()) JobId (ReplyChan ThreadId) + | -- | Remove worker + MsgKill -- Messages sent to SPC. data SPCMsg @@ -124,6 +123,8 @@ data SPCMsg MsgJobDone JobId | -- | Crashed MsgJobCrashed JobId + | -- | Remove worker (workplace accident) + MsgRemoveWorker WorkerName -- | A handle to the SPC instance. data SPC = SPC (Server SPCMsg) @@ -167,12 +168,6 @@ get = SPCM $ \state -> pure (state, state) put :: SPCState -> SPCM () put state = SPCM $ \_ -> pure ((), state) --- | Modify the state. -modify :: (SPCState -> SPCState) -> SPCM () -modify f = do - state <- get - put $ f state - -- | Lift an 'IO' action into 'SPCM'. io :: IO a -> SPCM a io m = SPCM $ \state -> do @@ -295,6 +290,21 @@ handleMsg c = do Just (_, _, _) -> do jobDone jobid DoneCrashed Nothing -> pure () + MsgRemoveWorker workerName -> do + state <- get + case (lookup workerName $ spcWorkers state) of + Just (Worker (Server threadId _)) -> do + jobs <- pure $ map (\(jobid, (name,_,t)) -> (name,(jobid,t))) $ spcJobsRunning state + case (lookup workerName jobs) of + Just (jobid,t) -> do + io $ killThread t + jobDone jobid DoneCancelled + Nothing -> pure () + + state2 <- get + put $ state2 {spcWorkers = removeAssoc workerName $ spcWorkers state2} + io $ killThread threadId + _ -> pure () _ -> pure () startSPC :: IO SPC @@ -367,17 +377,17 @@ workerAdd (SPC c) name = do if exists then pure $ Left "Worker with given name already exist" else do - worker <- workerSpawn c + worker <- workerSpawn name c sendTo c $ MsgAddWorker name worker pure $ Right worker -workerSpawn :: (Server SPCMsg) -> IO Worker -workerSpawn c = do - w <- spawn $ workerLoop c +workerSpawn :: WorkerName -> (Server SPCMsg) -> IO Worker +workerSpawn name c = do + w <- spawn $ workerLoop name c pure $ Worker w -workerLoop :: (Server SPCMsg) -> Chan WorkerMsg -> IO () -workerLoop c m = forever $ do +workerLoop :: WorkerName -> (Server SPCMsg) -> Chan WorkerMsg -> IO () +workerLoop name c m = forever $ do msg <- receive m case msg of -- stuff happening here @@ -391,8 +401,9 @@ workerLoop c m = forever $ do sendTo c $ MsgJobCrashed jobid doJob `catch` onException reply rsvp t + MsgKill -> sendTo c $ MsgRemoveWorker name -- | Shut down a running worker. No effect if the worker is already -- terminated. workerStop :: Worker -> IO () -workerStop = undefined +workerStop (Worker w) = sendTo w MsgKill diff --git a/a6/src/SPC_Tests.hs b/a6/src/SPC_Tests.hs index 696e3f8..1c20cbd 100644 --- a/a6/src/SPC_Tests.hs +++ b/a6/src/SPC_Tests.hs @@ -1,12 +1,10 @@ module SPC_Tests (tests) where import Control.Concurrent (threadDelay) -import Control.Monad (forM, forM_, replicateM) import Data.IORef import SPC -import GenServer import Test.Tasty (TestTree, localOption, mkTimeout, testGroup) -import Test.Tasty.HUnit (assertFailure, testCase, (@?=)) +import Test.Tasty.HUnit (testCase, (@?=)) import Data.Either (isRight) tests :: TestTree @@ -50,7 +48,7 @@ tests = x <- readIORef ref x @?= True, testCase "Running two jobs" $ do - ref <- newIORef 0 + ref <- newIORef (0::Int) spc <- startSPC w <- workerAdd spc "K-2SO" @@ -146,5 +144,23 @@ tests = r2 <- jobWait spc j2 r2 @?= Done x <- readIORef ref - x @?= True + x @?= True, + testCase "Remove worker" $ do + spc <- startSPC + w1 <- workerAdd spc "D-O" + isRight w1 @?= True + case w1 of + (Right worker) -> do + w2 <- workerAdd spc "D-O" -- Can't make another worker with same name yet + isRight w2 @?= False + + j <- jobAdd spc $ Job (threadDelay 1000000) 1 + workerStop worker + threadDelay 100 + r <- jobStatus spc j + r @?= JobDone DoneCancelled + + w3 <- workerAdd spc "D-O" -- But we can make one now + isRight w3 @?= True + _ -> False @?= True ]