diff --git a/a6/a6-handout/a6.cabal b/a6/a6-handout/a6.cabal new file mode 100644 index 0000000..7bfb1e9 --- /dev/null +++ b/a6/a6-handout/a6.cabal @@ -0,0 +1,30 @@ +cabal-version: 3.0 +name: a6 +version: 1.0.0.0 +build-type: Simple + +common common + default-language: Haskell2010 + ghc-options: -Wall + +library + import: common + hs-source-dirs: src + build-depends: + base + , tasty + , tasty-hunit + , clock + exposed-modules: + SPC + SPC_Tests + GenServer + +test-suite a6-tests + import: common + type: exitcode-stdio-1.0 + main-is: runtests.hs + build-depends: + base + , tasty + , a6 diff --git a/a6/a6-handout/runtests.hs b/a6/a6-handout/runtests.hs new file mode 100644 index 0000000..efba441 --- /dev/null +++ b/a6/a6-handout/runtests.hs @@ -0,0 +1,5 @@ +import qualified SPC_Tests +import Test.Tasty (defaultMain) + +main :: IO () +main = defaultMain SPC_Tests.tests diff --git a/a6/a6-handout/src/GenServer.hs b/a6/a6-handout/src/GenServer.hs new file mode 100644 index 0000000..e70e18c --- /dev/null +++ b/a6/a6-handout/src/GenServer.hs @@ -0,0 +1,45 @@ +module GenServer + ( Chan, + Server, + receive, + send, + sendTo, + spawn, + ReplyChan, + requestReply, + reply, + ) +where + +import Control.Concurrent (Chan) +import qualified Control.Concurrent as CC + +data Server msg = Server CC.ThreadId (Chan msg) + +data ReplyChan a = ReplyChan (Chan a) + +send :: Chan a -> a -> IO () +send chan msg = + CC.writeChan chan msg + +sendTo :: Server a -> a -> IO () +sendTo (Server _tid input) msg = + send input msg + +receive :: Chan a -> IO a +receive = CC.readChan + +spawn :: (Chan a -> IO ()) -> IO (Server a) +spawn server = do + input <- CC.newChan + tid <- CC.forkIO $ server input + pure $ Server tid input + +requestReply :: Server a -> (ReplyChan b -> a) -> IO b +requestReply serv con = do + reply_chan <- CC.newChan + sendTo serv $ con $ ReplyChan reply_chan + receive reply_chan + +reply :: ReplyChan a -> a -> IO () +reply (ReplyChan chan) x = send chan x diff --git a/a6/a6-handout/src/SPC.hs b/a6/a6-handout/src/SPC.hs new file mode 100644 index 0000000..2f90853 --- /dev/null +++ b/a6/a6-handout/src/SPC.hs @@ -0,0 +1,255 @@ +module SPC + ( -- * SPC startup + SPC, + startSPC, + + -- * Job functions + Job (..), + JobId, + JobStatus (..), + JobDoneReason (..), + jobAdd, + jobStatus, + jobWait, + jobCancel, + + -- * Worker functions + WorkerName, + workerAdd, + workerStop, + ) +where + +import Control.Concurrent + ( forkIO, + killThread, + threadDelay, + ) +import Control.Monad (ap, forever, liftM, void) +import GenServer +import System.Clock.Seconds (Clock (Monotonic), Seconds, getTime) + +-- First some general utility functions. + +-- | Retrieve Unix time using a monotonic clock. You cannot use this +-- to measure the actual world time, but you can use it to measure +-- elapsed time. +getSeconds :: IO Seconds +getSeconds = getTime Monotonic + +-- | Remove mapping from association list. +removeAssoc :: (Eq k) => k -> [(k, v)] -> [(k, v)] +removeAssoc needle ((k, v) : kvs) = + if k == needle + then kvs + else (k, v) : removeAssoc needle kvs +removeAssoc _ [] = [] + +-- Then the definition of the glorious SPC. + +-- | A job that is to be enqueued in the glorious SPC. +data Job = Job + { -- | The IO action that comprises the actual action of the job. + jobAction :: IO (), + -- | The maximum allowed runtime of the job, counting from when + -- the job begins executing (not when it is enqueued). + jobMaxSeconds :: Int + } + +-- | A unique identifier of a job that has been enqueued. +newtype JobId = JobId Int + deriving (Eq, Ord, Show) + +-- | How a job finished. +data JobDoneReason + = -- | Normal termination. + Done + | -- | The job was killed because it ran for too long. + DoneTimeout + | -- | The job was explicitly cancelled, or the worker + -- it was running on was stopped. + DoneCancelled + | -- | The job crashed due to an exception. + DoneCrashed + deriving (Eq, Ord, Show) + +-- | The status of a job. +data JobStatus + = -- | The job is done and this is why. + JobDone JobDoneReason + | -- | The job is still running. + JobRunning + | -- | The job is enqueued, but is waiting for an idle worker. + JobPending + | -- | A job with this ID is not known to this SPC instance. + JobUnknown + deriving (Eq, Ord, Show) + +-- | A worker decides its own human-readable name. This is useful for +-- debugging. +type WorkerName = String + +-- | Messages sent to workers. These are sent both by SPC and by +-- processes spawned by the workes. +data WorkerMsg -- TODO: add messages. + +-- Messages sent to SPC. +data SPCMsg + = -- | Add the job, and reply with the job ID. + MsgJobAdd Job (ReplyChan JobId) + | -- | Cancel the given job. + MsgJobCancel JobId + | -- | Immediately reply the status of the job. + MsgJobStatus JobId (ReplyChan JobStatus) + | -- | Reply when the job is done. + MsgJobWait JobId (ReplyChan JobDoneReason) + | -- | Some time has passed. + MsgTick + +-- | A handle to the SPC instance. +data SPC = SPC (Server SPCMsg) + +-- | A handle to a worker. +data Worker = Worker (Server WorkerMsg) + +-- | The central state. Must be protected from the bourgeoisie. +data SPCState = SPCState + { spcJobsPending :: [(JobId, Job)], + spcJobsRunning :: [(JobId, Job)], + spcJobsDone :: [(JobId, JobDoneReason)], + spcJobCounter :: JobId + -- TODO: you will need to add more fields. + } + +-- | The monad in which the main SPC thread runs. This is a state +-- monad with support for IO. +newtype SPCM a = SPCM (SPCState -> IO (a, SPCState)) + +instance Functor SPCM where + fmap = liftM + +instance Applicative SPCM where + pure x = SPCM $ \state -> pure (x, state) + (<*>) = ap + +instance Monad SPCM where + SPCM m >>= f = SPCM $ \state -> do + (x, state') <- m state + let SPCM f' = f x + f' state' + +-- | Retrieve the state. +get :: SPCM SPCState +get = SPCM $ \state -> pure (state, state) + +-- | Overwrite the state. +put :: SPCState -> SPCM () +put state = SPCM $ \_ -> pure ((), state) + +-- | Modify the state. +modify :: (SPCState -> SPCState) -> SPCM () +modify f = do + state <- get + put $ f state + +-- | Lift an 'IO' action into 'SPCM'. +io :: IO a -> SPCM a +io m = SPCM $ \state -> do + x <- m + pure (x, state) + +-- | Run the SPCM monad. +runSPCM :: SPCState -> SPCM a -> IO a +runSPCM state (SPCM f) = fst <$> f state + +schedule :: SPCM () +schedule = undefined + +jobDone :: JobId -> JobDoneReason -> SPCM () +jobDone = undefined + +workerIsIdle :: WorkerName -> Worker -> SPCM () +workerIsIdle = undefined + +workerIsGone :: WorkerName -> SPCM () +workerIsGone = undefined + +checkTimeouts :: SPCM () +checkTimeouts = pure () -- change in Task 4 + +workerExists :: WorkerName -> SPCM Bool +workerExists = undefined + +handleMsg :: Chan SPCMsg -> SPCM () +handleMsg c = do + checkTimeouts + schedule + msg <- io $ receive c + case msg of + MsgJobAdd job rsvp -> do + state <- get + let JobId jobid = spcJobCounter state + put $ + state + { spcJobsPending = + (spcJobCounter state, job) : spcJobsPending state, + spcJobCounter = JobId $ succ jobid + } + io $ reply rsvp $ JobId jobid + MsgJobStatus jobid rsvp -> do + state <- get + io $ reply rsvp $ case ( lookup jobid $ spcJobsPending state, + lookup jobid $ spcJobsRunning state, + lookup jobid $ spcJobsDone state + ) of + (Just _, _, _) -> JobPending + (_, Just _, _) -> JobRunning + (_, _, Just r) -> JobDone r + _ -> JobUnknown + +startSPC :: IO SPC +startSPC = do + let initial_state = + SPCState + { spcJobCounter = JobId 0, + spcJobsPending = [], + spcJobsRunning = [], + spcJobsDone = [] + } + c <- spawn $ \c -> runSPCM initial_state $ forever $ handleMsg c + void $ spawn $ timer c + pure $ SPC c + where + timer c _ = forever $ do + threadDelay 1000000 -- 1 second + sendTo c MsgTick + +-- | Add a job for scheduling. +jobAdd :: SPC -> Job -> IO JobId +jobAdd (SPC c) job = + requestReply c $ MsgJobAdd job + +-- | Asynchronously query the job status. +jobStatus :: SPC -> JobId -> IO JobStatus +jobStatus (SPC c) jobid = + requestReply c $ MsgJobStatus jobid + +-- | Synchronously block until job is done and return the reason. +jobWait :: SPC -> JobId -> IO JobDoneReason +jobWait (SPC c) jobid = + requestReply c $ MsgJobWait jobid + +-- | Asynchronously cancel a job. +jobCancel :: SPC -> JobId -> IO () +jobCancel (SPC c) jobid = + sendTo c $ MsgJobCancel jobid + +-- | Add a new worker with this name. Fails with 'Left' if a worker +-- with that name already exists. +workerAdd :: SPC -> WorkerName -> IO (Either String Worker) +workerAdd = undefined + +-- | Shut down a running worker. No effect if the worker is already +-- terminated. +workerStop :: Worker -> IO () +workerStop = undefined diff --git a/a6/a6-handout/src/SPC_Tests.hs b/a6/a6-handout/src/SPC_Tests.hs new file mode 100644 index 0000000..113d738 --- /dev/null +++ b/a6/a6-handout/src/SPC_Tests.hs @@ -0,0 +1,15 @@ +module SPC_Tests (tests) where + +import Control.Concurrent (threadDelay) +import Control.Monad (forM, forM_, replicateM) +import Data.IORef +import SPC +import Test.Tasty (TestTree, localOption, mkTimeout, testGroup) +import Test.Tasty.HUnit (assertFailure, testCase, (@?=)) + +tests :: TestTree +tests = + localOption (mkTimeout 3000000) $ + testGroup + "SPC (core)" + [] diff --git a/a6/a6.pdf b/a6/a6.pdf new file mode 100644 index 0000000..b85d210 Binary files /dev/null and b/a6/a6.pdf differ