This commit is contained in:
2024-10-16 13:07:54 +02:00
parent b0b087648c
commit 775013b825
6 changed files with 350 additions and 0 deletions

30
a6/a6-handout/a6.cabal Normal file
View File

@ -0,0 +1,30 @@
cabal-version: 3.0
name: a6
version: 1.0.0.0
build-type: Simple
common common
default-language: Haskell2010
ghc-options: -Wall
library
import: common
hs-source-dirs: src
build-depends:
base
, tasty
, tasty-hunit
, clock
exposed-modules:
SPC
SPC_Tests
GenServer
test-suite a6-tests
import: common
type: exitcode-stdio-1.0
main-is: runtests.hs
build-depends:
base
, tasty
, a6

View File

@ -0,0 +1,5 @@
import qualified SPC_Tests
import Test.Tasty (defaultMain)
main :: IO ()
main = defaultMain SPC_Tests.tests

View File

@ -0,0 +1,45 @@
module GenServer
( Chan,
Server,
receive,
send,
sendTo,
spawn,
ReplyChan,
requestReply,
reply,
)
where
import Control.Concurrent (Chan)
import qualified Control.Concurrent as CC
data Server msg = Server CC.ThreadId (Chan msg)
data ReplyChan a = ReplyChan (Chan a)
send :: Chan a -> a -> IO ()
send chan msg =
CC.writeChan chan msg
sendTo :: Server a -> a -> IO ()
sendTo (Server _tid input) msg =
send input msg
receive :: Chan a -> IO a
receive = CC.readChan
spawn :: (Chan a -> IO ()) -> IO (Server a)
spawn server = do
input <- CC.newChan
tid <- CC.forkIO $ server input
pure $ Server tid input
requestReply :: Server a -> (ReplyChan b -> a) -> IO b
requestReply serv con = do
reply_chan <- CC.newChan
sendTo serv $ con $ ReplyChan reply_chan
receive reply_chan
reply :: ReplyChan a -> a -> IO ()
reply (ReplyChan chan) x = send chan x

255
a6/a6-handout/src/SPC.hs Normal file
View File

@ -0,0 +1,255 @@
module SPC
( -- * SPC startup
SPC,
startSPC,
-- * Job functions
Job (..),
JobId,
JobStatus (..),
JobDoneReason (..),
jobAdd,
jobStatus,
jobWait,
jobCancel,
-- * Worker functions
WorkerName,
workerAdd,
workerStop,
)
where
import Control.Concurrent
( forkIO,
killThread,
threadDelay,
)
import Control.Monad (ap, forever, liftM, void)
import GenServer
import System.Clock.Seconds (Clock (Monotonic), Seconds, getTime)
-- First some general utility functions.
-- | Retrieve Unix time using a monotonic clock. You cannot use this
-- to measure the actual world time, but you can use it to measure
-- elapsed time.
getSeconds :: IO Seconds
getSeconds = getTime Monotonic
-- | Remove mapping from association list.
removeAssoc :: (Eq k) => k -> [(k, v)] -> [(k, v)]
removeAssoc needle ((k, v) : kvs) =
if k == needle
then kvs
else (k, v) : removeAssoc needle kvs
removeAssoc _ [] = []
-- Then the definition of the glorious SPC.
-- | A job that is to be enqueued in the glorious SPC.
data Job = Job
{ -- | The IO action that comprises the actual action of the job.
jobAction :: IO (),
-- | The maximum allowed runtime of the job, counting from when
-- the job begins executing (not when it is enqueued).
jobMaxSeconds :: Int
}
-- | A unique identifier of a job that has been enqueued.
newtype JobId = JobId Int
deriving (Eq, Ord, Show)
-- | How a job finished.
data JobDoneReason
= -- | Normal termination.
Done
| -- | The job was killed because it ran for too long.
DoneTimeout
| -- | The job was explicitly cancelled, or the worker
-- it was running on was stopped.
DoneCancelled
| -- | The job crashed due to an exception.
DoneCrashed
deriving (Eq, Ord, Show)
-- | The status of a job.
data JobStatus
= -- | The job is done and this is why.
JobDone JobDoneReason
| -- | The job is still running.
JobRunning
| -- | The job is enqueued, but is waiting for an idle worker.
JobPending
| -- | A job with this ID is not known to this SPC instance.
JobUnknown
deriving (Eq, Ord, Show)
-- | A worker decides its own human-readable name. This is useful for
-- debugging.
type WorkerName = String
-- | Messages sent to workers. These are sent both by SPC and by
-- processes spawned by the workes.
data WorkerMsg -- TODO: add messages.
-- Messages sent to SPC.
data SPCMsg
= -- | Add the job, and reply with the job ID.
MsgJobAdd Job (ReplyChan JobId)
| -- | Cancel the given job.
MsgJobCancel JobId
| -- | Immediately reply the status of the job.
MsgJobStatus JobId (ReplyChan JobStatus)
| -- | Reply when the job is done.
MsgJobWait JobId (ReplyChan JobDoneReason)
| -- | Some time has passed.
MsgTick
-- | A handle to the SPC instance.
data SPC = SPC (Server SPCMsg)
-- | A handle to a worker.
data Worker = Worker (Server WorkerMsg)
-- | The central state. Must be protected from the bourgeoisie.
data SPCState = SPCState
{ spcJobsPending :: [(JobId, Job)],
spcJobsRunning :: [(JobId, Job)],
spcJobsDone :: [(JobId, JobDoneReason)],
spcJobCounter :: JobId
-- TODO: you will need to add more fields.
}
-- | The monad in which the main SPC thread runs. This is a state
-- monad with support for IO.
newtype SPCM a = SPCM (SPCState -> IO (a, SPCState))
instance Functor SPCM where
fmap = liftM
instance Applicative SPCM where
pure x = SPCM $ \state -> pure (x, state)
(<*>) = ap
instance Monad SPCM where
SPCM m >>= f = SPCM $ \state -> do
(x, state') <- m state
let SPCM f' = f x
f' state'
-- | Retrieve the state.
get :: SPCM SPCState
get = SPCM $ \state -> pure (state, state)
-- | Overwrite the state.
put :: SPCState -> SPCM ()
put state = SPCM $ \_ -> pure ((), state)
-- | Modify the state.
modify :: (SPCState -> SPCState) -> SPCM ()
modify f = do
state <- get
put $ f state
-- | Lift an 'IO' action into 'SPCM'.
io :: IO a -> SPCM a
io m = SPCM $ \state -> do
x <- m
pure (x, state)
-- | Run the SPCM monad.
runSPCM :: SPCState -> SPCM a -> IO a
runSPCM state (SPCM f) = fst <$> f state
schedule :: SPCM ()
schedule = undefined
jobDone :: JobId -> JobDoneReason -> SPCM ()
jobDone = undefined
workerIsIdle :: WorkerName -> Worker -> SPCM ()
workerIsIdle = undefined
workerIsGone :: WorkerName -> SPCM ()
workerIsGone = undefined
checkTimeouts :: SPCM ()
checkTimeouts = pure () -- change in Task 4
workerExists :: WorkerName -> SPCM Bool
workerExists = undefined
handleMsg :: Chan SPCMsg -> SPCM ()
handleMsg c = do
checkTimeouts
schedule
msg <- io $ receive c
case msg of
MsgJobAdd job rsvp -> do
state <- get
let JobId jobid = spcJobCounter state
put $
state
{ spcJobsPending =
(spcJobCounter state, job) : spcJobsPending state,
spcJobCounter = JobId $ succ jobid
}
io $ reply rsvp $ JobId jobid
MsgJobStatus jobid rsvp -> do
state <- get
io $ reply rsvp $ case ( lookup jobid $ spcJobsPending state,
lookup jobid $ spcJobsRunning state,
lookup jobid $ spcJobsDone state
) of
(Just _, _, _) -> JobPending
(_, Just _, _) -> JobRunning
(_, _, Just r) -> JobDone r
_ -> JobUnknown
startSPC :: IO SPC
startSPC = do
let initial_state =
SPCState
{ spcJobCounter = JobId 0,
spcJobsPending = [],
spcJobsRunning = [],
spcJobsDone = []
}
c <- spawn $ \c -> runSPCM initial_state $ forever $ handleMsg c
void $ spawn $ timer c
pure $ SPC c
where
timer c _ = forever $ do
threadDelay 1000000 -- 1 second
sendTo c MsgTick
-- | Add a job for scheduling.
jobAdd :: SPC -> Job -> IO JobId
jobAdd (SPC c) job =
requestReply c $ MsgJobAdd job
-- | Asynchronously query the job status.
jobStatus :: SPC -> JobId -> IO JobStatus
jobStatus (SPC c) jobid =
requestReply c $ MsgJobStatus jobid
-- | Synchronously block until job is done and return the reason.
jobWait :: SPC -> JobId -> IO JobDoneReason
jobWait (SPC c) jobid =
requestReply c $ MsgJobWait jobid
-- | Asynchronously cancel a job.
jobCancel :: SPC -> JobId -> IO ()
jobCancel (SPC c) jobid =
sendTo c $ MsgJobCancel jobid
-- | Add a new worker with this name. Fails with 'Left' if a worker
-- with that name already exists.
workerAdd :: SPC -> WorkerName -> IO (Either String Worker)
workerAdd = undefined
-- | Shut down a running worker. No effect if the worker is already
-- terminated.
workerStop :: Worker -> IO ()
workerStop = undefined

View File

@ -0,0 +1,15 @@
module SPC_Tests (tests) where
import Control.Concurrent (threadDelay)
import Control.Monad (forM, forM_, replicateM)
import Data.IORef
import SPC
import Test.Tasty (TestTree, localOption, mkTimeout, testGroup)
import Test.Tasty.HUnit (assertFailure, testCase, (@?=))
tests :: TestTree
tests =
localOption (mkTimeout 3000000) $
testGroup
"SPC (core)"
[]

BIN
a6/a6.pdf Normal file

Binary file not shown.