Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: node_js
dist: trusty
sudo: required
node_js: 6
node_js: 8
install:
- npm install -g bower
- npm install
Expand Down
7 changes: 4 additions & 3 deletions bower.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
"output"
],
"dependencies": {
"purescript-aff": "^4.0.2"
"purescript-aff": "^5.0.0",
"purescript-avar": "^3.0.0"
},
"devDependencies": {
"purescript-psci-support": "^3.0.0",
"purescript-test-unit": "^13.0.0"
"purescript-psci-support": "^4.0.0",
"purescript-test-unit": "^14.0.0"
}
}
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
"test": "pulp test"
},
"devDependencies": {
"pulp": "^12.0.1",
"purescript": "^0.11.7",
"pulp": "^12.0.0",
"purescript": "^0.12.0",
"purescript-psa": "^0.6.0",
"rimraf": "^2.6.1"
"rimraf": "^2.6.2"
}
}
61 changes: 31 additions & 30 deletions src/Concurrent/BoundedQueue.purs
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,53 @@ module Concurrent.BoundedQueue

import Prelude

import Control.Monad.Aff (Aff)
import Control.Monad.Aff.AVar as AV
import Data.Array (unsafeIndex)
import Data.Maybe (Maybe(..))
import Data.Unfoldable (replicateA)
import Effect.Aff (Aff)
import Effect.Aff.AVar (AVar)
import Effect.Aff.AVar as AVar
import Partial.Unsafe (unsafePartial)

newtype BoundedQueue a =
BoundedQueue
{ size ∷ Int
, contents ∷ Array (AV.AVar a)
, readPos ∷ AV.AVar Int
, writePos ∷ AV.AVar Int
, contents ∷ Array (AVar a)
, readPos ∷ AVar Int
, writePos ∷ AVar Int
}

-- | Creates a new `BoundedQueue` with the given capacity,
new ∷ ∀ a eff. Int → Aff (avar ∷ AV.AVAR | eff) (BoundedQueue a)
new ∷ ∀ a. Int → Aff (BoundedQueue a)
new size = do
contents ← replicateA size AV.makeEmptyVar
readPos ← AV.makeVar 0
writePos ← AV.makeVar 0
contents ← replicateA size AVar.empty
readPos ← AVar.new 0
writePos ← AVar.new 0
pure (BoundedQueue { size, contents, readPos, writePos })

-- | Writes an element to the given queue. Will block if the queue is full until
-- | someone reads from it.
write ∷ ∀ a eff. BoundedQueue a → a → Aff (avar ∷ AV.AVAR | eff) Unit
write ∷ ∀ a. BoundedQueue a → a → Aff Unit
write (BoundedQueue q) a = do
w ← AV.takeVar q.writePos
AV.putVar a (unsafePartial unsafeIndex q.contents w)
AV.putVar ((w + 1) `mod` q.size) q.writePos
w ← AVar.take q.writePos
AVar.put a (unsafePartial unsafeIndex q.contents w)
AVar.put ((w + 1) `mod` q.size) q.writePos

-- | Reads an element from the given queue, will block if the queue is empty,
-- | until someone writes to it.
read ∷ ∀ a eff. BoundedQueue a → Aff (avar ∷ AV.AVAR | eff) a
read ∷ ∀ a. BoundedQueue a → Aff a
read (BoundedQueue q) = do
r ← AV.takeVar q.readPos
v ← AV.takeVar (unsafePartial unsafeIndex q.contents r)
AV.putVar ((r + 1) `mod` q.size) q.readPos
r ← AVar.take q.readPos
v ← AVar.take (unsafePartial unsafeIndex q.contents r)
AVar.put ((r + 1) `mod` q.size) q.readPos
pure v

-- | Checks whether the given queue is empty. Never blocks.
isEmpty ∷ ∀ a eff. BoundedQueue a → Aff (avar ∷ AV.AVAR | eff) Boolean
isEmpty ∷ ∀ a. BoundedQueue a → Aff Boolean
isEmpty (BoundedQueue q) = do
AV.tryReadVar q.readPos >>= case _ of
AVar.tryRead q.readPos >>= case _ of
Nothing → pure true
Just r → AV.tryReadVar (unsafePartial unsafeIndex q.contents r) <#> case _ of
Just r → AVar.tryRead (unsafePartial unsafeIndex q.contents r) <#> case _ of
Nothing → true
Just _ → false

Expand All @@ -68,29 +69,29 @@ isEmpty (BoundedQueue q) = do
-- |
-- | *Careful!* If other readers are blocked on the queue `tryRead` will also
-- | block.
tryRead ∷ ∀ a eff. BoundedQueue a → Aff (avar ∷ AV.AVAR | eff) (Maybe a)
tryRead ∷ ∀ a. BoundedQueue a → Aff (Maybe a)
tryRead (BoundedQueue q) = do
r ← AV.takeVar q.readPos
AV.tryTakeVar (unsafePartial unsafeIndex q.contents r) >>= case _ of
r ← AVar.take q.readPos
AVar.tryTake (unsafePartial unsafeIndex q.contents r) >>= case _ of
Just v → do
AV.putVar ((r + 1) `mod` q.size) q.readPos
AVar.put ((r + 1) `mod` q.size) q.readPos
pure (Just v)
Nothing → do
AV.putVar r q.readPos
AVar.put r q.readPos
pure Nothing

-- | Attempts to write an element into the given queue. If the queue is full,
-- | returns `false` otherwise `true`.
-- |
-- | *Careful!* If other writers are blocked on the queue `tryWrite` will also
-- | block.
tryWrite ∷ ∀ a eff. BoundedQueue a → a → Aff (avar ∷ AV.AVAR | eff) Boolean
tryWrite ∷ ∀ a. BoundedQueue a → a → Aff Boolean
tryWrite (BoundedQueue q) a = do
w ← AV.takeVar q.writePos
AV.tryPutVar a (unsafePartial unsafeIndex q.contents w) >>= if _
w ← AVar.take q.writePos
AVar.tryPut a (unsafePartial unsafeIndex q.contents w) >>= if _
then do
AV.putVar ((w + 1) `mod` q.size) q.writePos
AVar.put ((w + 1) `mod` q.size) q.writePos
pure true
else do
AV.putVar w q.writePos
AVar.put w q.writePos
pure false
41 changes: 21 additions & 20 deletions src/Concurrent/Queue.purs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ module Concurrent.Queue

import Prelude

import Control.Monad.Aff (Aff)
import Control.Monad.Aff.AVar (AVAR, AVar, makeEmptyVar, makeVar, putVar, readVar, takeVar, tryReadVar)
import Data.Maybe (Maybe(..))
import Effect.Aff (Aff)
import Effect.Aff.AVar (AVar)
import Effect.Aff.AVar as AVar

-- | An unbounded Queue fit for concurrent access.
newtype Queue a = Queue
Expand All @@ -23,42 +24,42 @@ type Stream a = AVar (QItem a)
data QItem a = QItem a (Stream a)

-- | Creates a new `Queue`.
new ∷ ∀ a e. Aff (avar ∷ AVAR | e) (Queue a)
new ∷ ∀ a. Aff (Queue a)
new = do
hole ← makeEmptyVar
readEnd ← makeVar hole
writeEnd ← makeVar hole
hole ← AVar.empty
readEnd ← AVar.new hole
writeEnd ← AVar.new hole
pure (Queue { readEnd, writeEnd })

-- | Writes a new value into the queue
write ∷ ∀ a e. Queue a → a → Aff (avar ∷ AVAR | e) Unit
write ∷ ∀ a. Queue a → a → Aff Unit
write (Queue q) a = do
newHole ← makeEmptyVar
oldHole ← takeVar q.writeEnd
putVar (QItem a newHole) oldHole
putVar newHole q.writeEnd
newHole ← AVar.empty
oldHole ← AVar.take q.writeEnd
AVar.put (QItem a newHole) oldHole
AVar.put newHole q.writeEnd

-- | Reads a value from the queue. Blocks if the queue is empty, and resumes
-- | when it has been written to.
read ∷ ∀ a e. Queue a → Aff (avar ∷ AVAR | e) a
read ∷ ∀ a. Queue a → Aff a
read (Queue q) = do
readEnd ← takeVar q.readEnd
QItem a newRead ← readVar readEnd
putVar newRead q.readEnd
readEnd ← AVar.take q.readEnd
QItem a newRead ← AVar.read readEnd
AVar.put newRead q.readEnd
pure a

-- | Attempts to read a value from the queue. Fails with `Nothing` if the queue
-- | is empty.
-- |
-- | *CAREFUL!* This will block if other readers are blocked on the
-- | queue.
tryRead ∷ ∀ a e. Queue a → Aff (avar ∷ AVAR | e) (Maybe a)
tryRead ∷ ∀ a. Queue a → Aff (Maybe a)
tryRead (Queue q) = do
readEnd ← takeVar q.readEnd
tryReadVar readEnd >>= case _ of
readEnd ← AVar.take q.readEnd
AVar.tryRead readEnd >>= case _ of
Just (QItem a newRead) → do
putVar newRead q.readEnd
AVar.put newRead q.readEnd
pure (Just a)
Nothing → do
putVar readEnd q.readEnd
AVar.put readEnd q.readEnd
pure Nothing
9 changes: 4 additions & 5 deletions test/BoundedQueue.purs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@ import Prelude

import Concurrent.BoundedQueue as BQ
import Control.Alt ((<|>))
import Control.Monad.Aff (Aff, Milliseconds(..), delay, forkAff, parallel, sequential)
import Control.Monad.Aff.AVar (AVAR)
import Data.Either (Either(..), isLeft, isRight)
import Data.Int (toNumber)
import Data.Maybe (Maybe(..), isNothing)
import Effect.Aff (Aff, Milliseconds(..), delay, forkAff, parallel, sequential)
import Test.Unit (TestSuite, suite, test)
import Test.Unit.Assert as Assert

race ∷ ∀ a b e. Aff e a → Aff e b → Aff e (Either a b)
race ∷ ∀ a b. Aff a → Aff b → Aff (Either a b)
race a b = sequential ((parallel (map Left a)) <|> (parallel (map Right b)))

delayMs ∷ ∀ e. Int → Aff e Unit
delayMs ∷ Int → Aff Unit
delayMs = delay <<< Milliseconds <<< toNumber

boundedQueueSuite ∷ ∀ e. TestSuite (avar ∷ AVAR | e)
boundedQueueSuite ∷ TestSuite
boundedQueueSuite = do
suite "Simple operations" do
test "inserting and popping elements" do
Expand Down
7 changes: 2 additions & 5 deletions test/Main.purs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ module Test.Main where

import Prelude

import Control.Monad.Aff.AVar (AVAR)
import Control.Monad.Aff.Console (CONSOLE)
import Control.Monad.Eff (Eff)
import Effect (Effect)
import Test.BoundedQueue (boundedQueueSuite)
import Test.Queue (queueSuite)
import Test.Unit (suite)
import Test.Unit.Console (TESTOUTPUT)
import Test.Unit.Main (runTest)

main ∷ ∀ e. Eff (console ∷ CONSOLE, testOutput ∷ TESTOUTPUT, avar ∷ AVAR | e) Unit
main ∷ Effect Unit
main = runTest do
suite "Queue" queueSuite
suite "BoundedQueue" boundedQueueSuite
9 changes: 4 additions & 5 deletions test/Queue.purs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@ import Prelude

import Concurrent.Queue as Q
import Control.Alt ((<|>))
import Control.Monad.Aff (Aff, Milliseconds(..), delay, forkAff, parallel, sequential)
import Control.Monad.Aff.AVar (AVAR)
import Data.Either (Either(..), isLeft, isRight)
import Data.Int (toNumber)
import Data.Maybe (Maybe(..), isNothing)
import Effect.Aff (Aff, Milliseconds(..), delay, forkAff, parallel, sequential)
import Test.Unit (TestSuite, suite, test)
import Test.Unit.Assert as Assert

race ∷ ∀ a b e. Aff e a → Aff e b → Aff e (Either a b)
race ∷ ∀ a b. Aff a → Aff b → Aff (Either a b)
race a b = sequential ((parallel (map Left a)) <|> (parallel (map Right b)))

delayMs ∷ ∀ e. Int → Aff e Unit
delayMs ∷ Int → Aff Unit
delayMs = delay <<< Milliseconds <<< toNumber

queueSuite ∷ ∀ e. TestSuite (avar ∷ AVAR | e)
queueSuite ∷ TestSuite
queueSuite = do
suite "Simple operations" do
test "inserting and popping elements" do
Expand Down