From 2fb9935e6c59ead89cdd425512ccf6e95872b3d8 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Sat, 12 Apr 2014 01:09:42 -0700
Subject: [PATCH 1/3] Added a FastByteArrayOutputStream that exposes the
 underlying array to avoid unnecessary mem copy.

---
 .../org/apache/spark/scheduler/Task.scala     |   8 +-
 .../spark/serializer/JavaSerializer.scala     |   2 +-
 .../apache/spark/serializer/Serializer.scala  |   9 +-
 .../apache/spark/storage/BlockManager.scala   |   8 +-
 .../util/{ => io}/ByteBufferInputStream.scala |   3 +-
 .../util/io/FastByteArrayOutputStream.scala   | 104 ++++++++++++++++++
 .../spark/storage/BlockManagerSuite.scala     |   3 +-
 .../io/FastByteArrayOutputStreamSuite.scala   |  94 ++++++++++++++++
 .../spark/streaming/util/RawTextSender.scala  |  10 +-
 9 files changed, 223 insertions(+), 18 deletions(-)
 rename core/src/main/scala/org/apache/spark/util/{ => io}/ByteBufferInputStream.scala (96%)
 create mode 100644 core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala
 create mode 100644 core/src/test/scala/org/apache/spark/util/io/FastByteArrayOutputStreamSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index a8bcb7dfe2f3..6f76110afb43 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream}
+import java.io.{DataInputStream, DataOutputStream}
 import java.nio.ByteBuffer
 
 import scala.collection.mutable.HashMap
@@ -25,7 +25,7 @@ import scala.collection.mutable.HashMap
 import org.apache.spark.TaskContext
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.io.{ByteBufferInputStream, FastByteArrayOutputStream}
 
 /**
  * A unit of execution. We have two kinds of Task's in Spark:
@@ -102,7 +102,7 @@ private[spark] object Task {
       serializer: SerializerInstance)
     : ByteBuffer = {
 
-    val out = new ByteArrayOutputStream(4096)
+    val out = new FastByteArrayOutputStream(4096)
     val dataOut = new DataOutputStream(out)
 
     // Write currentFiles
@@ -123,7 +123,7 @@ private[spark] object Task {
     dataOut.flush()
     val taskBytes = serializer.serialize(task).array()
     out.write(taskBytes)
-    ByteBuffer.wrap(out.toByteArray)
+    ByteBuffer.wrap(out.array)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 5e5883554fcc..31073ddc13e8 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.io.ByteBufferInputStream
 
 private[spark] class JavaSerializationStream(out: OutputStream, counterReset: Int)
   extends SerializationStream {
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index f2c8f9b6218d..49e835053e34 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.serializer
 
-import java.io.{ByteArrayOutputStream, EOFException, InputStream, OutputStream}
+import java.io.{EOFException, InputStream, OutputStream}
 import java.nio.ByteBuffer
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.{ByteBufferInputStream, NextIterator}
+import org.apache.spark.util.NextIterator
+import org.apache.spark.util.io.{ByteBufferInputStream, FastByteArrayOutputStream}
 
 /**
  * :: DeveloperApi ::
@@ -71,9 +72,9 @@ trait SerializerInstance {
 
   def serializeMany[T](iterator: Iterator[T]): ByteBuffer = {
     // Default implementation uses serializeStream
-    val stream = new ByteArrayOutputStream()
+    val stream = new FastByteArrayOutputStream()
     serializeStream(stream).writeAll(iterator)
-    val buffer = ByteBuffer.wrap(stream.toByteArray)
+    val buffer = ByteBuffer.wrap(stream.array)
     buffer.flip()
     buffer
   }
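For context on the copy this patch series removes: java.io.ByteArrayOutputStream.toByteArray allocates a fresh array and copies every buffered byte into it on each call, so each ByteBuffer.wrap(out.toByteArray) above paid one full extra copy of the serialized data. A minimal sketch of the difference (illustrative only, not part of the patch; it uses the FastByteArrayOutputStream introduced below):

    import java.io.ByteArrayOutputStream
    import org.apache.spark.util.io.FastByteArrayOutputStream

    // java.io.ByteArrayOutputStream: toByteArray copies everything into a new array.
    val jdkOut = new ByteArrayOutputStream(4096)
    jdkOut.write(Array.tabulate[Byte](1000)(_.toByte))
    val copied: Array[Byte] = jdkOut.toByteArray   // O(n) allocation + copy per call

    // FastByteArrayOutputStream: the backing array is handed out directly.
    // Only the first `length` bytes are valid; the tail is unused capacity.
    val fastOut = new FastByteArrayOutputStream(4096)
    fastOut.write(Array.tabulate[Byte](1000)(_.toByte))
    val direct: Array[Byte] = fastOut.array        // no copy; valid bytes = fastOut.length

The trade-off is that the exposed array can be longer than the valid content, which the rest of the series works through.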
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f14017051fa0..d7393269c574 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{File, InputStream, OutputStream, BufferedOutputStream, ByteArrayOutputStream}
+import java.io.{File, InputStream, OutputStream, BufferedOutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -33,6 +33,8 @@ import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util._
+import org.apache.spark.util.io.{ByteBufferInputStream, FastByteArrayOutputStream}
+
 
 private[spark] sealed trait Values
 
@@ -1001,9 +1003,9 @@ private[spark] class BlockManager(
       blockId: BlockId,
       values: Iterator[Any],
       serializer: Serializer = defaultSerializer): ByteBuffer = {
-    val byteStream = new ByteArrayOutputStream(4096)
+    val byteStream = new FastByteArrayOutputStream(4096)
     dataSerializeStream(blockId, byteStream, values, serializer)
-    ByteBuffer.wrap(byteStream.toByteArray)
+    ByteBuffer.wrap(byteStream.array)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala b/core/src/main/scala/org/apache/spark/util/io/ByteBufferInputStream.scala
similarity index 96%
rename from core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
rename to core/src/main/scala/org/apache/spark/util/io/ByteBufferInputStream.scala
index 54de4d4ee8ca..da45cfe2ce65 100644
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ByteBufferInputStream.scala
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.spark.util
+package org.apache.spark.util.io
 
 import java.io.InputStream
 import java.nio.ByteBuffer
 
+// TODO(rxin): This file should not depend on BlockManager.
 import org.apache.spark.storage.BlockManager
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala b/core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala
new file mode 100644
index 000000000000..8c300c763e72
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.io
+
+import java.io.OutputStream
+
+/**
+ * A simple, fast byte-array output stream that exposes the backing array,
+ * inspired by fastutil's FastByteArrayOutputStream.
+ *
+ * [[java.io.ByteArrayOutputStream]] is nice, but to get its content you
+ * must generate each time a new object. This doesn't happen here.
+ *
+ * This class will automatically enlarge the backing array, doubling its
+ * size whenever new space is needed.
+ */
+private[spark] class FastByteArrayOutputStream(initialCapacity: Int = 16) extends OutputStream {
+
+  private[this] var _array = new Array[Byte](initialCapacity)
+
+  /** The current writing position. */
+  private[this] var _position: Int = 0
+
+  /** The array backing the output stream. */
+  def array: Array[Byte] = _array
+
+  /** The number of valid bytes in array. */
+  def length: Int = _position
+
+  override def write(b: Int): Unit = {
+    if (_position >= _array.length ) {
+      _array = FastByteArrayOutputStream.growArray(_array, _position + 1, _position)
+    }
+    _array(_position) = b.toByte
+    _position += 1
+  }
+
+  override def write(b: Array[Byte], off: Int, len: Int) {
+    if (off < 0) {
+      throw new ArrayIndexOutOfBoundsException(s"Offset ($off) is negative" )
+    }
+    if (len < 0) {
+      throw new IllegalArgumentException(s"Length ($len) is negative" )
+    }
+    if (off + len > b.length) {
+      throw new ArrayIndexOutOfBoundsException(
+        s"Last index (${off+len}) is greater than array length (${b.length})")
+    }
+    if ( _position + len > _array.length ) {
+      _array = FastByteArrayOutputStream.growArray(_array, _position + len, _position)
+    }
+    System.arraycopy(b, off, _array, _position, len)
+    _position += len
+  }
+
+  /** Ensures that the length of the backing array is equal to [[length]]. */
+  def trim(): this.type = {
+    if (_position < _array.length) {
+      val newArr = new Array[Byte](_position)
+      System.arraycopy(_array, 0, newArr, 0, _position)
+      _array = newArr
+    }
+    this
+  }
+}
+
+private object FastByteArrayOutputStream {
+  /**
+   * Grows the given array to the maximum between the given length and the current length
+   * multiplied by two, provided that the given length is larger than the current length,
+   * preserving just a part of the array.
+   *
+   * @param arr input array
+   * @param len the new minimum length for this array
+   * @param preserve the number of elements of the array that must be preserved
+   *                 in case a new allocation is necessary
+   */
+  private def growArray(arr: Array[Byte], len: Int, preserve: Int): Array[Byte] = {
+    if (len > arr.length) {
+      val maxArraySize = Integer.MAX_VALUE - 8
+      val newLen = math.min( math.max(2L * arr.length, len), maxArraySize).toInt
+      val newArr = new Array[Byte](newLen)
+      System.arraycopy(arr, 0, newArr, 0, preserve)
+      newArr
+    } else {
+      arr
+    }
+  }
+}
\ No newline at end of file
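A quick sketch of how the class above behaves at its boundaries: the doubling in growArray, and the capacity/length distinction that makes trim() necessary at this point in the series (illustrative only, not part of the patch):

    import org.apache.spark.util.io.FastByteArrayOutputStream

    val out = new FastByteArrayOutputStream(initialCapacity = 4)
    out.write(Array.tabulate[Byte](5)(_.toByte))  // needs 5 bytes, capacity is 4
    // growArray picks math.max(2L * 4, 5) = 8, so the backing array doubles.
    assert(out.length == 5)                       // 5 valid bytes...
    assert(out.array.length == 8)                 // ...in an 8-byte backing array
    // Consumers must respect `length`; bytes 5..7 are unused capacity.
    val exact = out.trim().array                  // trim() copies down to exactly 5 bytes
    assert(exact.length == 5)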
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e10ec7d2624a..7dd931857aa7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,7 +31,8 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
-import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
+import org.apache.spark.util.{AkkaUtils, SizeEstimator, Utils}
+import org.apache.spark.util.io.ByteBufferInputStream
 
 class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
   private val conf = new SparkConf(false)
diff --git a/core/src/test/scala/org/apache/spark/util/io/FastByteArrayOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/util/io/FastByteArrayOutputStreamSuite.scala
new file mode 100644
index 000000000000..3c9e1e7347dd
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/io/FastByteArrayOutputStreamSuite.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.io
+
+import org.scalatest.FunSuite
+
+
+class FastByteArrayOutputStreamSuite extends FunSuite {
+
+  test("write single byte") {
+    val out = new FastByteArrayOutputStream(initialCapacity = 4)
+    out.write(0)
+    out.write(1)
+    assert(out.array(0) === 0)
+    assert(out.array(1) === 1)
+    assert(out.length === 2)
+
+    out.write(2)
+    out.write(3)
+    assert(out.array(2) === 2)
+    assert(out.array(3) === 3)
+    assert(out.length === 4)
+
+    out.write(4)
+    assert(out.array(4) === 4)
+    assert(out.length === 5)
+
+    for (i <- 5 to 100) {
+      out.write(i)
+    }
+
+    for (i <- 5 to 100) {
+      assert(out.array(i) === i)
+    }
+  }
+
+  test("write multiple bytes") {
+    val out = new FastByteArrayOutputStream(initialCapacity = 4)
+    out.write(Array[Byte](0.toByte, 1.toByte))
+    assert(out.length === 2)
+    assert(out.array(0) === 0)
+    assert(out.array(1) === 1)
+
+    out.write(Array[Byte](2.toByte, 3.toByte, 4.toByte))
+    assert(out.length === 5)
+    assert(out.array(2) === 2)
+    assert(out.array(3) === 3)
+    assert(out.array(4) === 4)
+
+    // Write more than double the size of the current array
+    out.write((1 to 100).map(_.toByte).toArray)
+    assert(out.length === 105)
+    assert(out.array(104) === 100)
+  }
+
+  test("test large writes") {
+    val out = new FastByteArrayOutputStream(initialCapacity = 4096)
+    out.write(Array.tabulate[Byte](4096 * 1000)(_.toByte))
+    assert(out.length === 4096 * 1000)
+    assert(out.array(0) === 0)
+    assert(out.array(4096 * 1000 - 1) === (4096 * 1000 - 1).toByte)
+
+    out.write(Array.tabulate[Byte](4096 * 1000)(_.toByte))
+    assert(out.length === 2 * 4096 * 1000)
+    assert(out.array(0) === 0)
+    assert(out.array(4096 * 1000) === 0)
+    assert(out.array(2 * 4096 * 1000 - 1) === (4096 * 1000 - 1).toByte)
+  }
+
+  test("trim") {
+    val out = new FastByteArrayOutputStream(initialCapacity = 4096)
+    out.write(1)
+    assert(out.trim().array.length === 1)
+
+    val out1 = new FastByteArrayOutputStream(initialCapacity = 1)
+    out1.write(1)
+    assert(out1.trim().array.length === 1)
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index a7850812bd61..6c982ee76893 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.util
 
-import java.io.{ByteArrayOutputStream, IOException}
+import java.io.IOException
 import java.net.ServerSocket
 import java.nio.ByteBuffer
 
@@ -26,6 +26,8 @@ import scala.io.Source
 
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.IntParam
+import org.apache.spark.util.io.FastByteArrayOutputStream
+
 
 /**
  * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
@@ -43,15 +45,15 @@ object RawTextSender extends Logging {
     // Repeat the input data multiple times to fill in a buffer
     val lines = Source.fromFile(file).getLines().toArray
 
-    val bufferStream = new ByteArrayOutputStream(blockSize + 1000)
+    val bufferStream = new FastByteArrayOutputStream(blockSize + 1000)
    val ser = new KryoSerializer(new SparkConf()).newInstance()
     val serStream = ser.serializeStream(bufferStream)
     var i = 0
-    while (bufferStream.size < blockSize) {
+    while (bufferStream.length < blockSize) {
      serStream.writeObject(lines(i))
      i = (i + 1) % lines.length
    }
-    val array = bufferStream.toByteArray
+    val array = bufferStream.trim().array
 
     val countBuf = ByteBuffer.wrap(new Array[Byte](4))
     countBuf.putInt(array.length)
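Why doubling rather than growing to exactly the requested size: with capacity doubling, writing n bytes triggers only about log2(n / initialCapacity) reallocations and copies fewer than 2n bytes in total, so writes stay amortized O(1). A small simulation of the growth schedule (hypothetical helper, not part of the patch, mirroring growArray's arithmetic):

    // Counts reallocations while writing n bytes one at a time.
    def countGrows(n: Int, initialCapacity: Int = 16): Int = {
      var capacity = initialCapacity
      var grows = 0
      for (written <- 0 until n) {
        if (written >= capacity) {                              // like `_position >= _array.length`
          capacity = math.max(2L * capacity, written + 1L).toInt  // like growArray
          grows += 1
        }
      }
      grows
    }

    println(countGrows(4096 * 1000))  // 18 reallocations for ~4 MB written byte by byte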
From ee40008e7c3aac601579b226ef1138cd0b797f21 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Sat, 12 Apr 2014 17:09:59 -0700
Subject: [PATCH 2/3] Added a toByteBuffer method and toArray method to avoid
 the extra mem copy in trim().

---
 .../org/apache/spark/scheduler/Task.scala     |  2 +-
 .../apache/spark/serializer/Serializer.scala  |  2 +-
 .../apache/spark/storage/BlockManager.scala   |  2 +-
 .../util/io/FastByteArrayOutputStream.scala   | 19 ++++++---
 .../io/FastByteArrayOutputStreamSuite.scala   | 42 ++++++++++---------
 .../spark/streaming/util/RawTextSender.scala  |  6 +--
 6 files changed, 43 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 6f76110afb43..f2a4422f05a6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -123,7 +123,7 @@ private[spark] object Task {
     dataOut.flush()
     val taskBytes = serializer.serialize(task).array()
     out.write(taskBytes)
-    ByteBuffer.wrap(out.array)
+    out.toByteBuffer
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 49e835053e34..2fb2b191e3ac 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -74,7 +74,7 @@ trait SerializerInstance {
     // Default implementation uses serializeStream
     val stream = new FastByteArrayOutputStream()
     serializeStream(stream).writeAll(iterator)
-    val buffer = ByteBuffer.wrap(stream.array)
+    val buffer = stream.toByteBuffer
     buffer.flip()
     buffer
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d7393269c574..ba4c9e6be94a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1005,7 +1005,7 @@ private[spark] class BlockManager(
       serializer: Serializer = defaultSerializer): ByteBuffer = {
     val byteStream = new FastByteArrayOutputStream(4096)
     dataSerializeStream(blockId, byteStream, values, serializer)
-    ByteBuffer.wrap(byteStream.array)
+    byteStream.toByteBuffer
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala b/core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala
index 8c300c763e72..6863f345bbe3 100644
--- a/core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util.io
 
 import java.io.OutputStream
+import java.nio.ByteBuffer
 
 /**
  * A simple, fast byte-array output stream that exposes the backing array,
@@ -36,9 +37,6 @@ private[spark] class FastByteArrayOutputStream(initialCapacity: Int = 16) extend
   /** The current writing position. */
   private[this] var _position: Int = 0
 
-  /** The array backing the output stream. */
-  def array: Array[Byte] = _array
-
   /** The number of valid bytes in array. */
   def length: Int = _position
 
@@ -59,7 +57,7 @@ private[spark] class FastByteArrayOutputStream(initialCapacity: Int = 16) extend
     }
     if (off + len > b.length) {
       throw new ArrayIndexOutOfBoundsException(
-        s"Last index (${off+len}) is greater than array length (${b.length})")
+        s"Last index (${off + len}) is greater than array length (${b.length})")
     }
     if ( _position + len > _array.length ) {
       _array = FastByteArrayOutputStream.growArray(_array, _position + len, _position)
@@ -68,6 +66,17 @@ private[spark] class FastByteArrayOutputStream(initialCapacity: Int = 16) extend
     _position += len
   }
 
+  /** Return a ByteBuffer wrapping around the filled content of the underlying array. */
+  def toByteBuffer: ByteBuffer = {
+    ByteBuffer.wrap(_array, 0, _position)
+  }
+
+  /**
+   * Return a tuple, where the first element is the underlying array, and the second element
+   * is the length of the filled content.
+   */
+  def toArray: (Array[Byte], Int) = (_array, _position)
+
   /** Ensures that the length of the backing array is equal to [[length]]. */
   def trim(): this.type = {
     if (_position < _array.length) {
@@ -101,4 +110,4 @@ private object FastByteArrayOutputStream {
       arr
     }
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/org/apache/spark/util/io/FastByteArrayOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/util/io/FastByteArrayOutputStreamSuite.scala
index 3c9e1e7347dd..5ffd81c0d9fb 100644
--- a/core/src/test/scala/org/apache/spark/util/io/FastByteArrayOutputStreamSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/io/FastByteArrayOutputStreamSuite.scala
@@ -26,18 +26,20 @@ class FastByteArrayOutputStreamSuite extends FunSuite {
     val out = new FastByteArrayOutputStream(initialCapacity = 4)
     out.write(0)
     out.write(1)
-    assert(out.array(0) === 0)
-    assert(out.array(1) === 1)
+    assert(out.toArray._1(0) === 0)
+    assert(out.toArray._1(1) === 1)
+    assert(out.toArray._2 === 2)
     assert(out.length === 2)
 
     out.write(2)
     out.write(3)
-    assert(out.array(2) === 2)
-    assert(out.array(3) === 3)
+    assert(out.toArray._1(2) === 2)
+    assert(out.toArray._1(3) === 3)
     assert(out.length === 4)
 
     out.write(4)
-    assert(out.array(4) === 4)
+    assert(out.toArray._1(4) === 4)
+    assert(out.toArray._2 === 5)
     assert(out.length === 5)
 
     for (i <- 5 to 100) {
@@ -45,7 +47,7 @@ class FastByteArrayOutputStreamSuite extends FunSuite {
     }
 
     for (i <- 5 to 100) {
-      assert(out.array(i) === i)
+      assert(out.toArray._1(i) === i)
     }
   }
 
@@ -53,42 +55,44 @@ class FastByteArrayOutputStreamSuite extends FunSuite {
     val out = new FastByteArrayOutputStream(initialCapacity = 4)
     out.write(Array[Byte](0.toByte, 1.toByte))
     assert(out.length === 2)
-    assert(out.array(0) === 0)
-    assert(out.array(1) === 1)
+    assert(out.toArray._1(0) === 0)
+    assert(out.toArray._1(1) === 1)
 
     out.write(Array[Byte](2.toByte, 3.toByte, 4.toByte))
     assert(out.length === 5)
-    assert(out.array(2) === 2)
-    assert(out.array(3) === 3)
-    assert(out.array(4) === 4)
+    assert(out.toArray._1(2) === 2)
+    assert(out.toArray._1(3) === 3)
+    assert(out.toArray._1(4) === 4)
 
     // Write more than double the size of the current array
     out.write((1 to 100).map(_.toByte).toArray)
     assert(out.length === 105)
-    assert(out.array(104) === 100)
+    assert(out.toArray._1(104) === 100)
   }
 
   test("test large writes") {
     val out = new FastByteArrayOutputStream(initialCapacity = 4096)
     out.write(Array.tabulate[Byte](4096 * 1000)(_.toByte))
     assert(out.length === 4096 * 1000)
-    assert(out.array(0) === 0)
-    assert(out.array(4096 * 1000 - 1) === (4096 * 1000 - 1).toByte)
+    assert(out.toArray._1(0) === 0)
+    assert(out.toArray._1(4096 * 1000 - 1) === (4096 * 1000 - 1).toByte)
+    assert(out.toArray._2 === 4096 * 1000)
 
     out.write(Array.tabulate[Byte](4096 * 1000)(_.toByte))
     assert(out.length === 2 * 4096 * 1000)
-    assert(out.array(0) === 0)
-    assert(out.array(4096 * 1000) === 0)
-    assert(out.array(2 * 4096 * 1000 - 1) === (4096 * 1000 - 1).toByte)
+    assert(out.toArray._1(0) === 0)
+    assert(out.toArray._1(4096 * 1000) === 0)
+    assert(out.toArray._1(2 * 4096 * 1000 - 1) === (4096 * 1000 - 1).toByte)
+    assert(out.toArray._2 === 2 * 4096 * 1000)
   }
 
   test("trim") {
     val out = new FastByteArrayOutputStream(initialCapacity = 4096)
     out.write(1)
-    assert(out.trim().array.length === 1)
+    assert(out.trim().toArray._2 === 1)
 
     val out1 = new FastByteArrayOutputStream(initialCapacity = 1)
     out1.write(1)
-    assert(out1.trim().array.length === 1)
+    assert(out1.trim().toArray._2 === 1)
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index 6c982ee76893..a7d0f1dbf914 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -53,10 +53,10 @@ object RawTextSender extends Logging {
       serStream.writeObject(lines(i))
       i = (i + 1) % lines.length
     }
-    val array = bufferStream.trim().array
+    val (array, len) = bufferStream.toArray
 
     val countBuf = ByteBuffer.wrap(new Array[Byte](4))
-    countBuf.putInt(array.length)
+    countBuf.putInt(len)
     countBuf.flip()
 
     val serverSocket = new ServerSocket(port)
@@ -69,7 +69,7 @@ object RawTextSender extends Logging {
     try {
       while (true) {
         out.write(countBuf.array)
-        out.write(array)
+        out.write(array, 0, len)  // array's offset is 0, as returned by FastByteArrayOutputStream
       }
     } catch {
       case e: IOException =>
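The new toByteBuffer returns ByteBuffer.wrap(_array, 0, _position), that is, a buffer whose position is 0 and whose limit is the number of valid bytes: a view, not a copy. A sketch of the resulting semantics (illustrative only, not part of the patch):

    import java.nio.ByteBuffer
    import org.apache.spark.util.io.FastByteArrayOutputStream

    val out = new FastByteArrayOutputStream(initialCapacity = 4096)
    out.write(Array.tabulate[Byte](10)(_.toByte))

    val buf: ByteBuffer = out.toByteBuffer             // wraps, does not copy
    assert(buf.position() == 0 && buf.limit() == 10)   // unused capacity is invisible
    assert(buf.remaining() == 10)

    val (arr, len) = out.toArray             // the raw (buffer, valid-byte count) pair
    assert(len == 10 && arr.length >= len)   // arr may be longer than len

One subtlety worth a second look in the Serializer.scala hunk: buffer.flip() on such a buffer sets the limit to the current position, 0, leaving the buffer empty. The flip appears to be carried over from the old ByteBuffer.wrap(stream.toByteArray) code, which had the same effect.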
From 1746223744a9eb7d9808d749d0891aa3e37ead59 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Sun, 13 Apr 2014 14:32:49 -0700
Subject: [PATCH 3/3] Removed trim in FastByteArrayOutputStream.

---
 .../spark/util/io/FastByteArrayOutputStream.scala      | 10 ----------
 .../spark/util/io/FastByteArrayOutputStreamSuite.scala | 10 ----------
 2 files changed, 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala b/core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala
index 6863f345bbe3..9c4ab274e041 100644
--- a/core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala
@@ -76,16 +76,6 @@ private[spark] class FastByteArrayOutputStream(initialCapacity: Int = 16) extend
    * is the length of the filled content.
    */
   def toArray: (Array[Byte], Int) = (_array, _position)
-
-  /** Ensures that the length of the backing array is equal to [[length]]. */
-  def trim(): this.type = {
-    if (_position < _array.length) {
-      val newArr = new Array[Byte](_position)
-      System.arraycopy(_array, 0, newArr, 0, _position)
-      _array = newArr
-    }
-    this
-  }
 }
 
 private object FastByteArrayOutputStream {
diff --git a/core/src/test/scala/org/apache/spark/util/io/FastByteArrayOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/util/io/FastByteArrayOutputStreamSuite.scala
index 5ffd81c0d9fb..d143164d2725 100644
--- a/core/src/test/scala/org/apache/spark/util/io/FastByteArrayOutputStreamSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/io/FastByteArrayOutputStreamSuite.scala
@@ -85,14 +85,4 @@ class FastByteArrayOutputStreamSuite extends FunSuite {
     assert(out.toArray._1(2 * 4096 * 1000 - 1) === (4096 * 1000 - 1).toByte)
     assert(out.toArray._2 === 2 * 4096 * 1000)
   }
-
-  test("trim") {
-    val out = new FastByteArrayOutputStream(initialCapacity = 4096)
-    out.write(1)
-    assert(out.trim().toArray._2 === 1)
-
-    val out1 = new FastByteArrayOutputStream(initialCapacity = 1)
-    out1.write(1)
-    assert(out1.trim().toArray._2 === 1)
-  }
 }
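With trim() gone, the convention settled on by the series is that every consumer either takes a toByteBuffer view or carries the (array, valid-length) pair from toArray, and an explicit length replaces the need to shrink the backing array. A hypothetical consumer, sketched under the final API (sendBlock and its socket handling are not part of the patch):

    import java.io.DataOutputStream
    import java.net.Socket
    import org.apache.spark.util.io.FastByteArrayOutputStream

    // Writes a length-prefixed block, in the style of RawTextSender's countBuf.
    def sendBlock(socket: Socket, out: FastByteArrayOutputStream): Unit = {
      val (array, len) = out.toArray          // no copy, just (buffer, valid-byte count)
      val stream = new DataOutputStream(socket.getOutputStream)
      stream.writeInt(len)                    // length prefix
      stream.write(array, 0, len)             // write only the valid bytes
      stream.flush()
    }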