🤓 Can run jobs

This commit is contained in:
Nikolaj
2024-10-21 11:35:42 +02:00
parent 849ce2858f
commit 63bdbe688f
2 changed files with 89 additions and 30 deletions

View File

@ -28,9 +28,10 @@ import Control.Concurrent
newChan, newChan,
readChan readChan
) )
import Control.Monad (ap, forever, liftM, void) import Control.Monad (ap, forever, liftM, void, filterM)
import GenServer import GenServer
import System.Clock.Seconds (Clock (Monotonic), Seconds, getTime) import System.Clock.Seconds (Clock (Monotonic), Seconds, getTime)
import GHC.RTS.Flags (DebugFlags(scheduler))
-- First some general utility functions. -- First some general utility functions.
@ -96,7 +97,7 @@ type WorkerName = String
-- processes spawned by the workes. -- processes spawned by the workes.
data WorkerMsg data WorkerMsg
= -- | New job time = -- | New job time
MsgStartJob SPC (IO ()) JobId MsgStartJob (IO ()) JobId
-- Messages sent to SPC. -- Messages sent to SPC.
data SPCMsg data SPCMsg
@ -126,10 +127,11 @@ data Worker = Worker (Server WorkerMsg)
-- | The central state. Must be protected from the bourgeoisie. -- | The central state. Must be protected from the bourgeoisie.
data SPCState = SPCState data SPCState = SPCState
{ spcJobsPending :: [(JobId, Job)], { spcJobsPending :: [(JobId, Job)],
spcJobsRunning :: [(JobId, (Job, Worker))], spcJobsRunning :: [(JobId, (Job, WorkerName))],
spcJobsDone :: [(JobId, JobDoneReason)], spcJobsDone :: [(JobId, JobDoneReason)],
spcJobCounter :: JobId, spcJobCounter :: JobId,
spcWorkers :: [(WorkerName, Worker)] spcWorkers :: [(WorkerName, Worker)],
spcWaiting :: [(JobId, (ReplyChan JobDoneReason))]
} }
-- | The monad in which the main SPC thread runs. This is a state -- | The monad in which the main SPC thread runs. This is a state
@ -174,8 +176,10 @@ io m = SPCM $ \state -> do
runSPCM :: SPCState -> SPCM a -> IO a runSPCM :: SPCState -> SPCM a -> IO a
runSPCM state (SPCM f) = fst <$> f state runSPCM state (SPCM f) = fst <$> f state
workerIsIdle :: WorkerName -> Worker -> SPCM () workerIsIdle :: (WorkerName, Worker) -> SPCM Bool
workerIsIdle = undefined workerIsIdle (name, _) = do
state <- get
pure (all (\(_, (_,w)) -> w /= name) (spcJobsRunning state))
workerIsGone :: WorkerName -> SPCM () workerIsGone :: WorkerName -> SPCM ()
workerIsGone = undefined workerIsGone = undefined
@ -183,9 +187,33 @@ workerIsGone = undefined
checkTimeouts :: SPCM () checkTimeouts :: SPCM ()
checkTimeouts = pure () -- change in Task 4 checkTimeouts = pure () -- change in Task 4
getIdleWorkers :: SPCM [(WorkerName, Worker)]
getIdleWorkers = do
state <- get
filterM (workerIsIdle) (spcWorkers state)
schedule :: SPCM ()
schedule = do
state <- get
case spcJobsPending state of
((jobid, job) : jobs) -> do
workers <- getIdleWorkers
case workers of
(workerName,worker):_ -> do
w <- (\(Worker w) -> pure w) worker
io $ sendTo w (MsgStartJob (jobAction job) jobid)
put $
state
{ spcJobsRunning = (jobid, (job, workerName)) : spcJobsRunning state,
spcJobsPending = jobs
}
_ -> pure ()
_ -> pure ()
handleMsg :: Chan SPCMsg -> SPCM () handleMsg :: Chan SPCMsg -> SPCM ()
handleMsg c = do handleMsg c = do
checkTimeouts checkTimeouts
schedule
msg <- io $ receive c msg <- io $ receive c
case msg of case msg of
MsgJobAdd job rsvp -> do MsgJobAdd job rsvp -> do
@ -223,18 +251,19 @@ handleMsg c = do
MsgJobDone jobid -> do MsgJobDone jobid -> do
state <- get state <- get
case (lookup jobid $ spcJobsRunning state) of case (lookup jobid $ spcJobsRunning state) of
Just (job, worker) -> do Just (job, _) -> do
put $ jobDone jobid Done
state
{ spcJobsRunning =
deleteJob jobid (spcJobsRunning state),
spcJobsDone =
(jobid, Done) : spcJobsDone state
}
Nothing -> pure () Nothing -> pure ()
MsgJobWait jobid rsvp -> do
state <- get
case lookup jobid $ spcJobsDone state of
Just reason -> do
io $ reply rsvp $ reason
Nothing ->
put $ state {spcWaiting = (jobid, rsvp) : spcWaiting state}
_ -> pure () _ -> pure ()
deleteJob :: JobId -> [(JobId, (Job, Worker))] -> [(JobId, (Job, Worker))] deleteJob :: JobId -> [(JobId, (Job, WorkerName))] -> [(JobId, (Job, WorkerName))]
deleteJob jobid list = deleteJob jobid list =
case list of case list of
[] -> [] [] -> []
@ -248,7 +277,8 @@ startSPC = do
spcJobsPending = [], spcJobsPending = [],
spcJobsRunning = [], spcJobsRunning = [],
spcJobsDone = [], spcJobsDone = [],
spcWorkers = [] spcWorkers = [],
spcWaiting = []
} }
c <- spawn $ \c -> runSPCM initial_state $ forever $ handleMsg c c <- spawn $ \c -> runSPCM initial_state $ forever $ handleMsg c
void $ spawn $ timer c void $ spawn $ timer c
@ -258,6 +288,25 @@ startSPC = do
threadDelay 1000000 -- 1 second threadDelay 1000000 -- 1 second
sendTo c MsgTick sendTo c MsgTick
jobDone :: JobId -> JobDoneReason -> SPCM ()
jobDone jobid reason = do
state <- get
case lookup jobid $ spcJobsDone state of
Just _ ->
-- We already know this job is done.
pure ()
Nothing -> do
case (lookup jobid (spcWaiting state)) of
Just rsvp -> io $ reply rsvp $ reason
_ -> pure ()
put $
state
{ spcJobsRunning =
deleteJob jobid (spcJobsRunning state),
spcJobsDone =
(jobid, reason) : spcJobsDone state
}
-- | Add a job for scheduling. -- | Add a job for scheduling.
jobAdd :: SPC -> Job -> IO JobId jobAdd :: SPC -> Job -> IO JobId
jobAdd (SPC c) job = jobAdd (SPC c) job =
@ -286,23 +335,23 @@ workerAdd (SPC c) name = do
if exists if exists
then pure $ Left "Worker with given name already exist" then pure $ Left "Worker with given name already exist"
else do else do
worker <- workerSpawn name worker <- workerSpawn name c
sendTo c $ MsgAddWorker name worker sendTo c $ MsgAddWorker name worker
pure $ Right worker pure $ Right worker
workerSpawn :: WorkerName -> IO Worker workerSpawn :: WorkerName -> (Server SPCMsg) -> IO Worker
workerSpawn name = do workerSpawn name c = do
w <- spawn $ workerLoop name w <- spawn $ workerLoop name c
pure $ Worker w pure $ Worker w
workerLoop :: WorkerName -> Chan WorkerMsg -> IO () workerLoop :: WorkerName -> (Server SPCMsg) -> Chan WorkerMsg -> IO ()
workerLoop name c = do workerLoop name c m = do
msg <- receive c msg <- receive m
case msg of case msg of
-- stuff happening here -- stuff happening here
MsgStartJob (SPC sc) action jobid -> do MsgStartJob action jobid -> do
action action
sendTo sc $ MsgJobDone jobid sendTo c $ MsgJobDone jobid
-- | Shut down a running worker. No effect if the worker is already -- | Shut down a running worker. No effect if the worker is already
-- terminated. -- terminated.

View File

@ -15,24 +15,34 @@ tests =
testGroup testGroup
"SPC (core)" "SPC (core)"
[ [
testCase "worker-add" $ do testCase "workerAdd" $ do
spc <- startSPC spc <- startSPC
w <- workerAdd spc "R2-D2" w <- workerAdd spc "R2-D2"
isRight w @?= True, isRight w @?= True,
testCase "worker-add-2" $ do testCase "workerAdd (2)" $ do
spc <- startSPC spc <- startSPC
_ <- workerAdd spc "MSE-6" _ <- workerAdd spc "MSE-6"
w <- workerAdd spc "GNK" w <- workerAdd spc "GNK"
isRight w @?= True, isRight w @?= True,
testCase "worker-add-3" $ do testCase "workerAdd (3)" $ do
spc <- startSPC spc <- startSPC
_ <- workerAdd spc "C-3PO" _ <- workerAdd spc "C-3PO"
_ <- workerAdd spc "K-2SO" _ <- workerAdd spc "K-2SO"
w <- workerAdd spc "IG-88" w <- workerAdd spc "IG-88"
isRight w @?= True, isRight w @?= True,
testCase "worker-add-2-fail" $ do testCase "workerAdd (fail)" $ do
spc <- startSPC spc <- startSPC
_ <- workerAdd spc "BD-1" _ <- workerAdd spc "BD-1"
w <- workerAdd spc "BD-1" w <- workerAdd spc "BD-1"
isRight w @?= False isRight w @?= False,
testCase "Running a job" $ do
ref <- newIORef False
spc <- startSPC
w <- workerAdd spc "R5-D4"
isRight w @?= True
j <- jobAdd spc $ Job (writeIORef ref True) 1
r <- jobWait spc j
r @?= Done
x <- readIORef ref
x @?= True
] ]