diff --git a/app/nodejs-v10/src/test/scala/io/scalajs/nodejs/stream/StreamTest.scala b/app/nodejs-v10/src/test/scala/io/scalajs/nodejs/stream/StreamTest.scala new file mode 100644 index 000000000..8ec7d867d --- /dev/null +++ b/app/nodejs-v10/src/test/scala/io/scalajs/nodejs/stream/StreamTest.scala @@ -0,0 +1,47 @@ +package io.scalajs.nodejs.stream + +import io.scalajs.nodejs.fs +import io.scalajs.nodejs.fs.Fs +import io.scalajs.nodejs.zlib +import org.scalatest.BeforeAndAfterEach +import org.scalatest.funspec.AnyFunSpec + +import scala.scalajs.js + +class StreamTest extends AnyFunSpec with BeforeAndAfterEach { + override def afterEach(): Unit = { + Seq( + "package.json.gz" + ).foreach { d => + if (Fs.existsSync(d)) Fs.unlinkSync(d) + } + } + + describe("Stream") { + it("pipeline should return the stream which have same type of destination") { + val result = Stream.pipeline( + fs.Fs.createReadStream("package.json"), + zlib.Zlib.createGzip(), + fs.Fs.createWriteStream("package.json.gz"), + err => { + assert(err === js.undefined) + } + ) + assert(result.isInstanceOf[Writable]) + } + } + + describe("StreamModuleExtensions") { + it("pipelineFromSeq should return the stream which have same type of destination") { + val result = Stream.pipelineFromSeq( + fs.Fs.createReadStream("package.json"), + Seq(zlib.Zlib.createGzip()), + fs.Fs.createWriteStream("package.json.gz"), + err => { + assert(err === js.undefined) + } + ) + assert(result.isInstanceOf[Writable]) + } + } +} diff --git a/app/nodejs-v14/src/main/scala/io/scalajs/nodejs/stream/Stream.scala b/app/nodejs-v14/src/main/scala/io/scalajs/nodejs/stream/Stream.scala index 671eb1782..e1f108720 100644 --- a/app/nodejs-v14/src/main/scala/io/scalajs/nodejs/stream/Stream.scala +++ b/app/nodejs-v14/src/main/scala/io/scalajs/nodejs/stream/Stream.scala @@ -11,18 +11,37 @@ import scala.scalajs.js import scala.scalajs.js.annotation.{JSImport, JSName} import scala.scalajs.js.typedarray.Uint8Array +/** + * Marker trait as parent of both Readable and Writable. + */ +@js.native +trait Stream extends js.Object + @js.native @JSImport("stream", JSImport.Namespace) object Stream extends js.Object { def finished(stream: Stream, options: FinishedOptions, callback: ErrorCallback): Wait = js.native def finished(stream: Stream, callback: ErrorCallback): Wait = js.native - def pipeline(a: Stream, b: Stream, callback: ErrorCallback): Wait = js.native - def pipeline(a: Stream, b: Stream, c: Stream, callback: ErrorCallback): Wait = js.native - def pipeline(a: Stream, b: Stream, c: Stream, d: Stream, callback: ErrorCallback): Wait = js.native - def pipeline(a: Stream, b: Stream, c: Stream, d: Stream, e: Stream, callback: ErrorCallback): Wait = js.native - def pipeline(a: Stream, b: Stream, c: Stream, d: Stream, e: Stream, f: Stream, callback: ErrorCallback): Wait = + def pipeline[D <: Stream](source: Stream, destination: D, callback: ErrorCallback): D = js.native + def pipeline[D <: Stream](source: Stream, a: Stream, destination: D, callback: ErrorCallback): D = js.native + def pipeline[D <: Stream](source: Stream, a: Stream, b: Stream, destination: D, callback: ErrorCallback): D = js.native + def pipeline[D <: Stream](source: Stream, + a: Stream, + b: Stream, + c: Stream, + destination: D, + callback: ErrorCallback + ): D = js.native + def pipeline[D <: Stream](source: Stream, + a: Stream, + b: Stream, + c: Stream, + d: Stream, + destination: D, + callback: ErrorCallback + ): D = js.native } /** @@ -98,7 +117,7 @@ class PassThrough() extends Transform * @see https://nodejs.org/api/stream.html#stream_readable_streams */ @js.native -sealed trait IReadable extends LegacyStream { +sealed trait IReadable extends Stream with LegacyStream { def destroyed: Boolean = js.native /** @@ -271,7 +290,7 @@ trait ReadableState extends js.Object { * The Writable stream interface is an abstraction for a destination that you are writing data to. */ @js.native -sealed trait IWritable extends LegacyStream { +sealed trait IWritable extends Stream with LegacyStream { ///////////////////////////////////////////////////////////////////////////////// // Methods ///////////////////////////////////////////////////////////////////////////////// diff --git a/app/nodejs-v14/src/main/scala/io/scalajs/nodejs/stream/package.scala b/app/nodejs-v14/src/main/scala/io/scalajs/nodejs/stream/package.scala index e63150c15..e086fd2dd 100644 --- a/app/nodejs-v14/src/main/scala/io/scalajs/nodejs/stream/package.scala +++ b/app/nodejs-v14/src/main/scala/io/scalajs/nodejs/stream/package.scala @@ -9,24 +9,27 @@ import scala.scalajs.js.typedarray.Uint8Array import scala.scalajs.js.| package object stream { - type Stream = IReadable | IWritable - type ErrorCallback = js.Function1[io.scalajs.nodejs.Error, Any] type Wait = js.Function0[js.Promise[Unit]] implicit final class StreamModuleExtensions(private val stream: Stream.type) extends AnyVal { - def pipelineFromSeq(streams: Seq[Stream], errorCallback: ErrorCallback): Wait = { - streams match { - case Seq(a, b) => stream.pipeline(a, b, errorCallback) - case Seq(a, b, c) => stream.pipeline(a, b, c, errorCallback) - case Seq(a, b, c, d) => stream.pipeline(a, b, c, d, errorCallback) - case Seq(a, b, c, d, e) => stream.pipeline(a, b, c, d, e, errorCallback) - case Seq(a, b, c, d, e, f) => stream.pipeline(a, b, c, d, e, f, errorCallback) + def pipelineFromSeq[D <: Stream](source: Stream, + transforms: Seq[Stream], + destination: D, + errorCallback: ErrorCallback + ): D = { + transforms match { + case Seq(a) => stream.pipeline(source, a, destination, errorCallback) + case Seq(a, b) => stream.pipeline(source, a, b, destination, errorCallback) + case Seq(a, b, c) => stream.pipeline(source, a, b, c, destination, errorCallback) + case Seq(a, b, c, d) => stream.pipeline(source, a, b, c, d, destination, errorCallback) case _ => import scala.scalajs.js.JSConverters._ - val arguments: js.Array[js.Any] = streams.toJSArray.asInstanceOf[js.Array[js.Any]] + val arguments: js.Array[js.Any] = transforms.toJSArray.asInstanceOf[js.Array[js.Any]] + arguments.prepend(source) + arguments.push(destination) arguments.push(errorCallback) - stream.asInstanceOf[js.Dynamic].finished.apply(null, arguments).asInstanceOf[Wait] + stream.asInstanceOf[js.Dynamic].pipeline.apply(null, arguments).asInstanceOf[D] } } }