Skip to content
This repository was archived by the owner on Jul 30, 2024. It is now read-only.

[stream] Fix stream.pipeline returning wrong type #266

Merged
merged 2 commits into from
Jun 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.scalajs.nodejs.stream

import io.scalajs.nodejs.fs
import io.scalajs.nodejs.fs.Fs
import io.scalajs.nodejs.zlib
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funspec.AnyFunSpec

import scala.scalajs.js

class StreamTest extends AnyFunSpec with BeforeAndAfterEach {
override def afterEach(): Unit = {
Seq(
"package.json.gz"
).foreach { d =>
if (Fs.existsSync(d)) Fs.unlinkSync(d)
}
}

describe("Stream") {
it("pipeline should return the stream which have same type of destination") {
val result = Stream.pipeline(
fs.Fs.createReadStream("package.json"),
zlib.Zlib.createGzip(),
fs.Fs.createWriteStream("package.json.gz"),
err => {
assert(err === js.undefined)
}
)
assert(result.isInstanceOf[Writable])
}
}

describe("StreamModuleExtensions") {
it("pipelineFromSeq should return the stream which have same type of destination") {
val result = Stream.pipelineFromSeq(
fs.Fs.createReadStream("package.json"),
Seq(zlib.Zlib.createGzip()),
fs.Fs.createWriteStream("package.json.gz"),
err => {
assert(err === js.undefined)
}
)
assert(result.isInstanceOf[Writable])
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,37 @@ import scala.scalajs.js
import scala.scalajs.js.annotation.{JSImport, JSName}
import scala.scalajs.js.typedarray.Uint8Array

/**
* Marker trait as parent of both Readable and Writable.
*/
@js.native
trait Stream extends js.Object

@js.native
@JSImport("stream", JSImport.Namespace)
object Stream extends js.Object {
def finished(stream: Stream, options: FinishedOptions, callback: ErrorCallback): Wait = js.native
def finished(stream: Stream, callback: ErrorCallback): Wait = js.native

def pipeline(a: Stream, b: Stream, callback: ErrorCallback): Wait = js.native
def pipeline(a: Stream, b: Stream, c: Stream, callback: ErrorCallback): Wait = js.native
def pipeline(a: Stream, b: Stream, c: Stream, d: Stream, callback: ErrorCallback): Wait = js.native
def pipeline(a: Stream, b: Stream, c: Stream, d: Stream, e: Stream, callback: ErrorCallback): Wait = js.native
def pipeline(a: Stream, b: Stream, c: Stream, d: Stream, e: Stream, f: Stream, callback: ErrorCallback): Wait =
def pipeline[D <: Stream](source: Stream, destination: D, callback: ErrorCallback): D = js.native
def pipeline[D <: Stream](source: Stream, a: Stream, destination: D, callback: ErrorCallback): D = js.native
def pipeline[D <: Stream](source: Stream, a: Stream, b: Stream, destination: D, callback: ErrorCallback): D =
js.native
def pipeline[D <: Stream](source: Stream,
a: Stream,
b: Stream,
c: Stream,
destination: D,
callback: ErrorCallback
): D = js.native
def pipeline[D <: Stream](source: Stream,
a: Stream,
b: Stream,
c: Stream,
d: Stream,
destination: D,
callback: ErrorCallback
): D = js.native
}

/**
Expand Down Expand Up @@ -98,7 +117,7 @@ class PassThrough() extends Transform
* @see https://nodejs.org/api/stream.html#stream_readable_streams
*/
@js.native
sealed trait IReadable extends LegacyStream {
sealed trait IReadable extends Stream with LegacyStream {
def destroyed: Boolean = js.native

/**
Expand Down Expand Up @@ -271,7 +290,7 @@ trait ReadableState extends js.Object {
* The Writable stream interface is an abstraction for a destination that you are writing data to.
*/
@js.native
sealed trait IWritable extends LegacyStream {
sealed trait IWritable extends Stream with LegacyStream {
/////////////////////////////////////////////////////////////////////////////////
// Methods
/////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,27 @@ import scala.scalajs.js.typedarray.Uint8Array
import scala.scalajs.js.|

package object stream {
type Stream = IReadable | IWritable

type ErrorCallback = js.Function1[io.scalajs.nodejs.Error, Any]
type Wait = js.Function0[js.Promise[Unit]]

implicit final class StreamModuleExtensions(private val stream: Stream.type) extends AnyVal {
def pipelineFromSeq(streams: Seq[Stream], errorCallback: ErrorCallback): Wait = {
streams match {
case Seq(a, b) => stream.pipeline(a, b, errorCallback)
case Seq(a, b, c) => stream.pipeline(a, b, c, errorCallback)
case Seq(a, b, c, d) => stream.pipeline(a, b, c, d, errorCallback)
case Seq(a, b, c, d, e) => stream.pipeline(a, b, c, d, e, errorCallback)
case Seq(a, b, c, d, e, f) => stream.pipeline(a, b, c, d, e, f, errorCallback)
def pipelineFromSeq[D <: Stream](source: Stream,
transforms: Seq[Stream],
destination: D,
errorCallback: ErrorCallback
): D = {
transforms match {
case Seq(a) => stream.pipeline(source, a, destination, errorCallback)
case Seq(a, b) => stream.pipeline(source, a, b, destination, errorCallback)
case Seq(a, b, c) => stream.pipeline(source, a, b, c, destination, errorCallback)
case Seq(a, b, c, d) => stream.pipeline(source, a, b, c, d, destination, errorCallback)
case _ =>
import scala.scalajs.js.JSConverters._
val arguments: js.Array[js.Any] = streams.toJSArray.asInstanceOf[js.Array[js.Any]]
val arguments: js.Array[js.Any] = transforms.toJSArray.asInstanceOf[js.Array[js.Any]]
arguments.prepend(source)
arguments.push(destination)
arguments.push(errorCallback)
stream.asInstanceOf[js.Dynamic].finished.apply(null, arguments).asInstanceOf[Wait]
stream.asInstanceOf[js.Dynamic].pipeline.apply(null, arguments).asInstanceOf[D]
}
}
}
Expand Down