Repeated use of ForkJoinTaskSupport on ParCollection causes OutOfMemoryException #11097

@ktegan

On my system, repeatedly creating parallel collections and setting each collection's tasksupport to a new ForkJoinTaskSupport fairly quickly (in fewer than 1200 iterations on my machine) fails with "java.lang.OutOfMemoryError: unable to create new native thread". This kind of error could be catastrophic for long-running programs whose authors don't realize that creating collections and setting a custom tasksupport will eventually crash the program.

I'm using Scala 2.12.5 and Java OpenJDK 1.8.0_161-b14 on a Linux 3.10.0 x86_64 kernel.

Any ideas of what's going on? It's possible this is related to issue scala/scala-parallel-collections#283.

Small test program with rudimentary command line parameter options:

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

object ManyParCollections {
  val maxParCollections = 20000
  val parCollectionSize = 10
  val numThreads = 3

  def main(args: Array[String]): Unit = {
    // the first command line argument (if any) selects which objects get allocated
    val noJoinPool = args.headOption.contains("noJoinPool")
    val onlyJoinPool = args.headOption.contains("onlyJoinPool")
    val onlyTaskSupport = args.headOption.contains("onlyTaskSupport")
    (0 until maxParCollections).foreach { testIdx =>
      if (testIdx % 100 == 0) {
        System.err.println(s"iteration $testIdx of creating a parallel collection (noJoinPool $noJoinPool, onlyJoinPool $onlyJoinPool, onlyTaskSupport $onlyTaskSupport)")
      }
    
      val parCollection = (0 until parCollectionSize).toVector.par
      if (noJoinPool) {
        // do nothing, keep original parallel collection
      } else if (onlyJoinPool) {
        // allocate ForkJoinPool
        val joinPool = new ForkJoinPool(numThreads)
      } else if (onlyTaskSupport) {
        // allocate ForkJoinTaskSupport and ForkJoinPool
        new ForkJoinTaskSupport(new ForkJoinPool(numThreads))
      } else {
        // set parallel collection to use ForkJoinTaskSupport
        parCollection.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(numThreads))
      }
      // run a parallel operation so the collection's task support actually submits work
      val tripled = parCollection.map { idx =>
        idx * 3
      }
    }
    System.err.println("all done!")
  }
}

If we create a ForkJoinTaskSupport and assign it to the collection's tasksupport (the default mode, with no command line argument), the program fails with the error:

> scalac  ManyParCollections.scala ; scala ManyParCollections 
iteration 0 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport false)
iteration 100 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport false)
iteration 200 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport false)
iteration 300 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport false)
iteration 400 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport false)
iteration 500 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport false)
iteration 600 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport false)
iteration 700 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport false)
iteration 800 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport false)
iteration 900 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport false)
iteration 1000 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport false)
iteration 1100 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport false)
java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:717)
        at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
        at java.util.concurrent.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1517)
        at java.util.concurrent.ForkJoinPool.signalWork(ForkJoinPool.java:1634)
        at java.util.concurrent.ForkJoinPool.externalSubmit(ForkJoinPool.java:2367)
        at java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:2419)
        at java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2630)
        at scala.collection.parallel.ForkJoinTasks.executeAndWaitResult(Tasks.scala:417)
        at scala.collection.parallel.ForkJoinTasks.executeAndWaitResult$(Tasks.scala:412)
        at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
        at scala.collection.parallel.ParIterableLike.map(ParIterableLike.scala:497)
        at scala.collection.parallel.ParIterableLike.map$(ParIterableLike.scala:496)
        at scala.collection.parallel.immutable.ParVector.map(ParVector.scala:38)
        at ManyParCollections$.$anonfun$main$1(ManyParCollections.scala:32)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:156)
        at ManyParCollections$.main(ManyParCollections.scala:14)
        at ManyParCollections.main(ManyParCollections.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at scala.reflect.internal.util.ScalaClassLoader.$anonfun$run$2(ScalaClassLoader.scala:99)
        at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:34)
        at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:30)
        at scala.reflect.internal.util.ScalaClassLoader$URLClassLoader.asContext(ScalaClassLoader.scala:125)
        at scala.reflect.internal.util.ScalaClassLoader.run(ScalaClassLoader.scala:99)
        at scala.reflect.internal.util.ScalaClassLoader.run$(ScalaClassLoader.scala:91)
        at scala.reflect.internal.util.ScalaClassLoader$URLClassLoader.run(ScalaClassLoader.scala:125)
        at scala.tools.nsc.CommonRunner.run(ObjectRunner.scala:22)
        at scala.tools.nsc.CommonRunner.run$(ObjectRunner.scala:21)
        at scala.tools.nsc.ObjectRunner$.run(ObjectRunner.scala:39)
        at scala.tools.nsc.CommonRunner.runAndCatch(ObjectRunner.scala:29)
        at scala.tools.nsc.CommonRunner.runAndCatch$(ObjectRunner.scala:28)
        at scala.tools.nsc.ObjectRunner$.runAndCatch(ObjectRunner.scala:39)
        at scala.tools.nsc.MainGenericRunner.runTarget$1(MainGenericRunner.scala:66)
        at scala.tools.nsc.MainGenericRunner.run$1(MainGenericRunner.scala:85)
        at scala.tools.nsc.MainGenericRunner.process(MainGenericRunner.scala:96)
        at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:101)
        at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)
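
The trace fails inside ForkJoinPool.createWorker, which suggests that live threads are accumulating across iterations. One way to check this is to compare the JVM's live thread count before and after a small number of iterations. The following is only a diagnostic sketch, assuming Scala 2.12 as in the report; the object name ThreadCountProbe, the helper liveThreads, and the iteration count of 50 are made up for illustration and are not part of the test program above.

```scala
import java.lang.management.ManagementFactory
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

// Hypothetical probe, not part of the original test program.
object ThreadCountProbe {
  // Number of live threads in this JVM, as reported by the standard ThreadMXBean.
  def liveThreads(): Int = ManagementFactory.getThreadMXBean.getThreadCount

  def main(args: Array[String]): Unit = {
    val before = liveThreads()
    // Deliberately small iteration count so the probe itself does not exhaust threads.
    (0 until 50).foreach { _ =>
      val parCollection = (0 until 10).toVector.par
      // Same pattern as the failing mode: a new pool and task support per collection.
      parCollection.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(3))
      parCollection.map(_ * 3)
    }
    System.err.println(s"live threads before: $before, after: ${liveThreads()}")
  }
}
```

If the "after" count grows roughly in proportion to the number of iterations, that would be consistent with each pool's worker threads staying alive after its collection has finished.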

If we allocate both the ForkJoinTaskSupport and the ForkJoinPool but do not assign the task support to the collection (command line argument "onlyTaskSupport"), there is no error:

> scalac  ManyParCollections.scala ; scala ManyParCollections onlyTaskSupport
iteration 0 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport true)
iteration 100 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport true)
iteration 200 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport true)
...
iteration 19900 of creating a parallel collection (noJoinPool false, onlyJoinPool false, onlyTaskSupport true)
all done!
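
The two runs differ only in whether work is ever submitted to the new pool, so a possible workaround is to avoid leaving many started pools alive at once. The sketch below shows two variants under that assumption (the report itself does not confirm the root cause): reusing one pool and task support for every collection, or shutting each pool down once its collection is done. It assumes Scala 2.12; the names SharedPoolSketch, withSharedPool and withPoolPerCollection are hypothetical.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

// Hypothetical workaround sketch, not a confirmed fix for the reported issue.
object SharedPoolSketch {
  val numThreads = 3
  val parCollectionSize = 10

  // Variant 1: one pool and one task support shared by every collection.
  def withSharedPool(iterations: Int): Int = {
    val pool = new ForkJoinPool(numThreads)
    val taskSupport = new ForkJoinTaskSupport(pool)
    try {
      (0 until iterations).map { _ =>
        val parCollection = (0 until parCollectionSize).toVector.par
        parCollection.tasksupport = taskSupport
        parCollection.map(_ * 3).sum
      }.sum
    } finally pool.shutdown() // release the worker threads when all work is done
  }

  // Variant 2: a fresh pool per collection, shut down explicitly after use.
  def withPoolPerCollection(iterations: Int): Int =
    (0 until iterations).map { _ =>
      val pool = new ForkJoinPool(numThreads)
      try {
        val parCollection = (0 until parCollectionSize).toVector.par
        parCollection.tasksupport = new ForkJoinTaskSupport(pool)
        parCollection.map(_ * 3).sum
      } finally pool.shutdown() // let this pool's workers terminate
    }.sum
}
```

Variant 1 keeps the total number of pool threads bounded by numThreads regardless of how many collections are created. Variant 2 keeps the per-collection pool from the test program but relies on ForkJoinPool.shutdown() to let the workers exit instead of waiting for them to be reclaimed.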
