Compare commits

..

15 Commits

Author SHA1 Message Date
78ee47c2e6 🤐 Zip 2024-10-21 18:47:06 +02:00
52610b1ed8 🤐 Zip 2024-10-21 18:45:16 +02:00
d57412b454 📝 new test 2024-10-21 18:35:54 +02:00
ea542df037 🔪 Killing workers 2024-10-21 17:41:01 +02:00
f38281b346 💥 Crashing 2024-10-21 16:49:16 +02:00
638786f8c2 🤖 task 3 and 4 2024-10-21 16:33:05 +02:00
b1335209b6 ✏️ 2024-10-21 11:47:34 +02:00
63bdbe688f 🤓 Can run jobs 2024-10-21 11:35:42 +02:00
849ce2858f :clown-face: fixed a test 2024-10-21 10:17:40 +02:00
5a9e4d675b 2024-10-18 10:14:00 +02:00
7f0191098e 📬 nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare nightmare 2024-10-16 16:54:24 +02:00
46154359eb 📬 help 2024-10-16 16:15:38 +02:00
775013b825 A6 2024-10-16 13:08:00 +02:00
b0b087648c 2024-10-11 16:30:03 +02:00
ff512028f3 2024-10-11 16:22:47 +02:00
10 changed files with 670 additions and 1 deletions

BIN
a5/DekensGadePrehn-a5.pdf Normal file

Binary file not shown.

BIN
a5/DekensGadePrehn-a5.zip Normal file

Binary file not shown.

View File

@ -60,7 +60,7 @@ printExp (Let v e1 e2) =
printExp (Lambda v body) =
parens $ "\\" ++ v ++ " -> " ++ printExp body
printExp (Apply x y) =
printExp x ++ " (" ++ printExp y ++ ")"
parens $ printExp x ++ " " ++ printExp y
printExp (TryCatch x y) =
parens $ "try " ++ printExp x ++ " catch " ++ printExp y

BIN
a6/DekensGadePrehn-a6.zip Normal file

Binary file not shown.

30
a6/a6.cabal Normal file
View File

@ -0,0 +1,30 @@
cabal-version: 3.0
name: a6
version: 1.0.0.0
build-type: Simple
common common
default-language: Haskell2010
ghc-options: -Wall
library
import: common
hs-source-dirs: src
build-depends:
base
, tasty
, tasty-hunit
, clock
exposed-modules:
SPC
SPC_Tests
GenServer
test-suite a6-tests
import: common
type: exitcode-stdio-1.0
main-is: runtests.hs
build-depends:
base
, tasty
, a6

BIN
a6/a6.pdf Normal file

Binary file not shown.

5
a6/runtests.hs Normal file
View File

@ -0,0 +1,5 @@
import qualified SPC_Tests
import Test.Tasty (defaultMain)
main :: IO ()
main = defaultMain SPC_Tests.tests

45
a6/src/GenServer.hs Normal file
View File

@ -0,0 +1,45 @@
module GenServer
( Chan,
Server(..),
receive,
send,
sendTo,
spawn,
ReplyChan,
requestReply,
reply,
)
where
import Control.Concurrent (Chan)
import qualified Control.Concurrent as CC
data Server msg = Server CC.ThreadId (Chan msg)
data ReplyChan a = ReplyChan (Chan a)
send :: Chan a -> a -> IO ()
send chan msg =
CC.writeChan chan msg
sendTo :: Server a -> a -> IO ()
sendTo (Server _tid input) msg =
send input msg
receive :: Chan a -> IO a
receive = CC.readChan
spawn :: (Chan a -> IO ()) -> IO (Server a)
spawn server = do
input <- CC.newChan
tid <- CC.forkIO $ server input
pure $ Server tid input
requestReply :: Server a -> (ReplyChan b -> a) -> IO b
requestReply serv con = do
reply_chan <- CC.newChan
sendTo serv $ con $ ReplyChan reply_chan
receive reply_chan
reply :: ReplyChan a -> a -> IO ()
reply (ReplyChan chan) x = send chan x

409
a6/src/SPC.hs Normal file
View File

@ -0,0 +1,409 @@
{-# LANGUAGE InstanceSigs #-}
module SPC
( -- * SPC startup
SPC,
startSPC,
-- * Job functions
Job (..),
JobId,
JobStatus (..),
JobDoneReason (..),
jobAdd,
jobStatus,
jobWait,
jobCancel,
-- * Worker functions
WorkerName,
workerAdd,
workerStop,
-- debugState,
-- SPCState (..)
)
where
import Control.Concurrent
( forkIO,
killThread,
threadDelay,
ThreadId
)
import Control.Exception (SomeException, catch)
import Control.Monad (ap, forever, liftM, void, filterM, when)
import GenServer
import System.Clock.Seconds (Clock (Monotonic), Seconds, getTime)
-- First some general utility functions.
-- | Retrieve Unix time using a monotonic clock. You cannot use this
-- to measure the actual world time, but you can use it to measure
-- elapsed time.
getSeconds :: IO Seconds
getSeconds = getTime Monotonic
-- | Remove mapping from association list.
removeAssoc :: (Eq k) => k -> [(k, v)] -> [(k, v)]
removeAssoc needle ((k, v) : kvs) =
if k == needle
then kvs
else (k, v) : removeAssoc needle kvs
removeAssoc _ [] = []
-- Then the definition of the glorious SPC.
-- | A job that is to be enqueued in the glorious SPC.
data Job = Job
{ -- | The IO action that comprises the actual action of the job.
jobAction :: IO (),
-- | The maximum allowed runtime of the job, counting from when
-- the job begins executing (not when it is enqueued).
jobMaxSeconds :: Int
}
-- | A unique identifier of a job that has been enqueued.
newtype JobId = JobId Int
deriving (Eq, Ord, Show)
-- | How a job finished.
data JobDoneReason
= -- | Normal termination.
Done
| -- | The job was killed because it ran for too long.
DoneTimeout
| -- | The job was explicitly cancelled, or the worker
-- it was running on was stopped.
DoneCancelled
| -- | The job crashed due to an exception.
DoneCrashed
deriving (Eq, Ord, Show)
-- | The status of a job.
data JobStatus
= -- | The job is done and this is why.
JobDone JobDoneReason
| -- | The job is still running.
JobRunning
| -- | The job is enqueued, but is waiting for an idle worker.
JobPending
| -- | A job with this ID is not known to this SPC instance.
JobUnknown
deriving (Eq, Ord, Show)
-- | A worker decides its own human-readable name. This is useful for
-- debugging.
type WorkerName = String
-- | Messages sent to workers. These are sent both by SPC and by
-- processes spawned by the workes.
data WorkerMsg
= -- | New job time
MsgStartJob (IO ()) JobId (ReplyChan ThreadId)
| -- | Remove worker
MsgKill
-- Messages sent to SPC.
data SPCMsg
= -- | Add the job, and reply with the job ID.
MsgJobAdd Job (ReplyChan JobId)
| -- | Cancel the given job.
MsgJobCancel JobId
| -- | Immediately reply the status of the job.
MsgJobStatus JobId (ReplyChan JobStatus)
| -- | Reply when the job is done.
MsgJobWait JobId (ReplyChan JobDoneReason)
| -- | Some time has passed.
MsgTick
| -- | Ask if worker exists
MsgWorkerExists WorkerName (ReplyChan Bool)
| -- | Add a new worker
MsgAddWorker WorkerName Worker
| -- | Worker finished job
MsgJobDone JobId
| -- | Crashed
MsgJobCrashed JobId
| -- | Remove worker (workplace accident)
MsgRemoveWorker WorkerName
-- | A handle to the SPC instance.
data SPC = SPC (Server SPCMsg)
-- | A handle to a worker.
data Worker = Worker (Server WorkerMsg)
-- | The central state. Must be protected from the bourgeoisie.
data SPCState = SPCState
{ spcJobsPending :: [(JobId, Job)],
spcJobsRunning :: [(JobId, (WorkerName, Seconds, ThreadId))],
spcJobsDone :: [(JobId, JobDoneReason)],
spcJobCounter :: JobId,
spcWorkers :: [(WorkerName, Worker)],
spcWaiting :: [(JobId, (ReplyChan JobDoneReason))]
}
-- | The monad in which the main SPC thread runs. This is a state
-- monad with support for IO.
newtype SPCM a = SPCM (SPCState -> IO (a, SPCState))
instance Functor SPCM where
fmap :: (a -> b) -> SPCM a -> SPCM b
fmap = liftM
instance Applicative SPCM where
pure x = SPCM $ \state -> pure (x, state)
(<*>) = ap
instance Monad SPCM where
SPCM m >>= f = SPCM $ \state -> do
(x, state') <- m state
let SPCM f' = f x
f' state'
-- | Retrieve the state.
get :: SPCM SPCState
get = SPCM $ \state -> pure (state, state)
-- | Overwrite the state.
put :: SPCState -> SPCM ()
put state = SPCM $ \_ -> pure ((), state)
-- | Lift an 'IO' action into 'SPCM'.
io :: IO a -> SPCM a
io m = SPCM $ \state -> do
x <- m
pure (x, state)
-- | Run the SPCM monad.
runSPCM :: SPCState -> SPCM a -> IO a
runSPCM state (SPCM f) = fst <$> f state
workerIsIdle :: (WorkerName, Worker) -> SPCM Bool
workerIsIdle (name, _) = do
state <- get
pure (all (\(_, (w,_,_)) -> w /= name) (spcJobsRunning state))
checkJobTimeout :: (JobId, (WorkerName, Seconds, ThreadId)) -> SPCM ()
checkJobTimeout (jobid, (_, deadline, t)) = do
now <- io $ getSeconds
when (now >= deadline) $ do
io $ killThread t
jobDone jobid DoneTimeout
checkTimeouts :: SPCM ()
checkTimeouts = do
state <- get
mapM_ checkJobTimeout (spcJobsRunning state)
getIdleWorkers :: SPCM [(WorkerName, Worker)]
getIdleWorkers = do
state <- get
filterM (workerIsIdle) (spcWorkers state)
schedule :: SPCM ()
schedule = do
state <- get
case spcJobsPending state of
((jobid, job) : jobs) -> do
workers <- getIdleWorkers
case workers of
(workerName,worker):_ -> do
w <- (\(Worker w) -> pure w) worker
threadId <- io $ requestReply w (MsgStartJob (jobAction job) jobid)
now <- io $ getSeconds
let deadline = now + fromIntegral (jobMaxSeconds job)
put $
state
{ spcJobsRunning = (jobid, (workerName, deadline, threadId)) : spcJobsRunning state,
spcJobsPending = jobs
}
_ -> pure ()
_ -> pure ()
handleMsg :: Chan SPCMsg -> SPCM ()
handleMsg c = do
checkTimeouts
schedule
msg <- io $ receive c
case msg of
MsgJobAdd job rsvp -> do
state <- get
let JobId jobid = spcJobCounter state
put $
state
{ spcJobsPending =
(spcJobCounter state, job) : spcJobsPending state,
spcJobCounter = JobId $ succ jobid
}
io $ reply rsvp $ JobId jobid
MsgJobStatus jobid rsvp -> do
state <- get
io $ reply rsvp $ case ( lookup jobid $ spcJobsPending state,
lookup jobid $ spcJobsRunning state,
lookup jobid $ spcJobsDone state
) of
(Just _, _, _) -> JobPending
(_, Just _, _) -> JobRunning
(_, _, Just r) -> JobDone r
_ -> JobUnknown
MsgWorkerExists name rsvp -> do
state <- get
io $ reply rsvp $ case (lookup name $ spcWorkers state) of
Just _ -> True
_ -> False
MsgAddWorker name worker -> do
state <- get
put $
state
{ spcWorkers =
(name, worker) : spcWorkers state
}
MsgJobDone jobid -> do
state <- get
case (lookup jobid $ spcJobsRunning state) of
Just (_, _, _) -> do
jobDone jobid Done
Nothing -> pure ()
MsgJobWait jobid rsvp -> do
state <- get
case lookup jobid $ spcJobsDone state of
Just reason -> do
io $ reply rsvp $ reason
Nothing ->
put $ state {spcWaiting = (jobid, rsvp) : spcWaiting state}
MsgJobCancel jobid -> do
state <- get
case (lookup jobid $ spcJobsRunning state, lookup jobid $ spcJobsPending state) of
(Just (_,_,t), _) -> do
io $ killThread t
jobDone jobid DoneCancelled
(_, Just _) -> do
put $
state
{ spcJobsPending = removeAssoc jobid $ spcJobsPending state,
spcJobsDone = (jobid, DoneCancelled) : spcJobsDone state
}
_ -> pure ()
MsgJobCrashed jobid -> do
state <- get
case (lookup jobid $ spcJobsRunning state) of
Just (_, _, _) -> do
jobDone jobid DoneCrashed
Nothing -> pure ()
MsgRemoveWorker workerName -> do
state <- get
case (lookup workerName $ spcWorkers state) of
Just (Worker (Server threadId _)) -> do
jobs <- pure $ map (\(jobid, (name,_,t)) -> (name,(jobid,t))) $ spcJobsRunning state
case (lookup workerName jobs) of
Just (jobid,t) -> do
io $ killThread t
jobDone jobid DoneCancelled
Nothing -> pure ()
state2 <- get
put $ state2 {spcWorkers = removeAssoc workerName $ spcWorkers state2}
io $ killThread threadId
_ -> pure ()
_ -> pure ()
startSPC :: IO SPC
startSPC = do
let initial_state =
SPCState
{ spcJobCounter = JobId 0,
spcJobsPending = [],
spcJobsRunning = [],
spcJobsDone = [],
spcWorkers = [],
spcWaiting = []
}
c <- spawn $ \c -> runSPCM initial_state $ forever $ handleMsg c
void $ spawn $ timer c
pure $ SPC c
where
timer c _ = forever $ do
threadDelay 1000000 -- 1 second
sendTo c MsgTick
jobDone :: JobId -> JobDoneReason -> SPCM ()
jobDone jobid reason = do
state <- get
case lookup jobid $ spcJobsDone state of
Just _ ->
-- We already know this job is done.
pure ()
Nothing -> do
case (lookup jobid (spcWaiting state)) of
Just rsvp -> io $ reply rsvp $ reason
_ -> pure ()
put $
state
{ spcJobsRunning =
removeAssoc jobid $ spcJobsRunning state,
spcJobsDone =
(jobid, reason) : spcJobsDone state
}
-- | Add a job for scheduling.
jobAdd :: SPC -> Job -> IO JobId
jobAdd (SPC c) job =
requestReply c $ MsgJobAdd job
-- | Asynchronously query the job status.
jobStatus :: SPC -> JobId -> IO JobStatus
jobStatus (SPC c) jobid =
requestReply c $ MsgJobStatus jobid
-- | Synchronously block until job is done and return the reason.
jobWait :: SPC -> JobId -> IO JobDoneReason
jobWait (SPC c) jobid =
requestReply c $ MsgJobWait jobid
-- | Asynchronously cancel a job.
jobCancel :: SPC -> JobId -> IO ()
jobCancel (SPC c) jobid =
sendTo c $ MsgJobCancel jobid
-- debugState :: SPC -> IO SPCState
-- debugState (SPC c) =
-- requestReply c $ MsgDebug
-- | Add a new worker with this name. Fails with 'Left' if a worker
-- with that name already exists.
workerAdd :: SPC -> WorkerName -> IO (Either String Worker)
workerAdd (SPC c) name = do
exists <- requestReply c $ MsgWorkerExists name
if exists
then pure $ Left "Worker with given name already exist"
else do
worker <- workerSpawn name c
sendTo c $ MsgAddWorker name worker
pure $ Right worker
workerSpawn :: WorkerName -> (Server SPCMsg) -> IO Worker
workerSpawn name c = do
w <- spawn $ workerLoop name c
pure $ Worker w
workerLoop :: WorkerName -> (Server SPCMsg) -> Chan WorkerMsg -> IO ()
workerLoop name c m = forever $ do
msg <- receive m
case msg of
-- stuff happening here
MsgStartJob action jobid rsvp -> do
t <- forkIO $ do
let doJob = do
action
sendTo c $ MsgJobDone jobid
onException :: SomeException -> IO ()
onException _ =
sendTo c $ MsgJobCrashed jobid
doJob `catch` onException
reply rsvp t
MsgKill -> sendTo c $ MsgRemoveWorker name
-- | Shut down a running worker. No effect if the worker is already
-- terminated.
workerStop :: Worker -> IO ()
workerStop (Worker w) = sendTo w MsgKill

180
a6/src/SPC_Tests.hs Normal file
View File

@ -0,0 +1,180 @@
module SPC_Tests (tests) where
import Control.Concurrent (threadDelay)
import Data.IORef
import SPC
import Test.Tasty (TestTree, localOption, mkTimeout, testGroup)
import Test.Tasty.HUnit (testCase, (@?=))
import Data.Either (isRight)
tests :: TestTree
tests =
localOption (mkTimeout 3000000) $
testGroup
"SPC (core)"
[
testCase "workerAdd" $ do
spc <- startSPC
w <- workerAdd spc "R2-D2"
isRight w @?= True,
testCase "workerAdd (2)" $ do
spc <- startSPC
w1 <- workerAdd spc "MSE-6"
isRight w1 @?= True
w2 <- workerAdd spc "GNK"
isRight w2 @?= True,
testCase "workerAdd (fail)" $ do
spc <- startSPC
w1 <- workerAdd spc "BD-1"
isRight w1 @?= True
w2 <- workerAdd spc "BD-1"
isRight w2 @?= False,
testCase "Running a job" $ do
ref <- newIORef False
spc <- startSPC
w <- workerAdd spc "R5-D4"
isRight w @?= True
j <- jobAdd spc $ Job (writeIORef ref True) 1
r <- jobWait spc j
r @?= Done
x <- readIORef ref
x @?= True,
testCase "Adding job before worker" $ do
ref <- newIORef False
spc <- startSPC
j <- jobAdd spc $ Job (writeIORef ref True) 1
w <- workerAdd spc "R5-D4"
isRight w @?= True
r <- jobWait spc j
r @?= Done
x <- readIORef ref
x @?= True,
testCase "Running two jobs" $ do
ref <- newIORef (0::Int)
spc <- startSPC
w <- workerAdd spc "K-2SO"
isRight w @?= True
j1 <- jobAdd spc $ Job (writeIORef ref 1) 1
r1 <- jobWait spc j1
r1 @?= Done
x1 <- readIORef ref
x1 @?= 1
j2 <- jobAdd spc $ Job (writeIORef ref 2) 1
r2 <- jobWait spc j2
r2 @?= Done
x2 <- readIORef ref
x2 @?= 2,
testCase "Canceling job (pending)" $ do
spc <- startSPC
j <- jobAdd spc $ Job (pure ()) 1
jobCancel spc j
r <- jobStatus spc j
r @?= JobDone DoneCancelled,
testCase "Canceling job (running)" $ do
spc <- startSPC
w <- workerAdd spc "IG-88"
isRight w @?= True
j <- jobAdd spc $ Job (threadDelay 2000000) 2
jobCancel spc j
r <- jobStatus spc j
r @?= JobDone DoneCancelled,
testCase "Canceling job (running) (new job)" $ do
ref <- newIORef False
spc <- startSPC
w <- workerAdd spc "C-3PO"
isRight w @?= True
j1 <- jobAdd spc $ Job (threadDelay 2000000) 2
jobCancel spc j1
r1 <- jobStatus spc j1
r1 @?= JobDone DoneCancelled
-- job has been cancelled. Starting new job
j2 <- jobAdd spc $ Job (writeIORef ref True) 1
r2 <- jobWait spc j2
r2 @?= Done
x <- readIORef ref
x @?= True,
testCase "Timeout" $ do
spc <- startSPC
w <- workerAdd spc "L3-37"
isRight w @?= True
j <- jobAdd spc $ Job (threadDelay 2000000) 1
r <- jobWait spc j
r @?= DoneTimeout,
testCase "Timeout (2 jobs)" $ do
ref <- newIORef False
spc <- startSPC
w <- workerAdd spc "General Kalani"
isRight w @?= True
j1 <- jobAdd spc $ Job (threadDelay 2000000) 1
j2 <- jobAdd spc $ Job (writeIORef ref True) 1
r1 <- jobWait spc j1
r1 @?= DoneTimeout
r2 <- jobWait spc j2
r2 @?= Done
x <- readIORef ref
x @?= True,
testCase "Crash" $ do
ref <- newIORef False
spc <- startSPC
w <- workerAdd spc "C1-10P"
isRight w @?= True
j1 <- jobAdd spc $ Job (error "boom") 1
r1 <- jobWait spc j1
r1 @?= DoneCrashed
-- Ensure new jobs can still work.
j2 <- jobAdd spc $ Job (writeIORef ref True) 1
r2 <- jobWait spc j2
r2 @?= Done
x <- readIORef ref
x @?= True,
testCase "Remove worker" $ do
spc <- startSPC
w1 <- workerAdd spc "D-O"
isRight w1 @?= True
case w1 of
(Right worker) -> do
w2 <- workerAdd spc "D-O" -- Can't make another worker with same name yet
isRight w2 @?= False
j <- jobAdd spc $ Job (threadDelay 1000000) 1
workerStop worker
threadDelay 100
r <- jobStatus spc j
r @?= JobDone DoneCancelled
w3 <- workerAdd spc "D-O" -- But we can make one now
isRight w3 @?= True
_ -> False @?= True
]