Skip to content

Commit 2d9dfe3

Browse files
author
Aleksandar Prokopec
committed
Add parallel Ctrie parallel collection.
1 parent c3d19c5 commit 2d9dfe3

File tree

5 files changed

+151
-10
lines changed

5 files changed

+151
-10
lines changed

src/library/scala/collection/mutable/Ctrie.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -844,7 +844,7 @@ object Ctrie extends MutableMapFactory[Ctrie] {
844844
}
845845

846846

847-
private[mutable] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] {
847+
private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] {
848848
var stack = new Array[Array[BasicNode]](7)
849849
var stackpos = new Array[Int](7)
850850
var depth = -1
@@ -910,10 +910,12 @@ private[mutable] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean =
910910
}
911911
} else current = null
912912

913+
protected def newIterator(_ct: Ctrie[K, V], _mustInit: Boolean) = new CtrieIterator[K, V](_ct, _mustInit)
914+
913915
/** Returns a sequence of iterators over subsets of this iterator.
914916
* It's used to ease the implementation of splitters for a parallel version of the Ctrie.
915917
*/
916-
protected def subdivide: Seq[Iterator[(K, V)]] = if (subiter ne null) {
918+
protected def subdivide(): Seq[Iterator[(K, V)]] = if (subiter ne null) {
917919
// the case where an LNode is being iterated
918920
val it = subiter
919921
subiter = null
@@ -927,7 +929,7 @@ private[mutable] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean =
927929
val (arr1, arr2) = stack(d).drop(stackpos(d) + 1).splitAt(rem / 2)
928930
stack(d) = arr1
929931
stackpos(d) = -1
930-
val it = new CtrieIterator[K, V](ct, false)
932+
val it = newIterator(ct, false)
931933
it.stack(0) = arr2
932934
it.stackpos(0) = -1
933935
it.depth = 0

src/library/scala/collection/parallel/ParIterableLike.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
451451

452452
reduce((x, y) => if (cmp.lteq(f(x), f(y))) x else y)
453453
}
454-
454+
455455
def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) {
456456
executeAndWaitResult(new Map[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result })
457457
} else seq.map(f)(bf2seq(bf))
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/* __ *\
2+
** ________ ___ / / ___ Scala API **
3+
** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL **
4+
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
5+
** /____/\___/_/ |_/____/_/ | | **
6+
** |/ **
7+
\* */
8+
9+
package scala.collection.parallel.mutable
10+
11+
12+
13+
import scala.collection.generic._
14+
import scala.collection.parallel.Combiner
15+
import scala.collection.parallel.IterableSplitter
16+
import scala.collection.mutable.Ctrie
17+
import scala.collection.mutable.CtrieIterator
18+
19+
20+
21+
/** Parallel Ctrie collection.
22+
*
23+
* It has its bulk operations parallelized, but uses the snapshot operation
24+
* to create the splitter. This means that parallel bulk operations can be
25+
* called concurrently with the modifications.
26+
*
27+
* @author Aleksandar Prokopec
28+
* @since 2.10
29+
*/
30+
final class ParCtrie[K, V] private[mutable] (private val ctrie: Ctrie[K, V])
31+
extends ParMap[K, V]
32+
with GenericParMapTemplate[K, V, ParCtrie]
33+
with ParMapLike[K, V, ParCtrie[K, V], Ctrie[K, V]]
34+
with ParCtrieCombiner[K, V]
35+
with Serializable
36+
{
37+
38+
def this() = this(new Ctrie)
39+
40+
override def mapCompanion: GenericParMapCompanion[ParCtrie] = ParCtrie
41+
42+
override def empty: ParCtrie[K, V] = ParCtrie.empty
43+
44+
protected[this] override def newCombiner = ParCtrie.newCombiner
45+
46+
override def seq = ctrie
47+
48+
def splitter = new ParCtrieSplitter(ctrie.readOnlySnapshot().asInstanceOf[Ctrie[K, V]], true)
49+
50+
override def size = ctrie.size
51+
52+
override def clear() = ctrie.clear()
53+
54+
def result = this
55+
56+
def get(key: K): Option[V] = ctrie.get(key)
57+
58+
def put(key: K, value: V): Option[V] = ctrie.put(key, value)
59+
60+
def update(key: K, value: V): Unit = ctrie.update(key, value)
61+
62+
def remove(key: K): Option[V] = ctrie.remove(key)
63+
64+
def +=(kv: (K, V)): this.type = {
65+
ctrie.+=(kv)
66+
this
67+
}
68+
69+
def -=(key: K): this.type = {
70+
ctrie.-=(key)
71+
this
72+
}
73+
74+
override def stringPrefix = "ParCtrie"
75+
76+
}
77+
78+
79+
private[collection] class ParCtrieSplitter[K, V](ct: Ctrie[K, V], mustInit: Boolean)
80+
extends CtrieIterator[K, V](ct, mustInit)
81+
with IterableSplitter[(K, V)]
82+
{
83+
// only evaluated if `remaining` is invoked (which is not used by most tasks)
84+
lazy val totalsize = ct.iterator.size // TODO improve to lazily compute sizes
85+
var iterated = 0
86+
87+
protected override def newIterator(_ct: Ctrie[K, V], _mustInit: Boolean) = new ParCtrieSplitter[K, V](_ct, _mustInit)
88+
89+
def dup = null // TODO necessary for views
90+
91+
override def next() = {
92+
iterated += 1
93+
super.next()
94+
}
95+
96+
def split: Seq[IterableSplitter[(K, V)]] = subdivide().asInstanceOf[Seq[IterableSplitter[(K, V)]]]
97+
98+
def remaining: Int = totalsize - iterated
99+
}
100+
101+
102+
/** Only used within the `ParCtrie`. */
103+
private[mutable] trait ParCtrieCombiner[K, V] extends Combiner[(K, V), ParCtrie[K, V]] {
104+
105+
def combine[N <: (K, V), NewTo >: ParCtrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this eq other) this else {
106+
throw new UnsupportedOperationException("This shouldn't have been called in the first place.")
107+
108+
val thiz = this.asInstanceOf[ParCtrie[K, V]]
109+
val that = other.asInstanceOf[ParCtrie[K, V]]
110+
val result = new ParCtrie[K, V]
111+
112+
result ++= thiz.iterator
113+
result ++= that.iterator
114+
115+
result
116+
}
117+
118+
override def canBeShared = true
119+
120+
}
121+
122+
123+
object ParCtrie extends ParMapFactory[ParCtrie] {
124+
125+
def empty[K, V]: ParCtrie[K, V] = new ParCtrie[K, V]
126+
127+
def newCombiner[K, V]: Combiner[(K, V), ParCtrie[K, V]] = new ParCtrie[K, V]
128+
129+
implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParCtrie[K, V]] = new CanCombineFromMap[K, V]
130+
131+
}
132+
133+
134+
135+
136+
137+
138+
139+

src/library/scala/collection/parallel/mutable/ParHashMap.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ package mutable
1212

1313

1414

15-
1615
import collection.generic._
1716
import collection.mutable.DefaultEntry
1817
import collection.mutable.HashEntry

src/library/scala/collection/parallel/package.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -196,22 +196,23 @@ package parallel {
196196
* the receiver (which will be the return value).
197197
*/
198198
private[parallel] abstract class BucketCombiner[-Elem, +To, Buck, +CombinerType <: BucketCombiner[Elem, To, Buck, CombinerType]]
199-
(private val bucketnumber: Int)
199+
(private val bucketnumber: Int)
200200
extends Combiner[Elem, To] {
201201
//self: EnvironmentPassingCombiner[Elem, To] =>
202202
protected var buckets: Array[UnrolledBuffer[Buck]] @uncheckedVariance = new Array[UnrolledBuffer[Buck]](bucketnumber)
203203
protected var sz: Int = 0
204-
204+
205205
def size = sz
206-
206+
207207
def clear() = {
208208
buckets = new Array[UnrolledBuffer[Buck]](bucketnumber)
209209
sz = 0
210210
}
211-
211+
212212
def beforeCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
213+
213214
def afterCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
214-
215+
215216
def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = {
216217
if (this eq other) this
217218
else other match {

0 commit comments

Comments
 (0)