🤖 task 3 and 4

This commit is contained in:
2024-10-21 16:33:05 +02:00
parent b1335209b6
commit 638786f8c2
2 changed files with 143 additions and 35 deletions

View File

@ -18,6 +18,9 @@ module SPC
WorkerName,
workerAdd,
workerStop,
-- debugState,
-- SPCState (..)
)
where
@ -26,9 +29,10 @@ import Control.Concurrent
killThread,
threadDelay,
newChan,
readChan
readChan,
ThreadId
)
import Control.Monad (ap, forever, liftM, void, filterM)
import Control.Monad (ap, forever, liftM, void, filterM, when)
import GenServer
import System.Clock.Seconds (Clock (Monotonic), Seconds, getTime)
import GHC.RTS.Flags (DebugFlags(scheduler))
@ -97,7 +101,7 @@ type WorkerName = String
-- processes spawned by the workes.
data WorkerMsg
= -- | New job time
MsgStartJob (IO ()) JobId
MsgStartJob (IO ()) JobId (ReplyChan ThreadId)
-- Messages sent to SPC.
data SPCMsg
@ -117,6 +121,7 @@ data SPCMsg
MsgAddWorker WorkerName Worker
| -- | Worker finished job
MsgJobDone JobId
-- | MsgDebug (ReplyChan SPCState)
-- | A handle to the SPC instance.
data SPC = SPC (Server SPCMsg)
@ -127,7 +132,7 @@ data Worker = Worker (Server WorkerMsg)
-- | The central state. Must be protected from the bourgeoisie.
data SPCState = SPCState
{ spcJobsPending :: [(JobId, Job)],
spcJobsRunning :: [(JobId, (Job, WorkerName))],
spcJobsRunning :: [(JobId, (WorkerName, Seconds, ThreadId))],
spcJobsDone :: [(JobId, JobDoneReason)],
spcJobCounter :: JobId,
spcWorkers :: [(WorkerName, Worker)],
@ -179,13 +184,19 @@ runSPCM state (SPCM f) = fst <$> f state
workerIsIdle :: (WorkerName, Worker) -> SPCM Bool
workerIsIdle (name, _) = do
state <- get
pure (all (\(_, (_,w)) -> w /= name) (spcJobsRunning state))
pure (all (\(_, (w,_,_)) -> w /= name) (spcJobsRunning state))
workerIsGone :: WorkerName -> SPCM ()
workerIsGone = undefined
checkJobTimeout :: (JobId, (WorkerName, Seconds, ThreadId)) -> SPCM ()
checkJobTimeout (jobid, (_, deadline, t)) = do
now <- io $ getSeconds
when (now >= deadline) $ do
io $ killThread t
jobDone jobid DoneTimeout
checkTimeouts :: SPCM ()
checkTimeouts = pure () -- change in Task 4
checkTimeouts = do
state <- get
mapM_ checkJobTimeout (spcJobsRunning state)
getIdleWorkers :: SPCM [(WorkerName, Worker)]
getIdleWorkers = do
@ -201,10 +212,12 @@ schedule = do
case workers of
(workerName,worker):_ -> do
w <- (\(Worker w) -> pure w) worker
io $ sendTo w (MsgStartJob (jobAction job) jobid)
threadId <- io $ requestReply w (MsgStartJob (jobAction job) jobid)
now <- io $ getSeconds
let deadline = now + fromIntegral (jobMaxSeconds job)
put $
state
{ spcJobsRunning = (jobid, (job, workerName)) : spcJobsRunning state,
{ spcJobsRunning = (jobid, (workerName, deadline, threadId)) : spcJobsRunning state,
spcJobsPending = jobs
}
_ -> pure ()
@ -251,7 +264,7 @@ handleMsg c = do
MsgJobDone jobid -> do
state <- get
case (lookup jobid $ spcJobsRunning state) of
Just (job, _) -> do
Just (_, _, _) -> do
jobDone jobid Done
Nothing -> pure ()
MsgJobWait jobid rsvp -> do
@ -261,14 +274,21 @@ handleMsg c = do
io $ reply rsvp $ reason
Nothing ->
put $ state {spcWaiting = (jobid, rsvp) : spcWaiting state}
MsgJobCancel jobid -> do
state <- get
case (lookup jobid $ spcJobsRunning state, lookup jobid $ spcJobsPending state) of
(Just (_,_,t), _) -> do
io $ killThread t
jobDone jobid DoneCancelled
(_, Just _) -> do
put $
state
{ spcJobsPending = removeAssoc jobid $ spcJobsPending state,
spcJobsDone = (jobid, DoneCancelled) : spcJobsDone state
}
_ -> pure ()
_ -> pure ()
deleteJob :: JobId -> [(JobId, (Job, WorkerName))] -> [(JobId, (Job, WorkerName))]
deleteJob jobid list =
case list of
[] -> []
(jid, (job, w)):l -> if (jid == jobid) then l else (jid,(job,w)):(deleteJob jobid l)
startSPC :: IO SPC
startSPC = do
let initial_state =
@ -302,7 +322,7 @@ jobDone jobid reason = do
put $
state
{ spcJobsRunning =
deleteJob jobid (spcJobsRunning state),
removeAssoc jobid $ spcJobsRunning state,
spcJobsDone =
(jobid, reason) : spcJobsDone state
}
@ -327,6 +347,10 @@ jobCancel :: SPC -> JobId -> IO ()
jobCancel (SPC c) jobid =
sendTo c $ MsgJobCancel jobid
-- debugState :: SPC -> IO SPCState
-- debugState (SPC c) =
-- requestReply c $ MsgDebug
-- | Add a new worker with this name. Fails with 'Left' if a worker
-- with that name already exists.
workerAdd :: SPC -> WorkerName -> IO (Either String Worker)
@ -335,23 +359,25 @@ workerAdd (SPC c) name = do
if exists
then pure $ Left "Worker with given name already exist"
else do
worker <- workerSpawn name c
worker <- workerSpawn c
sendTo c $ MsgAddWorker name worker
pure $ Right worker
workerSpawn :: WorkerName -> (Server SPCMsg) -> IO Worker
workerSpawn name c = do
w <- spawn $ workerLoop name c
workerSpawn :: (Server SPCMsg) -> IO Worker
workerSpawn c = do
w <- spawn $ workerLoop c
pure $ Worker w
workerLoop :: WorkerName -> (Server SPCMsg) -> Chan WorkerMsg -> IO ()
workerLoop name c m = do
workerLoop :: (Server SPCMsg) -> Chan WorkerMsg -> IO ()
workerLoop c m = forever $ do
msg <- receive m
case msg of
-- stuff happening here
MsgStartJob action jobid -> do
action
sendTo c $ MsgJobDone jobid
MsgStartJob action jobid rsvp -> do
t <- forkIO $ do
action
sendTo c $ MsgJobDone jobid
reply rsvp t
-- | Shut down a running worker. No effect if the worker is already
-- terminated.

View File

@ -17,36 +17,118 @@ tests =
[
testCase "workerAdd" $ do
spc <- startSPC
w <- workerAdd spc "R2-D2"
isRight w @?= True,
testCase "workerAdd (2)" $ do
spc <- startSPC
w1 <- workerAdd spc "MSE-6"
isRight w1 @?= True
w2 <- workerAdd spc "GNK"
isRight w2 @?= True,
testCase "workerAdd (3)" $ do
spc <- startSPC
w1 <- workerAdd spc "C-3PO"
isRight w1 @?= True
w2 <- workerAdd spc "K-2SO"
isRight w2 @?= True
w3 <- workerAdd spc "IG-88"
isRight w3 @?= True,
testCase "workerAdd (fail)" $ do
spc <- startSPC
w1 <- workerAdd spc "BD-1"
isRight w1 @?= True
w2 <- workerAdd spc "BD-1"
isRight w2 @?= False,
testCase "Running a job" $ do
ref <- newIORef False
spc <- startSPC
w <- workerAdd spc "R5-D4"
isRight w @?= True
j <- jobAdd spc $ Job (writeIORef ref True) 1
r <- jobWait spc j
r @?= Done
x <- readIORef ref
x @?= True,
testCase "Running two jobs" $ do
ref <- newIORef 0
spc <- startSPC
w <- workerAdd spc "K-2SO"
isRight w @?= True
j1 <- jobAdd spc $ Job (writeIORef ref 1) 1
r1 <- jobWait spc j1
r1 @?= Done
x1 <- readIORef ref
x1 @?= 1
j2 <- jobAdd spc $ Job (writeIORef ref 2) 1
r2 <- jobWait spc j2
r2 @?= Done
x2 <- readIORef ref
x2 @?= 2,
testCase "Canceling job (pending)" $ do
spc <- startSPC
j <- jobAdd spc $ Job (pure ()) 1
jobCancel spc j
r <- jobStatus spc j
r @?= JobDone DoneCancelled,
testCase "Canceling job (running)" $ do
spc <- startSPC
w <- workerAdd spc "IG-88"
isRight w @?= True
j <- jobAdd spc $ Job (threadDelay 2000000) 2
jobCancel spc j
r <- jobStatus spc j
r @?= JobDone DoneCancelled,
testCase "Canceling job (running) (new job)" $ do
ref <- newIORef False
spc <- startSPC
w <- workerAdd spc "C-3PO"
isRight w @?= True
j1 <- jobAdd spc $ Job (threadDelay 2000000) 2
jobCancel spc j1
r1 <- jobStatus spc j1
r1 @?= JobDone DoneCancelled
-- job has been cancelled. Starting new job
j2 <- jobAdd spc $ Job (writeIORef ref True) 1
r2 <- jobWait spc j2
r2 @?= Done
x <- readIORef ref
x @?= True,
testCase "Timeout" $ do
spc <- startSPC
w <- workerAdd spc "L3-37"
isRight w @?= True
j <- jobAdd spc $ Job (threadDelay 2000000) 1
r <- jobWait spc j
r @?= DoneTimeout,
testCase "Timeout (2 jobs)" $ do
ref <- newIORef False
spc <- startSPC
w <- workerAdd spc "General Kalani"
isRight w @?= True
j1 <- jobAdd spc $ Job (threadDelay 2000000) 1
j2 <- jobAdd spc $ Job (writeIORef ref True) 1
r1 <- jobWait spc j1
r1 @?= DoneTimeout
r2 <- jobWait spc j2
r2 @?= Done
x <- readIORef ref
x @?= True
]