📬 help

This commit is contained in:
2024-10-16 16:15:38 +02:00
parent 775013b825
commit 46154359eb
5 changed files with 65 additions and 5 deletions

View File

@ -24,6 +24,8 @@ import Control.Concurrent
( forkIO,
killThread,
threadDelay,
newChan,
readChan
)
import Control.Monad (ap, forever, liftM, void)
import GenServer
@ -91,7 +93,9 @@ type WorkerName = String
-- | Messages sent to workers. These are sent both by SPC and by
-- processes spawned by the workes.
data WorkerMsg -- TODO: add messages.
data WorkerMsg
= -- | New job time
MsgStartJob (IO ()) JobId
-- Messages sent to SPC.
data SPCMsg
@ -105,6 +109,12 @@ data SPCMsg
MsgJobWait JobId (ReplyChan JobDoneReason)
| -- | Some time has passed.
MsgTick
| -- | Ask if worker exists
MsgWorkerExists WorkerName (ReplyChan Bool)
| -- | Add a new worker
MsgAddWorker WorkerName Worker
| -- | Worker finished job
MsgJobDone JobId
-- | A handle to the SPC instance.
data SPC = SPC (Server SPCMsg)
@ -115,9 +125,10 @@ data Worker = Worker (Server WorkerMsg)
-- | The central state. Must be protected from the bourgeoisie.
data SPCState = SPCState
{ spcJobsPending :: [(JobId, Job)],
spcJobsRunning :: [(JobId, Job)],
spcJobsRunning :: [(JobId, (Job, Worker))],
spcJobsDone :: [(JobId, JobDoneReason)],
spcJobCounter :: JobId
spcJobCounter :: JobId,
spcWorkers :: [(WorkerName, Worker)]
-- TODO: you will need to add more fields.
}
@ -206,6 +217,35 @@ handleMsg c = do
(_, Just _, _) -> JobRunning
(_, _, Just r) -> JobDone r
_ -> JobUnknown
MsgWorkerExists name rsvp -> do
state <- get
io $ reply rsvp $ case (lookup name $ spcWorkers state) of
Just _ -> True
_ -> False
MsgAddWorker name worker -> do
state <- get
put $
state
{ spcWorkers =
(name, worker) : spcWorkers state
}
MsgJobDone jobid -> do
state <- get
case (lookup jobid $ spcJobsRunning state) of
Just (job, worker) -> do
put $
state
{ spcJobsRunning =
deleteJob jobid (spcJobsRunning state),
spcJobsDone =
(jobid, Done) : spcJobsDone state
}
deleteJob :: JobId -> [(JobId, (Job, Worker))] -> [(JobId, (Job, Worker))]
deleteJob jobid list =
case list of
[] -> []
(jid, (job, w)):l -> if (jid == jobid) then l else (jid,(job,w)):(deleteJob jobid l)
startSPC :: IO SPC
startSPC = do
@ -214,7 +254,8 @@ startSPC = do
{ spcJobCounter = JobId 0,
spcJobsPending = [],
spcJobsRunning = [],
spcJobsDone = []
spcJobsDone = [],
spcWorkers = []
}
c <- spawn $ \c -> runSPCM initial_state $ forever $ handleMsg c
void $ spawn $ timer c
@ -247,8 +288,27 @@ jobCancel (SPC c) jobid =
-- | Add a new worker with this name. Fails with 'Left' if a worker
-- with that name already exists.
workerAdd :: SPC -> WorkerName -> IO (Either String Worker)
workerAdd = undefined
workerAdd (SPC c) name = do
exists <- requestReply c $ MsgWorkerExists name
if exists
then pure $ Left "Worker with given name already exist"
else do
worker <- workerSpawn name
sendTo c $ MsgAddWorker name worker
pure $ Right worker
workerSpawn :: WorkerName -> IO Worker
workerSpawn name = do
w <- spawn $ workerLoop name
pure $ Worker w
workerLoop :: WorkerName -> Chan WorkerMsg -> IO ()
workerLoop name c = do
msg <- receive c
case msg of
-- stuff happening here
MsgStartJob action jobid -> -- do stuff
-- | Shut down a running worker. No effect if the worker is already
-- terminated.
workerStop :: Worker -> IO ()