diff --git a/a6/src/SPC.hs b/a6/src/SPC.hs index ed1dfd2..95a5b0e 100644 --- a/a6/src/SPC.hs +++ b/a6/src/SPC.hs @@ -18,6 +18,9 @@ module SPC WorkerName, workerAdd, workerStop, + + -- debugState, + -- SPCState (..) ) where @@ -26,9 +29,10 @@ import Control.Concurrent killThread, threadDelay, newChan, - readChan + readChan, + ThreadId ) -import Control.Monad (ap, forever, liftM, void, filterM) +import Control.Monad (ap, forever, liftM, void, filterM, when) import GenServer import System.Clock.Seconds (Clock (Monotonic), Seconds, getTime) import GHC.RTS.Flags (DebugFlags(scheduler)) @@ -97,7 +101,7 @@ type WorkerName = String -- processes spawned by the workes. data WorkerMsg = -- | New job time - MsgStartJob (IO ()) JobId + MsgStartJob (IO ()) JobId (ReplyChan ThreadId) -- Messages sent to SPC. data SPCMsg @@ -117,6 +121,7 @@ data SPCMsg MsgAddWorker WorkerName Worker | -- | Worker finished job MsgJobDone JobId + -- | MsgDebug (ReplyChan SPCState) -- | A handle to the SPC instance. data SPC = SPC (Server SPCMsg) @@ -127,7 +132,7 @@ data Worker = Worker (Server WorkerMsg) -- | The central state. Must be protected from the bourgeoisie. data SPCState = SPCState { spcJobsPending :: [(JobId, Job)], - spcJobsRunning :: [(JobId, (Job, WorkerName))], + spcJobsRunning :: [(JobId, (WorkerName, Seconds, ThreadId))], spcJobsDone :: [(JobId, JobDoneReason)], spcJobCounter :: JobId, spcWorkers :: [(WorkerName, Worker)], @@ -179,13 +184,19 @@ runSPCM state (SPCM f) = fst <$> f state workerIsIdle :: (WorkerName, Worker) -> SPCM Bool workerIsIdle (name, _) = do state <- get - pure (all (\(_, (_,w)) -> w /= name) (spcJobsRunning state)) + pure (all (\(_, (w,_,_)) -> w /= name) (spcJobsRunning state)) -workerIsGone :: WorkerName -> SPCM () -workerIsGone = undefined +checkJobTimeout :: (JobId, (WorkerName, Seconds, ThreadId)) -> SPCM () +checkJobTimeout (jobid, (_, deadline, t)) = do + now <- io $ getSeconds + when (now >= deadline) $ do + io $ killThread t + jobDone jobid DoneTimeout checkTimeouts :: SPCM () -checkTimeouts = pure () -- change in Task 4 +checkTimeouts = do + state <- get + mapM_ checkJobTimeout (spcJobsRunning state) getIdleWorkers :: SPCM [(WorkerName, Worker)] getIdleWorkers = do @@ -201,10 +212,12 @@ schedule = do case workers of (workerName,worker):_ -> do w <- (\(Worker w) -> pure w) worker - io $ sendTo w (MsgStartJob (jobAction job) jobid) + threadId <- io $ requestReply w (MsgStartJob (jobAction job) jobid) + now <- io $ getSeconds + let deadline = now + fromIntegral (jobMaxSeconds job) put $ state - { spcJobsRunning = (jobid, (job, workerName)) : spcJobsRunning state, + { spcJobsRunning = (jobid, (workerName, deadline, threadId)) : spcJobsRunning state, spcJobsPending = jobs } _ -> pure () @@ -251,7 +264,7 @@ handleMsg c = do MsgJobDone jobid -> do state <- get case (lookup jobid $ spcJobsRunning state) of - Just (job, _) -> do + Just (_, _, _) -> do jobDone jobid Done Nothing -> pure () MsgJobWait jobid rsvp -> do @@ -261,14 +274,21 @@ handleMsg c = do io $ reply rsvp $ reason Nothing -> put $ state {spcWaiting = (jobid, rsvp) : spcWaiting state} + MsgJobCancel jobid -> do + state <- get + case (lookup jobid $ spcJobsRunning state, lookup jobid $ spcJobsPending state) of + (Just (_,_,t), _) -> do + io $ killThread t + jobDone jobid DoneCancelled + (_, Just _) -> do + put $ + state + { spcJobsPending = removeAssoc jobid $ spcJobsPending state, + spcJobsDone = (jobid, DoneCancelled) : spcJobsDone state + } + _ -> pure () _ -> pure () -deleteJob :: JobId -> [(JobId, (Job, WorkerName))] -> [(JobId, (Job, WorkerName))] -deleteJob jobid list = - case list of - [] -> [] - (jid, (job, w)):l -> if (jid == jobid) then l else (jid,(job,w)):(deleteJob jobid l) - startSPC :: IO SPC startSPC = do let initial_state = @@ -302,7 +322,7 @@ jobDone jobid reason = do put $ state { spcJobsRunning = - deleteJob jobid (spcJobsRunning state), + removeAssoc jobid $ spcJobsRunning state, spcJobsDone = (jobid, reason) : spcJobsDone state } @@ -327,6 +347,10 @@ jobCancel :: SPC -> JobId -> IO () jobCancel (SPC c) jobid = sendTo c $ MsgJobCancel jobid +-- debugState :: SPC -> IO SPCState +-- debugState (SPC c) = +-- requestReply c $ MsgDebug + -- | Add a new worker with this name. Fails with 'Left' if a worker -- with that name already exists. workerAdd :: SPC -> WorkerName -> IO (Either String Worker) @@ -335,23 +359,25 @@ workerAdd (SPC c) name = do if exists then pure $ Left "Worker with given name already exist" else do - worker <- workerSpawn name c + worker <- workerSpawn c sendTo c $ MsgAddWorker name worker pure $ Right worker -workerSpawn :: WorkerName -> (Server SPCMsg) -> IO Worker -workerSpawn name c = do - w <- spawn $ workerLoop name c +workerSpawn :: (Server SPCMsg) -> IO Worker +workerSpawn c = do + w <- spawn $ workerLoop c pure $ Worker w -workerLoop :: WorkerName -> (Server SPCMsg) -> Chan WorkerMsg -> IO () -workerLoop name c m = do +workerLoop :: (Server SPCMsg) -> Chan WorkerMsg -> IO () +workerLoop c m = forever $ do msg <- receive m case msg of -- stuff happening here - MsgStartJob action jobid -> do - action - sendTo c $ MsgJobDone jobid + MsgStartJob action jobid rsvp -> do + t <- forkIO $ do + action + sendTo c $ MsgJobDone jobid + reply rsvp t -- | Shut down a running worker. No effect if the worker is already -- terminated. diff --git a/a6/src/SPC_Tests.hs b/a6/src/SPC_Tests.hs index 8b94779..a3d9e0c 100644 --- a/a6/src/SPC_Tests.hs +++ b/a6/src/SPC_Tests.hs @@ -17,36 +17,118 @@ tests = [ testCase "workerAdd" $ do spc <- startSPC + w <- workerAdd spc "R2-D2" isRight w @?= True, testCase "workerAdd (2)" $ do spc <- startSPC + w1 <- workerAdd spc "MSE-6" isRight w1 @?= True + w2 <- workerAdd spc "GNK" isRight w2 @?= True, - testCase "workerAdd (3)" $ do - spc <- startSPC - w1 <- workerAdd spc "C-3PO" - isRight w1 @?= True - w2 <- workerAdd spc "K-2SO" - isRight w2 @?= True - w3 <- workerAdd spc "IG-88" - isRight w3 @?= True, testCase "workerAdd (fail)" $ do spc <- startSPC + w1 <- workerAdd spc "BD-1" isRight w1 @?= True + w2 <- workerAdd spc "BD-1" isRight w2 @?= False, testCase "Running a job" $ do ref <- newIORef False spc <- startSPC + w <- workerAdd spc "R5-D4" isRight w @?= True + j <- jobAdd spc $ Job (writeIORef ref True) 1 r <- jobWait spc j r @?= Done + + x <- readIORef ref + x @?= True, + testCase "Running two jobs" $ do + ref <- newIORef 0 + spc <- startSPC + + w <- workerAdd spc "K-2SO" + isRight w @?= True + + j1 <- jobAdd spc $ Job (writeIORef ref 1) 1 + r1 <- jobWait spc j1 + r1 @?= Done + + x1 <- readIORef ref + x1 @?= 1 + + j2 <- jobAdd spc $ Job (writeIORef ref 2) 1 + r2 <- jobWait spc j2 + r2 @?= Done + + x2 <- readIORef ref + x2 @?= 2, + testCase "Canceling job (pending)" $ do + spc <- startSPC + j <- jobAdd spc $ Job (pure ()) 1 + jobCancel spc j + r <- jobStatus spc j + r @?= JobDone DoneCancelled, + testCase "Canceling job (running)" $ do + spc <- startSPC + + w <- workerAdd spc "IG-88" + isRight w @?= True + + j <- jobAdd spc $ Job (threadDelay 2000000) 2 + jobCancel spc j + r <- jobStatus spc j + r @?= JobDone DoneCancelled, + testCase "Canceling job (running) (new job)" $ do + ref <- newIORef False + spc <- startSPC + + w <- workerAdd spc "C-3PO" + isRight w @?= True + + j1 <- jobAdd spc $ Job (threadDelay 2000000) 2 + jobCancel spc j1 + r1 <- jobStatus spc j1 + r1 @?= JobDone DoneCancelled + + -- job has been cancelled. Starting new job + j2 <- jobAdd spc $ Job (writeIORef ref True) 1 + r2 <- jobWait spc j2 + r2 @?= Done + + x <- readIORef ref + x @?= True, + testCase "Timeout" $ do + spc <- startSPC + + w <- workerAdd spc "L3-37" + isRight w @?= True + + j <- jobAdd spc $ Job (threadDelay 2000000) 1 + r <- jobWait spc j + + r @?= DoneTimeout, + testCase "Timeout (2 jobs)" $ do + ref <- newIORef False + spc <- startSPC + + w <- workerAdd spc "General Kalani" + isRight w @?= True + + j1 <- jobAdd spc $ Job (threadDelay 2000000) 1 + j2 <- jobAdd spc $ Job (writeIORef ref True) 1 + r1 <- jobWait spc j1 + r1 @?= DoneTimeout + + r2 <- jobWait spc j2 + r2 @?= Done + x <- readIORef ref x @?= True ]