From 63bdbe688f4dd7141da9fa66caaaa6934e2ff3a6 Mon Sep 17 00:00:00 2001 From: Nikolaj Date: Mon, 21 Oct 2024 11:35:42 +0200 Subject: [PATCH] :nerd_face: Can run jobs --- a6/src/SPC.hs | 99 +++++++++++++++++++++++++++++++++------------ a6/src/SPC_Tests.hs | 20 ++++++--- 2 files changed, 89 insertions(+), 30 deletions(-) diff --git a/a6/src/SPC.hs b/a6/src/SPC.hs index 1d4faf3..ed1dfd2 100644 --- a/a6/src/SPC.hs +++ b/a6/src/SPC.hs @@ -28,9 +28,10 @@ import Control.Concurrent newChan, readChan ) -import Control.Monad (ap, forever, liftM, void) +import Control.Monad (ap, forever, liftM, void, filterM) import GenServer import System.Clock.Seconds (Clock (Monotonic), Seconds, getTime) +import GHC.RTS.Flags (DebugFlags(scheduler)) -- First some general utility functions. @@ -96,7 +97,7 @@ type WorkerName = String -- processes spawned by the workes. data WorkerMsg = -- | New job time - MsgStartJob SPC (IO ()) JobId + MsgStartJob (IO ()) JobId -- Messages sent to SPC. data SPCMsg @@ -126,10 +127,11 @@ data Worker = Worker (Server WorkerMsg) -- | The central state. Must be protected from the bourgeoisie. data SPCState = SPCState { spcJobsPending :: [(JobId, Job)], - spcJobsRunning :: [(JobId, (Job, Worker))], + spcJobsRunning :: [(JobId, (Job, WorkerName))], spcJobsDone :: [(JobId, JobDoneReason)], spcJobCounter :: JobId, - spcWorkers :: [(WorkerName, Worker)] + spcWorkers :: [(WorkerName, Worker)], + spcWaiting :: [(JobId, (ReplyChan JobDoneReason))] } -- | The monad in which the main SPC thread runs. This is a state @@ -174,8 +176,10 @@ io m = SPCM $ \state -> do runSPCM :: SPCState -> SPCM a -> IO a runSPCM state (SPCM f) = fst <$> f state -workerIsIdle :: WorkerName -> Worker -> SPCM () -workerIsIdle = undefined +workerIsIdle :: (WorkerName, Worker) -> SPCM Bool +workerIsIdle (name, _) = do + state <- get + pure (all (\(_, (_,w)) -> w /= name) (spcJobsRunning state)) workerIsGone :: WorkerName -> SPCM () workerIsGone = undefined @@ -183,9 +187,33 @@ workerIsGone = undefined checkTimeouts :: SPCM () checkTimeouts = pure () -- change in Task 4 +getIdleWorkers :: SPCM [(WorkerName, Worker)] +getIdleWorkers = do + state <- get + filterM (workerIsIdle) (spcWorkers state) + +schedule :: SPCM () +schedule = do + state <- get + case spcJobsPending state of + ((jobid, job) : jobs) -> do + workers <- getIdleWorkers + case workers of + (workerName,worker):_ -> do + w <- (\(Worker w) -> pure w) worker + io $ sendTo w (MsgStartJob (jobAction job) jobid) + put $ + state + { spcJobsRunning = (jobid, (job, workerName)) : spcJobsRunning state, + spcJobsPending = jobs + } + _ -> pure () + _ -> pure () + handleMsg :: Chan SPCMsg -> SPCM () handleMsg c = do checkTimeouts + schedule msg <- io $ receive c case msg of MsgJobAdd job rsvp -> do @@ -223,18 +251,19 @@ handleMsg c = do MsgJobDone jobid -> do state <- get case (lookup jobid $ spcJobsRunning state) of - Just (job, worker) -> do - put $ - state - { spcJobsRunning = - deleteJob jobid (spcJobsRunning state), - spcJobsDone = - (jobid, Done) : spcJobsDone state - } + Just (job, _) -> do + jobDone jobid Done Nothing -> pure () + MsgJobWait jobid rsvp -> do + state <- get + case lookup jobid $ spcJobsDone state of + Just reason -> do + io $ reply rsvp $ reason + Nothing -> + put $ state {spcWaiting = (jobid, rsvp) : spcWaiting state} _ -> pure () -deleteJob :: JobId -> [(JobId, (Job, Worker))] -> [(JobId, (Job, Worker))] +deleteJob :: JobId -> [(JobId, (Job, WorkerName))] -> [(JobId, (Job, WorkerName))] deleteJob jobid list = case list of [] -> [] @@ -248,7 +277,8 @@ startSPC = do spcJobsPending = [], spcJobsRunning = [], spcJobsDone = [], - spcWorkers = [] + spcWorkers = [], + spcWaiting = [] } c <- spawn $ \c -> runSPCM initial_state $ forever $ handleMsg c void $ spawn $ timer c @@ -258,6 +288,25 @@ startSPC = do threadDelay 1000000 -- 1 second sendTo c MsgTick +jobDone :: JobId -> JobDoneReason -> SPCM () +jobDone jobid reason = do + state <- get + case lookup jobid $ spcJobsDone state of + Just _ -> + -- We already know this job is done. + pure () + Nothing -> do + case (lookup jobid (spcWaiting state)) of + Just rsvp -> io $ reply rsvp $ reason + _ -> pure () + put $ + state + { spcJobsRunning = + deleteJob jobid (spcJobsRunning state), + spcJobsDone = + (jobid, reason) : spcJobsDone state + } + -- | Add a job for scheduling. jobAdd :: SPC -> Job -> IO JobId jobAdd (SPC c) job = @@ -286,23 +335,23 @@ workerAdd (SPC c) name = do if exists then pure $ Left "Worker with given name already exist" else do - worker <- workerSpawn name + worker <- workerSpawn name c sendTo c $ MsgAddWorker name worker pure $ Right worker -workerSpawn :: WorkerName -> IO Worker -workerSpawn name = do - w <- spawn $ workerLoop name +workerSpawn :: WorkerName -> (Server SPCMsg) -> IO Worker +workerSpawn name c = do + w <- spawn $ workerLoop name c pure $ Worker w -workerLoop :: WorkerName -> Chan WorkerMsg -> IO () -workerLoop name c = do - msg <- receive c +workerLoop :: WorkerName -> (Server SPCMsg) -> Chan WorkerMsg -> IO () +workerLoop name c m = do + msg <- receive m case msg of -- stuff happening here - MsgStartJob (SPC sc) action jobid -> do + MsgStartJob action jobid -> do action - sendTo sc $ MsgJobDone jobid + sendTo c $ MsgJobDone jobid -- | Shut down a running worker. No effect if the worker is already -- terminated. diff --git a/a6/src/SPC_Tests.hs b/a6/src/SPC_Tests.hs index a55ed40..6aa3b96 100644 --- a/a6/src/SPC_Tests.hs +++ b/a6/src/SPC_Tests.hs @@ -15,24 +15,34 @@ tests = testGroup "SPC (core)" [ - testCase "worker-add" $ do + testCase "workerAdd" $ do spc <- startSPC w <- workerAdd spc "R2-D2" isRight w @?= True, - testCase "worker-add-2" $ do + testCase "workerAdd (2)" $ do spc <- startSPC _ <- workerAdd spc "MSE-6" w <- workerAdd spc "GNK" isRight w @?= True, - testCase "worker-add-3" $ do + testCase "workerAdd (3)" $ do spc <- startSPC _ <- workerAdd spc "C-3PO" _ <- workerAdd spc "K-2SO" w <- workerAdd spc "IG-88" isRight w @?= True, - testCase "worker-add-2-fail" $ do + testCase "workerAdd (fail)" $ do spc <- startSPC _ <- workerAdd spc "BD-1" w <- workerAdd spc "BD-1" - isRight w @?= False + isRight w @?= False, + testCase "Running a job" $ do + ref <- newIORef False + spc <- startSPC + w <- workerAdd spc "R5-D4" + isRight w @?= True + j <- jobAdd spc $ Job (writeIORef ref True) 1 + r <- jobWait spc j + r @?= Done + x <- readIORef ref + x @?= True ]