diff --git a/.gitignore b/.gitignore index b2347058..70c8a5a2 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,4 @@ test-results test-tmp *.class gradle.properties +*.orig diff --git a/.java-version b/.java-version new file mode 100644 index 00000000..ec635144 --- /dev/null +++ b/.java-version @@ -0,0 +1 @@ +9 diff --git a/build.gradle b/build.gradle index 8d3ba4e1..7f4708a1 100644 --- a/build.gradle +++ b/build.gradle @@ -46,7 +46,11 @@ subprojects { } } - if (name in ["reactive-streams", "reactive-streams-tck", "reactive-streams-examples", "reactive-streams-flow-bridge"]) { + if (name in ["reactive-streams", + "reactive-streams-tck", + "reactive-streams-tck-flow", + "reactive-streams-examples", + "reactive-streams-flow-bridge"]) { apply plugin: "maven" apply plugin: "signing" diff --git a/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java b/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java index bb8b320f..e45fd548 100644 --- a/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java +++ b/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java @@ -29,17 +29,17 @@ private ReactiveStreamsFlowBridge() { * @return the equivalent Reactive Streams Publisher */ @SuppressWarnings("unchecked") - public static org.reactivestreams.Publisher toReactiveStreams( + public static org.reactivestreams.Publisher toPublisher( Flow.Publisher flowPublisher) { if (flowPublisher == null) { throw new NullPointerException("flowPublisher"); } - if (flowPublisher instanceof org.reactivestreams.Publisher) { - return (org.reactivestreams.Publisher)flowPublisher; - } if (flowPublisher instanceof FlowPublisherFromReactive) { return (org.reactivestreams.Publisher)(((FlowPublisherFromReactive)flowPublisher).reactiveStreams); } + if (flowPublisher instanceof org.reactivestreams.Publisher) { + return (org.reactivestreams.Publisher)flowPublisher; + } return new ReactivePublisherFromFlow(flowPublisher); } @@ -50,21 +50,21 @@ public static org.reactivestreams.Publisher toReactiveStreams( * @return the equivalent Flow Publisher */ @SuppressWarnings("unchecked") - public static Flow.Publisher toFlow( + public static Flow.Publisher toFlowPublisher( org.reactivestreams.Publisher reactiveStreamsPublisher ) { if (reactiveStreamsPublisher == null) { throw new NullPointerException("reactiveStreamsPublisher"); } - if (reactiveStreamsPublisher instanceof Flow.Publisher) { - return (Flow.Publisher)reactiveStreamsPublisher; - } if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) { return (Flow.Publisher)(((ReactivePublisherFromFlow)reactiveStreamsPublisher).flow); } + if (reactiveStreamsPublisher instanceof Flow.Publisher) { + return (Flow.Publisher)reactiveStreamsPublisher; + } return new FlowPublisherFromReactive(reactiveStreamsPublisher); } - + /** * Converts a Flow Processor into a Reactive Streams Processor. * @param the input value type @@ -73,18 +73,18 @@ public static Flow.Publisher toFlow( * @return the equivalent Reactive Streams Processor */ @SuppressWarnings("unchecked") - public static org.reactivestreams.Processor toReactiveStreams( + public static org.reactivestreams.Processor toProcessor( Flow.Processor flowProcessor ) { if (flowProcessor == null) { throw new NullPointerException("flowProcessor"); } - if (flowProcessor instanceof org.reactivestreams.Processor) { - return (org.reactivestreams.Processor)flowProcessor; - } if (flowProcessor instanceof FlowToReactiveProcessor) { return (org.reactivestreams.Processor)(((FlowToReactiveProcessor)flowProcessor).reactiveStreams); } + if (flowProcessor instanceof org.reactivestreams.Processor) { + return (org.reactivestreams.Processor)flowProcessor; + } return new ReactiveToFlowProcessor(flowProcessor); } @@ -96,18 +96,18 @@ public static org.reactivestreams.Processor toReactiveStreams( * @return the equivalent Flow Processor */ @SuppressWarnings("unchecked") - public static Flow.Processor toFlow( + public static Flow.Processor toFlowProcessor( org.reactivestreams.Processor reactiveStreamsProcessor ) { if (reactiveStreamsProcessor == null) { throw new NullPointerException("reactiveStreamsProcessor"); } - if (reactiveStreamsProcessor instanceof Flow.Processor) { - return (Flow.Processor)reactiveStreamsProcessor; - } if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) { return (Flow.Processor)(((ReactiveToFlowProcessor)reactiveStreamsProcessor).flow); } + if (reactiveStreamsProcessor instanceof Flow.Processor) { + return (Flow.Processor)reactiveStreamsProcessor; + } return new FlowToReactiveProcessor(reactiveStreamsProcessor); } @@ -117,10 +117,17 @@ public static Flow.Processor toFlow( * @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert * @return the equivalent Flow Subscriber */ + @SuppressWarnings("unchecked") public static Flow.Subscriber toFlowSubscriber(org.reactivestreams.Subscriber reactiveStreamsSubscriber) { if (reactiveStreamsSubscriber == null) { throw new NullPointerException("reactiveStreamsSubscriber"); } + if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) { + return (Flow.Subscriber)((ReactiveToFlowSubscriber)reactiveStreamsSubscriber).flow; + } + if (reactiveStreamsSubscriber instanceof Flow.Subscriber) { + return (Flow.Subscriber)reactiveStreamsSubscriber; + } return new FlowToReactiveSubscriber(reactiveStreamsSubscriber); } @@ -130,10 +137,17 @@ public static Flow.Subscriber toFlowSubscriber(org.reactivestreams.Subscr * @param flowSubscriber the Flow Subscriber instance to convert * @return the equivalent Reactive Streams Subscriber */ - public static org.reactivestreams.Subscriber toReactiveStreamsSubscriber(Flow.Subscriber flowSubscriber) { + @SuppressWarnings("unchecked") + public static org.reactivestreams.Subscriber toSubscriber(Flow.Subscriber flowSubscriber) { if (flowSubscriber == null) { throw new NullPointerException("flowSubscriber"); } + if (flowSubscriber instanceof FlowToReactiveSubscriber) { + return (org.reactivestreams.Subscriber)((FlowToReactiveSubscriber)flowSubscriber).reactiveStreams; + } + if (flowSubscriber instanceof org.reactivestreams.Subscriber) { + return (org.reactivestreams.Subscriber)flowSubscriber; + } return new ReactiveToFlowSubscriber(flowSubscriber); } @@ -141,8 +155,8 @@ public static org.reactivestreams.Subscriber toReactiveStreamsSubscriber( * Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription. */ static final class FlowToReactiveSubscription implements Flow.Subscription { - private final org.reactivestreams.Subscription reactiveStreams; - + final org.reactivestreams.Subscription reactiveStreams; + public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) { this.reactiveStreams = reactive; } @@ -156,15 +170,15 @@ public void request(long n) { public void cancel() { reactiveStreams.cancel(); } - + } - + /** * Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription. */ static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription { - private final Flow.Subscription flow; - + final Flow.Subscription flow; + public ReactiveToFlowSubscription(Flow.Subscription flow) { this.flow = flow; } @@ -178,25 +192,25 @@ public void request(long n) { public void cancel() { flow.cancel(); } - - + + } - + /** * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it. * @param the element type */ - static final class FlowToReactiveSubscriber + static final class FlowToReactiveSubscriber implements Flow.Subscriber { - private final org.reactivestreams.Subscriber reactiveStreams; - + final org.reactivestreams.Subscriber reactiveStreams; + public FlowToReactiveSubscriber(org.reactivestreams.Subscriber reactive) { this.reactiveStreams = reactive; } @Override public void onSubscribe(Flow.Subscription subscription) { - reactiveStreams.onSubscribe(new ReactiveToFlowSubscription(subscription)); + reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription)); } @Override @@ -213,24 +227,24 @@ public void onError(Throwable throwable) { public void onComplete() { reactiveStreams.onComplete(); } - + } /** * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it. * @param the element type */ - static final class ReactiveToFlowSubscriber + static final class ReactiveToFlowSubscriber implements org.reactivestreams.Subscriber { - private final Flow.Subscriber flow; - + final Flow.Subscriber flow; + public ReactiveToFlowSubscriber(Flow.Subscriber flow) { this.flow = flow; } @Override public void onSubscribe(org.reactivestreams.Subscription subscription) { - flow.onSubscribe(new FlowToReactiveSubscription(subscription)); + flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription)); } @Override @@ -247,9 +261,9 @@ public void onError(Throwable throwable) { public void onComplete() { flow.onComplete(); } - + } - + /** * Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it. * @param the input type @@ -258,14 +272,14 @@ public void onComplete() { static final class ReactiveToFlowProcessor implements org.reactivestreams.Processor { final Flow.Processor flow; - + public ReactiveToFlowProcessor(Flow.Processor flow) { this.flow = flow; } @Override - public void onSubscribe(org.reactivestreams.Subscription s) { - flow.onSubscribe(new FlowToReactiveSubscription(s)); + public void onSubscribe(org.reactivestreams.Subscription subscription) { + flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription)); } @Override @@ -285,14 +299,10 @@ public void onComplete() { @Override public void subscribe(org.reactivestreams.Subscriber s) { - if (s == null) { - flow.subscribe(null); - return; - } - flow.subscribe(new FlowToReactiveSubscriber(s)); + flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber(s)); } } - + /** * Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it. * @param the input type @@ -301,14 +311,14 @@ public void subscribe(org.reactivestreams.Subscriber s) { static final class FlowToReactiveProcessor implements Flow.Processor { final org.reactivestreams.Processor reactiveStreams; - + public FlowToReactiveProcessor(org.reactivestreams.Processor reactive) { this.reactiveStreams = reactive; } @Override - public void onSubscribe(Flow.Subscription s) { - reactiveStreams.onSubscribe(new ReactiveToFlowSubscription(s)); + public void onSubscribe(Flow.Subscription subscription) { + reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription)); } @Override @@ -328,11 +338,7 @@ public void onComplete() { @Override public void subscribe(Flow.Subscriber s) { - if (s == null) { - reactiveStreams.subscribe(null); - return; - } - reactiveStreams.subscribe(new ReactiveToFlowSubscriber(s)); + reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber(s)); } } @@ -350,11 +356,7 @@ public ReactivePublisherFromFlow(Flow.Publisher flowPublisher) { @Override public void subscribe(org.reactivestreams.Subscriber reactive) { - if (reactive == null) { - flow.subscribe(null); - return; - } - flow.subscribe(new FlowToReactiveSubscriber(reactive)); + flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber(reactive)); } } @@ -372,12 +374,8 @@ public FlowPublisherFromReactive(org.reactivestreams.Publisher reac @Override public void subscribe(Flow.Subscriber flow) { - if (flow == null) { - reactiveStreams.subscribe(null); - return; - } - reactiveStreams.subscribe(new ReactiveToFlowSubscriber(flow)); + reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber(flow)); } } -} \ No newline at end of file +} diff --git a/flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java b/flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java index 03573a0a..1e24172f 100644 --- a/flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java +++ b/flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java @@ -31,7 +31,7 @@ public void execute(Runnable command) { TestEitherConsumer tc = new TestEitherConsumer(); - ReactiveStreamsFlowBridge.toFlow(p).subscribe(tc); + ReactiveStreamsFlowBridge.toFlowPublisher(p).subscribe(tc); p.offer(1); p.offer(2); @@ -54,7 +54,7 @@ public void execute(Runnable command) { TestEitherConsumer tc = new TestEitherConsumer(); - ReactiveStreamsFlowBridge.toFlow(p).subscribe(tc); + ReactiveStreamsFlowBridge.toFlowPublisher(p).subscribe(tc); p.offer(1); p.offer(2); @@ -77,7 +77,7 @@ public void execute(Runnable command) { TestEitherConsumer tc = new TestEitherConsumer(); - ReactiveStreamsFlowBridge.toReactiveStreams(p).subscribe(tc); + ReactiveStreamsFlowBridge.toPublisher(p).subscribe(tc); p.submit(1); p.submit(2); @@ -100,7 +100,7 @@ public void execute(Runnable command) { TestEitherConsumer tc = new TestEitherConsumer(); - ReactiveStreamsFlowBridge.toReactiveStreams(p).subscribe(tc); + ReactiveStreamsFlowBridge.toPublisher(p).subscribe(tc); p.submit(1); p.submit(2); @@ -148,7 +148,7 @@ public void cancel() { public void flowToReactiveStreamsSubscriber() { TestEitherConsumer tc = new TestEitherConsumer(); - org.reactivestreams.Subscriber fs = ReactiveStreamsFlowBridge.toReactiveStreamsSubscriber(tc); + org.reactivestreams.Subscriber fs = ReactiveStreamsFlowBridge.toSubscriber(tc); final Object[] state = { null, null }; @@ -175,4 +175,60 @@ public void cancel() { Assert.assertNull(state[1]); } + + @Test + public void stableConversionForSubscriber() { + Subscriber rsSub = new Subscriber() { + @Override public void onSubscribe(Subscription s) {}; + @Override public void onNext(Integer i) {}; + @Override public void onError(Throwable t) {}; + @Override public void onComplete() {}; + }; + + Flow.Subscriber fSub = new Flow.Subscriber() { + @Override public void onSubscribe(Flow.Subscription s) {}; + @Override public void onNext(Integer i) {}; + @Override public void onError(Throwable t) {}; + @Override public void onComplete() {}; + }; + + Assert.assertSame(ReactiveStreamsFlowBridge.toSubscriber(ReactiveStreamsFlowBridge.toFlowSubscriber(rsSub)), rsSub); + Assert.assertSame(ReactiveStreamsFlowBridge.toFlowSubscriber(ReactiveStreamsFlowBridge.toSubscriber(fSub)), fSub); + } + + @Test + public void stableConversionForProcessor() { + Processor rsPro = new Processor() { + @Override public void onSubscribe(Subscription s) {}; + @Override public void onNext(Integer i) {}; + @Override public void onError(Throwable t) {}; + @Override public void onComplete() {}; + @Override public void subscribe(Subscriber s) {}; + }; + + Flow.Processor fPro = new Flow.Processor() { + @Override public void onSubscribe(Flow.Subscription s) {}; + @Override public void onNext(Integer i) {}; + @Override public void onError(Throwable t) {}; + @Override public void onComplete() {}; + @Override public void subscribe(Flow.Subscriber s) {}; + }; + + Assert.assertSame(ReactiveStreamsFlowBridge.toProcessor(ReactiveStreamsFlowBridge.toFlowProcessor(rsPro)), rsPro); + Assert.assertSame(ReactiveStreamsFlowBridge.toFlowProcessor(ReactiveStreamsFlowBridge.toProcessor(fPro)), fPro); + } + + @Test + public void stableConversionForPublisher() { + Publisher rsPub = new Publisher() { + @Override public void subscribe(Subscriber s) {}; + }; + + Flow.Publisher fPub = new Flow.Publisher() { + @Override public void subscribe(Flow.Subscriber s) {}; + }; + + Assert.assertSame(ReactiveStreamsFlowBridge.toPublisher(ReactiveStreamsFlowBridge.toFlowPublisher(rsPub)), rsPub); + Assert.assertSame(ReactiveStreamsFlowBridge.toFlowPublisher(ReactiveStreamsFlowBridge.toPublisher(fPub)), fPub); + } } diff --git a/flow-bridge/src/test/java/org/reactivestreams/SubmissionPublisherTckTest.java b/flow-bridge/src/test/java/org/reactivestreams/SubmissionPublisherTckTest.java index cc342924..2c5b3a0d 100644 --- a/flow-bridge/src/test/java/org/reactivestreams/SubmissionPublisherTckTest.java +++ b/flow-bridge/src/test/java/org/reactivestreams/SubmissionPublisherTckTest.java @@ -40,14 +40,14 @@ public void run() { sp.close(); } }).start(); - return ReactiveStreamsFlowBridge.toReactiveStreams(sp); + return ReactiveStreamsFlowBridge.toPublisher(sp); } @Override public Publisher createFailedPublisher() { final SubmissionPublisher sp = new SubmissionPublisher(); sp.closeExceptionally(new IOException()); - return ReactiveStreamsFlowBridge.toReactiveStreams(sp); + return ReactiveStreamsFlowBridge.toPublisher(sp); } @Override diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 2c6137b8..3030cc76 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index aeb99ed9..ff8278a9 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,4 +1,4 @@ -#Wed Jul 12 20:56:16 CEST 2017 +#Sun Oct 29 16:12:50 JST 2017 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME diff --git a/gradlew b/gradlew index 9d82f789..cccdd3d5 100755 --- a/gradlew +++ b/gradlew @@ -1,4 +1,4 @@ -#!/usr/bin/env bash +#!/usr/bin/env sh ############################################################################## ## @@ -6,20 +6,38 @@ ## ############################################################################## -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS="" +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null APP_NAME="Gradle" APP_BASE_NAME=`basename "$0"` +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" -warn ( ) { +warn () { echo "$*" } -die ( ) { +die () { echo echo "$*" echo @@ -30,6 +48,7 @@ die ( ) { cygwin=false msys=false darwin=false +nonstop=false case "`uname`" in CYGWIN* ) cygwin=true @@ -40,26 +59,11 @@ case "`uname`" in MINGW* ) msys=true ;; + NONSTOP* ) + nonstop=true + ;; esac -# Attempt to set APP_HOME -# Resolve links: $0 may be a link -PRG="$0" -# Need this for relative symlinks. -while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`"/$link" - fi -done -SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >/dev/null -APP_HOME="`pwd -P`" -cd "$SAVED" >/dev/null - CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar # Determine the Java command to use to start the JVM. @@ -85,7 +89,7 @@ location of your Java installation." fi # Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then MAX_FD_LIMIT=`ulimit -H -n` if [ $? -eq 0 ] ; then if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then @@ -150,11 +154,19 @@ if $cygwin ; then esac fi -# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules -function splitJvmOpts() { - JVM_OPTS=("$@") +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " } -eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS -JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi -exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat index 5f192121..e95643d6 100755 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,90 +1,84 @@ -@if "%DEBUG%" == "" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS= - -set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto init - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto init - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:init -@rem Get command-line arguments, handling Windows variants - -if not "%OS%" == "Windows_NT" goto win9xME_args -if "%@eval[2+2]" == "4" goto 4NT_args - -:win9xME_args -@rem Slurp the command line arguments. -set CMD_LINE_ARGS= -set _SKIP=2 - -:win9xME_args_slurp -if "x%~1" == "x" goto execute - -set CMD_LINE_ARGS=%* -goto execute - -:4NT_args -@rem Get arguments from the 4NT Shell from JP Software -set CMD_LINE_ARGS=%$ - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% - -:end -@rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/settings.gradle b/settings.gradle index 01529608..9272bcae 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,14 +1,24 @@ rootProject.name = 'reactive-streams' -def jdkFlow = false; +def jdkFlow = false + +final def ANSI_RESET = "\u001B[0m" +final def ANSI_RED = "\u001B[31m" +final def ANSI_GREEN = "\u001B[32m" +final def ANSI_YELLOW = "\u001B[33m" try { - Class.forName("java.util.concurrent.Flow"); - jdkFlow = true; - println("Java 9 Flow API found") + Class.forName("java.util.concurrent.Flow") + jdkFlow = true + println(ANSI_GREEN + " INFO: ------------------ JDK9 classes detected ---------------------------------" + ANSI_RESET) + println(ANSI_GREEN + " INFO: Java 9 Flow API found; Including [flow-bridge, tck-flow] in build. " + ANSI_RESET) + println(ANSI_GREEN + " INFO: --------------------------------------------------------------------------" + ANSI_RESET) } catch (Throwable ex) { // Flow API not available - println("Java 9 Flow API not available") + println(ANSI_RED + "WARNING: ------------------ JDK9 classes NOT detected -----------------------------" + ANSI_RESET) + println(ANSI_RED + "WARNING: Java 9 Flow API not found; Not including [flow-bridge, tck-flow] in build." + ANSI_RESET) + println(ANSI_RED + "WARNING: In order to execute the complete test-suite run the build using JDK9+. " + ANSI_RESET) + println(ANSI_RED + "WARNING: --------------------------------------------------------------------------" + ANSI_RESET) } include ':reactive-streams' @@ -17,6 +27,7 @@ include ':reactive-streams-examples' if (jdkFlow) { include ':reactive-streams-flow-bridge' + include ':reactive-streams-tck-flow' } project(':reactive-streams').projectDir = "$rootDir/api" as File @@ -24,4 +35,5 @@ project(':reactive-streams-tck').projectDir = "$rootDir/tck" as File project(':reactive-streams-examples').projectDir = "$rootDir/examples" as File if (jdkFlow) { project(':reactive-streams-flow-bridge').projectDir = "$rootDir/flow-bridge" as File + project(':reactive-streams-tck-flow').projectDir = "$rootDir/tck-flow" as File } diff --git a/tck-flow/README.md b/tck-flow/README.md new file mode 100644 index 00000000..03a93a88 --- /dev/null +++ b/tck-flow/README.md @@ -0,0 +1,616 @@ +# Reactive Streams TCK for `java.util.concurrent.Flow.*` # + +The purpose of the *Reactive Streams Technology Compatibility Kit* (from here on referred to as: *the TCK*) is to guide +and help Reactive Streams library implementers to validate their implementations against the rules defined in [the Specification](https://github.com/reactive-streams/reactive-streams-jvm). + +Since this version of the TCK is intended to verify the interfaces contained in Java 9 (under `java.util.concurrent.Flow.*`), at least Java `9` is required to run this TCK. If you're looking for the previous TCK that was intended for Reactive Streams prior to their inclusion in the JDK please look at [] + +## Structure of the TCK + +The TCK aims to cover all rules defined in the Specification, however for some rules outlined in the Specification it is +not possible (or viable) to construct automated tests, thus the TCK can not claim to fully verify an implementation, however it is very helpful and is able to validate the most important rules. + +The TCK is split up into 4 TestNG test classes which are to be extended by implementers, providing their `Flow.Publisher` / `Flow.Subscriber` / `Flow.Processor` implementations for the test harness to validate. + +The tests are split in the following way: + +* `FlowPublisherVerification` +* `FlowSubscriberWhiteboxVerification` +* `FlowSubscriberBlackboxVerification` +* `IdentityFlowProcessorVerification` + +The sections below include examples on how these can be used and describe the various configuration options. + +The TCK is provided as binary artifact on [Maven Central](http://search.maven.org/#search|ga|1|reactive-streams-tck): + +```xml + + org.reactivestreams + reactive-streams-tck-flow + 1.0.1 + test + +``` + +Please refer to the [Reactive Streams Specification](https://github.com/reactive-streams/reactive-streams-jvm) for the current latest version number. Make sure that your Reactive Streams API and TCK dependency versions match. + +### Test method naming convention + +Since the TCK is aimed at Reactive Stream implementers, looking into the sources of the TCK is well expected and encouraged as it should help during the implementation cycle. + +In order to make mapping between test cases and Specification rules easier, each test case covering a specific +Specification rule abides the following naming convention: `TYPE_spec###_DESC` where: + +* `TYPE` is one of: [#type-required](required), [#type-optional](optional), [#type-stochastic](stochastic) or [#type-untested](untested) which describe if this test is covering a Rule that MUST or SHOULD be implemented. The specific words are explained in detail below. +* `###` is the Rule number (`1.xx` Rules are about `Publisher`s, `2.xx` Rules are about Subscribers etc.) +* `DESC` is a short explanation of what exactly is being tested in this test case, as sometimes one Rule may have multiple test cases in order to cover the entire Rule. + +Here is an example test method signature: + +```java + // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.1 + @Test public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable { + // ... + } +``` + +#### Test types explained: + +```java +@Test public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable +``` + + +The `required_` means that this test case is a hard requirement, it covers a *MUST* or *MUST NOT* Rule of the Specification. + + +```java +@Test public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable +``` + + +The `optional_` means that this test case is an optional requirement, it covers a *MAY* or *SHOULD* Rule of the Specification. +This prefix is also used if more configuration is needed in order to run it, e.g. +`@Additional(implement = "createFailedPublisher") @Test` signals the implementer that in order to run this test +one has to implement the `Publisher createFailedPublisher()` method. + +```java +@Test public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable +``` + + +The `stochastic_` means that the Rule is impossible or infeasible to deterministically verify— +usually this means that this test case can yield false positives ("be green") even if for some case, the given implementation may violate the tested behaviour. + +```java +@Test public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable +``` + + +The `untested_` means that the test case is not implemented, either because it is inherently hard to verify (e.g. Rules which use +the wording "*SHOULD consider X as Y*"). Such tests will show up in your test runs as `SKIPPED`, with a message pointing out that the TCK is unable to validate this Rule. Solutions to deterministically test Rules which have been +marked with this prefix are most welcome – pull requests are encouraged! + +### Test isolation + +All test assertions are isolated within the required `TestEnvironment`, so it is safe to run the TCK tests in parallel. + +### Testing Publishers with restricted capabilities + +Some `Publisher`s will not be able to pass through all TCK tests due to some internal or fundamental decisions in their design. +For example, a `FuturePublisher` can be implemented such that it can only ever `onNext` **exactly once**—this means that it's not possible to run all TCK tests against it since some of them require multiple elements to be emitted. + +In order to allow such `Publisher`s to be tested against the spec's rules, the TCK provides the `maxElementsFromPublisher()` method as means of communicating the limited capabilities of the Publisher. For example, if a `Publisher` can only ever emit up to `2` elements, +tests in the TCK which require more than 2 elements to verify a rule will be skipped. + +In order to inform the TCK that the `Publisher` is only able to signal up to `2` elements, override the `maxElementsFromPublisher` method like this: + +```java +@Override public long maxElementsFromPublisher() { + return 2; +} +``` + +The TCK also supports `Publisher`s which are not able to signal completion. Imagine a `Publisher` being +backed by a timer—such a `Publisher` does not have a natural way to "complete" after some number of ticks. It would be +possible to implement a `Processor` which would "take n elements from the TickPublisher and then signal completion to the +downstream", but this adds a layer of indirection between the TCK and the `Publisher` one initially wanted to test. +It is suggested to test such unbouded `Publisher`s either way—using a "TakeNElementsProcessor" or by informing the TCK +that the `Publisher` is not able to signal completion. The TCK will then skip all tests which require `onComplete` signals to be emitted. + +In order to inform the TCK that your Publiher is not able to signal completion, override the `maxElementsFromPublisher` method like this: + +```java +@Override public long maxElementsFromPublisher() { + return publisherUnableToSignalOnComplete(); // == Long.MAX_VALUE == unbounded +} +``` + +### Testing a "failed" Publisher +The Reactive Streams Specification mandates certain behaviours for `Publisher`s which are "failed", +e.g. it was unable to initialize a connection it needs to emit elements. +It may be useful to specifically such known to be failed `Publisher` using the TCK. + +In order to run additional tests on a failed `Publisher` implement the `createFailedPublisher` method. +The expected behaviour from the returned implementation is to follow Rule 1.4 and Rule 1.9—which are concerned +with the order of emiting the `Subscription` and signaling the failure. + +```java +@Override public Flow.Publisher createFailedPublisher() { + final String invalidData = "this input string is known it to be failed"; + return new MyPublisher(invalidData); +} +``` + +In case there isn't a known up-front error state to put the `Publisher` into, +ignore these tests by returning `null` from the `createFailedPublisher` method. +It is important to remember that it is **illegal** to signal `onNext / onComplete / onError` before signalling the `Subscription` through `onSubscribe`, for details on this rule refer to the Reactive Streams specification. + +## Publisher Verification + +`FlowPublisherVerification` tests verify `Publisher` as well as some `Subscription` Rules of the Specification. + +In order to include it's tests in your test suite simply extend it, like this: + +```java +package com.example.streams; + +import org.reactivestreams.tck.flow.FlowPublisherVerification; +import org.reactivestreams.tck.TestEnvironment; + +import java.util.concurrent.Flow; + +public class RangePublisherTest extends FlowPublisherVerification { + + public RangePublisherTest() { + super(new TestEnvironment()); + } + + @Override + public Flow.Publisher createPublisher(long elements) { + return new RangePublisher(1, elements); + } + + @Override + public Flow.Publisher createFailedPublisher() { + return new Publisher() { + @Override + public void subscribe(Subscriber s) { + s.onError(new RuntimeException("Can't subscribe subscriber: " + s + ", because of reasons.")); + } + }; + } + + // ADDITIONAL CONFIGURATION + + @Override + public long maxElementsFromPublisher() { + return Long.MAX_VALUE - 1; + } + + @Override + public long boundedDepthOfOnNextAndRequestRecursion() { + return 1; + } +} +``` + +Notable configuration options include: + +* `maxElementsFromPublisher` – must be overridden in case the `Publisher` being tested is of bounded length, e.g. it's wrapping a `Future` and thus can only publish up to 1 element, in which case you + would return `1` from this method. It will cause all tests which require more elements in order to validate a certain + Rule to be skipped, +* `boundedDepthOfOnNextAndRequestRecursion` – which must be overridden when verifying synchronous `Publisher`s. + This number returned by this method will be used to validate if a `Subscription` adheres to Rule 3.3 and avoids "unbounded recursion". + +### Timeout configuration +Publisher tests make use of two kinds of timeouts, one is the `defaultTimeoutMillis` which corresponds to all methods used +within the TCK which await for something to happen. The other timeout is `publisherReferenceGCTimeoutMillis` which is only used in order to verify +[Rule 3.13](https://github.com/reactive-streams/reactive-streams-jvm#3.13) which defines that `Subscriber` references MUST be dropped +by the Publisher. + +Note that the TCK differenciates between timeouts for "waiting for a signal" (``defaultTimeoutMillis``), +and "asserting no signals happen during a given amount of time" (``envDefaultNoSignalsTimeoutMillis``). +While the latter defaults to the prior, it may be useful to tweak them independently when running on continious +integration servers (for example, keeping the no-signals timeout significantly lower). + +In order to configure these timeouts (for example when running on a slow continious integtation machine), you can either: + +**Use env variables** to set these timeouts, in which case the you can do: + +```bash +export DEFAULT_TIMEOUT_MILLIS=100 +export DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS=100 +export PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=300 +``` + +Or **define the timeouts explicitly in code**: + +```java +public class RangePublisherTest extends FlowPublisherVerification { + + public static final long DEFAULT_TIMEOUT_MILLIS = 100L; + public static final long DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS = DEFAULT_TIMEOUT_MILLIS; + public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 500L; + + public RangePublisherTest() { + super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS); + } + + // ... +} +``` + +Note that explicitly passed in values take precedence over values provided by the environment + +## Subscriber Verification + +`Subscriber` Verification is split up into two files (styles) of tests. + +It is highly recommended to implement the `FlowSubscriberWhiteboxVerification` instead of the `FlowSubscriberBlackboxVerification` even if it is more work to do so, as it can test far more rules and corner cases in implementations that would otherwise be left untested—which is the case when using the Blackbox Verification. + +### createElement and Helper Publisher implementations +Since testing a `Subscriber` is not possible without a corresponding `Publisher` the TCK `Subscriber` Verifications +both provide a default "*helper publisher*" to drive its tests and also allow to replace this `Publisher` with a custom implementation. +The helper `Publisher` is an asynchronous `Publisher` by default—meaning that a `Subscriber` can not blindly assume single threaded execution. + +When extending `Subscriber` Verification classes a type parameter representing the element type passed through the stream must be given. +Implementations are typically not sensitive to the type of element being signalled, but sometimes a `Subscriber` may be limited to only be able to work within a known set of types - +like a `FileSubscriber extends Flow.Subscriber` for example, that writes each element (ByteBuffer) it receives into a file. +For element type agnostic Subscribers the simplest way is to parameterize the tests using `Integer` and in the `createElement(int idx)` method (explained below in futher detail), return the incoming `int`. +In case an implementation needs to work on a specific type, the verification class should be parameterized using that type (e.g. `class StringSubTest extends FlowSubscriberWhiteboxVerification`) and the `createElement` method must be overriden to return a `String`. + +While the Helper `Publisher` implementation is provided, creating its elements is not – this is because a given `Subscriber` +may for example only work with `HashedMessage` or some other specific kind of element. The TCK is unable to generate such +special messages automatically, so the TCK provides the `T createElement(Integer id)` method to be implemented as part of +`Subscriber` Verifications which should take the given `id` and return an element of type `T` (where `T` is the type of +elements flowing into the `Subscriber`, as known thanks to `... extends FlowSubscriberWhiteboxVerification`) representing +an element of the stream that will be passed on to the Subscriber. + +The simplest valid implemenation is to return the incoming `id` *as the element* in a verification using `Integer`s as +element types: + +```java +public class MySubscriberTest extends FlowSubscriberBlackboxVerification { + + // ... + + @Override + public Integer createElement(int element) { return element; } +} +``` + + +NOTE: The `createElement` method *MAY* be called *concurrently from multiple threads*. + +**Very advanced**: While it is not expected for many implementations having to do so, it is possible to take full control of the `Publisher` which will be driving the TCKs test. This can be achieved by implementing the `createHelperPublisher` method in which one can implement the `createHelperPublisher` method by returning a custom `Publisher` implementation which will then be used by the TCK to drive your `Subscriber` tests: + +```java +@Override public Flow.Publisher createHelperPublisher(long elements) { + return new Flow.Publisher() { /* CUSTOM IMPL HERE WHICH OF COURSE ALSO SHOULD PASS THE TCK */ }; +} +``` + + +### Subscriber Whitebox Verification + +The Whitebox Verification is able to verify most of the `Subscriber` Specification, at the additional cost that control over demand generation and cancellation must be handed over to the TCK via the `SubscriberPuppet`. + +Based on experience implementing the `SubscriberPuppet`—it can be tricky or even impossible for some implementations, +as such, not all implementations are expected to make use of the plain `FlowSubscriberWhiteboxVerification`, instead having to fall back to using the `FlowSubscriberBlackboxVerification`. + +For the simplest possible (and most common) `Subscriber` implementation using the whitebox verification boils down to +exteding (or delegating to) your implementation with additionally signalling and registering the test probe, as shown in the below example: + +```java +package com.example.streams; + +import org.reactivestreams.tck.flow.FlowSubscriberWhiteboxVerification; +import org.reactivestreams.tck.TestEnvironment; + +import java.util.concurrent.Flow; + +public class MyFlowSubscriberWhiteboxVerificationTest extends FlowSubscriberWhiteboxVerification { + + public MyFlowSubscriberWhiteboxVerificationTest() { + super(new TestEnvironment()); + } + + // The implementation under test is "SyncSubscriber": + // class SyncSubscriber extends Flow.Subscriber { /* ... */ } + + @Override + public Flow.Subscriber createSubscriber(final WhiteboxSubscriberProbe probe) { + // in order to test the SyncSubscriber we must instrument it by extending it, + // and calling the WhiteboxSubscriberProbe in all of the Subscribers methods: + return new SyncSubscriber() { + @Override + public void onSubscribe(final Flow.Subscription s) { + super.onSubscribe(s); + + // register a successful Subscription, and create a Puppet, + // for the WhiteboxVerification to be able to drive its tests: + probe.registerOnSubscribe(new SubscriberPuppet() { + + @Override + public void triggerRequest(long elements) { + s.request(elements); + } + + @Override + public void signalCancel() { + s.cancel(); + } + }); + } + + @Override + public void onNext(Integer element) { + // in addition to normal Subscriber work that you're testing, register onNext with the probe + super.onNext(element); + probe.registerOnNext(element); + } + + @Override + public void onError(Throwable cause) { + // in addition to normal Subscriber work that you're testing, register onError with the probe + super.onError(cause); + probe.registerOnError(cause); + } + + @Override + public void onComplete() { + // in addition to normal Subscriber work that you're testing, register onComplete with the probe + super.onComplete(); + probe.registerOnComplete(); + } + }; + } + + @Override + public Integer createElement(int element) { + return element; + } + +} +``` + +### Subscriber Blackbox Verification + +Blackbox Verification does not require anything besides providing a `Subscriber` and `Publisher` instances to the TCK, +at the expense of not being able to verify as much as the `FlowSubscriberWhiteboxVerification`: + +```java +package com.example.streams; + +import org.reactivestreams.tck.flow.FlowSubscriberBlackboxVerification; +import org.reactivestreams.tck.TestEnvironment; + +import java.util.concurrent.Flow; + +public class MyFlowSubscriberBlackboxVerificationTest extends FlowSubscriberBlackboxVerification { + + public MyFlowSubscriberBlackboxVerificationTest() { + super(new TestEnvironment()); + } + + @Override + public Flow.Subscriber createSubscriber() { + return new MySubscriber(); + } + + @Override + public Integer createElement(int element) { + return element; + } +} +``` + +### Timeout configuration +Similarily to `FlowPublisherVerification`, it is possible to set the timeouts used by the TCK to validate `Subscriber` behaviour either hard-coded or by using environment variables. + +**Use env variables** to set the timeout value to be used by the TCK: + +```bash +export DEFAULT_TIMEOUT_MILLIS=300 +``` + +Or **define the timeout explicitly in code**: + +```java +public class MySubscriberTest extends FlowSubscriberBlackboxVerification { + + public static final long DEFAULT_TIMEOUT_MILLIS = 300L; + + public RangePublisherTest() { + super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS)); + } + + // ... +} +``` + +NOTE: hard-coded values *take precedence* over environment set values (!). + + +## Subscription Verification + +Please note that while `Subscription` does **not** have it's own test class, it's rules are validated inside of the +`Publisher` and `Subscriber` tests – depending if the Rule demands specific action to be taken by the publishing, or +subscribing side of the `Subscription` contract. + +## Identity Processor Verification + +An `IdentityFlowProcessorVerification` tests the given `Processor` for all `Subscriber`, `Publisher` as well as +`Subscription` rules (internally the `WhiteboxSubscriberVerification` is used for that). + +```java +package com.example.streams; + +import org.reactivestreams.tck.flow.IdentityFlowProcessorVerification; +import org.reactivestreams.tck.flow.FlowSubscriberWhiteboxVerification; +import org.reactivestreams.tck.TestEnvironment; + +import java.util.concurrent.Flow; + +public class MyIdentityFlowProcessorVerificationTest extends IdentityFlowProcessorVerification { + + public static final long DEFAULT_TIMEOUT_MILLIS = 300L; + public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 1000L; + + + public MyIdentityFlowProcessorVerificationTest() { + super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS); + } + + @Override + public Flow.Processor createIdentityProcessor(int bufferSize) { + return new MyIdentityProcessor(bufferSize); + } + + @Override + public Flow.Publisher createHelperPublisher(long elements) { + return new MyRangePublisher(1, elements); + } + + // ENABLE ADDITIONAL TESTS + + @Override + public Flow.Publisher createFailedPublisher() { + // return Publisher that only signals onError instead of null to run additional tests + // see this methods JavaDocs for more details on how the returned Publisher should work. + return null; + } + + // OPTIONAL CONFIGURATION OVERRIDES + // only override these if understanding the implications of doing so. + + @Override + public long maxElementsFromPublisher() { + return super.maxElementsFromPublisher(); + } + + @Override + public long boundedDepthOfOnNextAndRequestRecursion() { + return super.boundedDepthOfOnNextAndRequestRecursion(); + } +} +``` + +The additional configuration options reflect the options available in the `Subscriber` and `Publisher` Verifications. + +The `IdentityFlowProcessorVerification` also runs additional "sanity" verifications, which are not directly mapped to +Specification rules, but help to verify that a `Processor` won't "get stuck" or face similar problems. Please refer to the +sources for details on the tests included. + +## Ignoring tests +Since the tests are inherited instead of user defined it's not possible to use the usual `@Ignore` annotations +to skip certain tests (which may be perfectly reasonable if the implementation has some know constraints on what it +cannot implement). Below is a recommended pattern to skip tests inherited from the TCK's base classes: + +```java +package com.example.streams; + +import org.reactivestreams.tck.flow.IdentityFlowProcessorVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; + +import java.util.concurrent.Flow; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class MyIdentityProcessorTest extends IdentityFlowProcessorVerification { + + private ExecutorService e; + + @BeforeClass + public void before() { e = Executors.newFixedThreadPool(4); } + + @AfterClass + public void after() { if (e != null) e.shutdown(); } + + public SkippingIdentityProcessorTest() { + super(new TestEnvironment()); + } + + @Override + public ExecutorService publisherExecutorService() { + return e; + } + + @Override + public Integer createElement(int element) { + return element; + } + + @Override + public Flow.Processor createIdentityProcessor(int bufferSize) { + return new MyProcessor(bufferSize); // return implementation to be tested + } + + @Override + public Flow.Publisher createFailedPublisher() { + return null; // returning null means that the tests validating a failed publisher will be skipped + } + +} +``` + +## Which verifications must be implemented by a compliant implementation? +In order to be considered an Reactive Streams compliant require implementations to cover their +`Publisher`s and `Subscriber`s with TCK verifications. If a library only implements `Subscriber`s, it does not have to implement `Publisher` tests, the same applies to `IdentityFlowProcessorVerification`-it is not needed if the library does not contain `Processor`s. + +In the case of `Subscriber` Verification are two styles of verifications to available: Blackbox or Whitebox. +It is *strongly* recommend to test `Subscriber` implementations with the `FlowSubscriberWhiteboxVerification` as it is able to +verify most of the specification. The `FlowSubscriberBlackboxVerification` should only be used as a fallback, +once it's certain that implementing the whitebox version will not be possible—if that happens +feel free to open a ticket on the [reactive-streams-jvm](https://github.com/reactive-streams/reactive-streams-jvm) project explaining what made implementing the whitebox verification impossible. + +In summary: implementations are required to use Verifications for the parts of the Specification that they implement, +and encouraged to using the Whitebox Verification over Blackbox for `Subscriber` whenever possible. + +## Upgrading the TCK to newer versions +While it's not expected for the Reactive Streams Specification to change in the forseeable future, +it *may be* that some semantics may need to change at some point. In this case it should expected for test +methods being phased out in terms of deprecation or removal, new tests may also be added over time. + +In general this should not be of much concern, unless overriding test methods are overriden by implementers. +Implementers who find the need of overriding provided test methods are encouraged to reach out via opening Issues +on the [Reactive Streams](https://github.com/reactive-streams/reactive-streams-jvm) project, so the use case can be discussed and, most likely, the TCK improved. + +## Using the TCK from other programming languages + +The TCK was designed such that it should be possible to consume it using different JVM-based programming languages. +The section below shows how to use the TCK using different languages (contributions of examples for more languages are very welcome): + +### Scala + +In order to run the TCK using [ScalaTest](http://www.scalatest.org/) the test class must mix-in the `TestNGSuiteLike` trait (as of ScalaTest `2.2.x`). + +```scala +class IterablePublisherTest(env: TestEnvironment, publisherShutdownTimeout: Long) + extends FlowPublisherVerification[Int](env, publisherShutdownTimeout) + with TestNGSuiteLike { + + def this() { + this(new TestEnvironment(500), 1000) + } + + def createFlowPublisher(elements: Long): Flow.Publisher[Int] = ??? + + // example error state publisher implementation + override def createFailedFlowPublisher(): Flow.Publisher[Int] = + new Flow.Publisher[Int] { + override def subscribe(s: Flow.Subscriber[Int]): Unit = + s.onError(new Exception("Unable to serve subscribers right now!")) + } + +} +``` + +### Groovy, JRuby, Kotlin, others... + +Contributions to this document are very welcome! + +When implementing Reactive Streams using the TCK in some yet undocumented here, language, please feel free to share an example! diff --git a/tck-flow/build.gradle b/tck-flow/build.gradle new file mode 100644 index 00000000..17f93dc0 --- /dev/null +++ b/tck-flow/build.gradle @@ -0,0 +1,7 @@ +description = 'reactive-streams-tck-flow' +dependencies { + compile group: 'org.testng', name: 'testng', version:'5.14.10' + compile project(':reactive-streams-tck') + compile project(':reactive-streams-flow-bridge') +} +test.useTestNG() diff --git a/tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowPublisherVerification.java b/tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowPublisherVerification.java new file mode 100644 index 00000000..fbfdd672 --- /dev/null +++ b/tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowPublisherVerification.java @@ -0,0 +1,63 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck.flow; + +import org.reactivestreams.Publisher; +import org.reactivestreams.ReactiveStreamsFlowBridge; +import org.reactivestreams.tck.PublisherVerification; +import org.reactivestreams.tck.TestEnvironment; + +import java.util.concurrent.Flow; + +/** + * Provides tests for verifying a Java 9+ {@link java.util.concurrent.Flow.Publisher} specification rules. + * + * @see java.util.concurrent.Flow.Publisher + */ +public abstract class FlowPublisherVerification extends PublisherVerification { + + public FlowPublisherVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) { + super(env, publisherReferenceGCTimeoutMillis); + } + + public FlowPublisherVerification(TestEnvironment env) { + super(env); + } + + @Override + final public Publisher createPublisher(long elements) { + final Flow.Publisher flowPublisher = createFlowPublisher(elements); + return ReactiveStreamsFlowBridge.toPublisher(flowPublisher); + } + /** + * This is the main method you must implement in your test incarnation. + * It must create a Publisher for a stream with exactly the given number of elements. + * If `elements` is `Long.MAX_VALUE` the produced stream must be infinite. + */ + public abstract Flow.Publisher createFlowPublisher(long elements); + + @Override + final public Publisher createFailedPublisher() { + final Flow.Publisher failed = createFailedFlowPublisher(); + if (failed == null) return null; // because `null` means "SKIP" in createFailedPublisher + else return ReactiveStreamsFlowBridge.toPublisher(failed); + } + /** + * By implementing this method, additional TCK tests concerning a "failed" publishers will be run. + * + * The expected behaviour of the {@link Flow.Publisher} returned by this method is hand out a subscription, + * followed by signalling {@code onError} on it, as specified by Rule 1.9. + * + * If you ignore these additional tests, return {@code null} from this method. + */ + public abstract Flow.Publisher createFailedFlowPublisher(); +} diff --git a/tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowSubscriberBlackboxVerification.java b/tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowSubscriberBlackboxVerification.java new file mode 100644 index 00000000..b9a4ca1b --- /dev/null +++ b/tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowSubscriberBlackboxVerification.java @@ -0,0 +1,65 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck.flow; + +import org.reactivestreams.ReactiveStreamsFlowBridge; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.reactivestreams.tck.SubscriberBlackboxVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.reactivestreams.tck.flow.support.SubscriberBlackboxVerificationRules; + +import java.util.concurrent.Flow; + +/** + * Provides tests for verifying {@link java.util.concurrent.Flow.Subscriber} and {@link java.util.concurrent.Flow.Subscription} + * specification rules, without any modifications to the tested implementation (also known as "Black Box" testing). + * + * This verification is NOT able to check many of the rules of the spec, and if you want more + * verification of your implementation you'll have to implement {@code org.reactivestreams.tck.SubscriberWhiteboxVerification} + * instead. + * + * @see java.util.concurrent.Flow.Subscriber + * @see java.util.concurrent.Flow.Subscription + */ +public abstract class FlowSubscriberBlackboxVerification extends SubscriberBlackboxVerification + implements SubscriberBlackboxVerificationRules { + + protected FlowSubscriberBlackboxVerification(TestEnvironment env) { + super(env); + } + + @Override + public final void triggerRequest(Subscriber subscriber) { + triggerFlowRequest(ReactiveStreamsFlowBridge.toFlowSubscriber(subscriber)); + } + /** + * Override this method if the {@link java.util.concurrent.Flow.Subscriber} implementation you are verifying + * needs an external signal before it signals demand to its Publisher. + * + * By default this method does nothing. + */ + public void triggerFlowRequest(Flow.Subscriber subscriber) { + // this method is intentionally left blank + } + + @Override + public final Subscriber createSubscriber() { + return ReactiveStreamsFlowBridge.toSubscriber(createFlowSubscriber()); + } + /** + * This is the main method you must implement in your test incarnation. + * It must create a new {@link Flow.Subscriber} instance to be subjected to the testing logic. + */ + abstract public Flow.Subscriber createFlowSubscriber(); + +} diff --git a/tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowSubscriberWhiteboxVerification.java b/tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowSubscriberWhiteboxVerification.java new file mode 100644 index 00000000..23a74bd4 --- /dev/null +++ b/tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowSubscriberWhiteboxVerification.java @@ -0,0 +1,48 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck.flow; + +import org.reactivestreams.ReactiveStreamsFlowBridge; +import org.reactivestreams.Subscriber; +import org.reactivestreams.tck.SubscriberWhiteboxVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules; + +import java.util.concurrent.Flow; + +/** + * Provides whitebox style tests for verifying {@link java.util.concurrent.Flow.Subscriber} + * and {@link java.util.concurrent.Flow.Subscription} specification rules. + * + * @see java.util.concurrent.Flow.Subscriber + * @see java.util.concurrent.Flow.Subscription + */ +public abstract class FlowSubscriberWhiteboxVerification extends SubscriberWhiteboxVerification + implements SubscriberWhiteboxVerificationRules { + + protected FlowSubscriberWhiteboxVerification(TestEnvironment env) { + super(env); + } + + @Override + final public Subscriber createSubscriber(WhiteboxSubscriberProbe probe) { + return ReactiveStreamsFlowBridge.toSubscriber(createFlowSubscriber(probe)); + } + /** + * This is the main method you must implement in your test incarnation. + * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic. + * + * In order to be meaningfully testable your Subscriber must inform the given + * `WhiteboxSubscriberProbe` of the respective events having been received. + */ + protected abstract Flow.Subscriber createFlowSubscriber(WhiteboxSubscriberProbe probe); +} diff --git a/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java b/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java new file mode 100644 index 00000000..4e899afe --- /dev/null +++ b/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java @@ -0,0 +1,72 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck.flow; + +import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.tck.IdentityProcessorVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules; +import org.reactivestreams.tck.flow.support.PublisherVerificationRules; + +public abstract class IdentityFlowProcessorVerification extends IdentityProcessorVerification + implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules { + + public IdentityFlowProcessorVerification(TestEnvironment env) { + super(env); + } + + public IdentityFlowProcessorVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) { + super(env, publisherReferenceGCTimeoutMillis); + } + + public IdentityFlowProcessorVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis, int processorBufferSize) { + super(env, publisherReferenceGCTimeoutMillis, processorBufferSize); + } + + protected abstract Publisher createFailedFlowPublisher(); + + protected abstract Processor createIdentityFlowProcessor(int bufferSize); + + protected abstract Subscriber createFlowSubscriber(FlowSubscriberWhiteboxVerification.WhiteboxSubscriberProbe probe); + + protected abstract Publisher createFlowHelperPublisher(long elements); + + protected abstract Publisher createFlowPublisher(long elements); + + @Override + public final Publisher createHelperPublisher(long elements) { + return createFlowHelperPublisher(elements); + } + + @Override + public final Processor createIdentityProcessor(int bufferSize) { + return createIdentityFlowProcessor(bufferSize); + } + + @Override + public final Publisher createFailedPublisher() { + return createFailedFlowPublisher(); + } + + @Override + public final Publisher createPublisher(long elements) { + return createFlowPublisher(elements); + } + + @Override + public final Subscriber createSubscriber(FlowSubscriberWhiteboxVerification.WhiteboxSubscriberProbe probe) { + return createFlowSubscriber(probe); + } + +} diff --git a/tck-flow/src/test/java/org/reactivestreams/tck/flow/EmptyLazyFlowPublisherTest.java b/tck-flow/src/test/java/org/reactivestreams/tck/flow/EmptyLazyFlowPublisherTest.java new file mode 100644 index 00000000..62b1fc24 --- /dev/null +++ b/tck-flow/src/test/java/org/reactivestreams/tck/flow/EmptyLazyFlowPublisherTest.java @@ -0,0 +1,58 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck.flow; + +import org.reactivestreams.ReactiveStreamsFlowBridge; +import org.reactivestreams.example.unicast.AsyncIterablePublisher; +import java.util.concurrent.Flow.Publisher; + +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Test +public class EmptyLazyFlowPublisherTest extends FlowPublisherVerification { + + private ExecutorService ex; + + public EmptyLazyFlowPublisherTest() { + super(new TestEnvironment()); + } + + @BeforeClass + void before() { ex = Executors.newFixedThreadPool(4); } + + @AfterClass + void after() { if (ex != null) ex.shutdown(); } + + @Override + public Publisher createFlowPublisher(long elements) { + return ReactiveStreamsFlowBridge.toFlowPublisher( + new AsyncIterablePublisher(Collections.emptyList(), ex) + ); + } + + @Override + public Publisher createFailedFlowPublisher() { + return null; + } + + @Override + public long maxElementsFromPublisher() { + return 0; + } +} diff --git a/tck-flow/src/test/java/org/reactivestreams/tck/flow/RangeFlowPublisherTest.java b/tck-flow/src/test/java/org/reactivestreams/tck/flow/RangeFlowPublisherTest.java new file mode 100644 index 00000000..e92e6b0a --- /dev/null +++ b/tck-flow/src/test/java/org/reactivestreams/tck/flow/RangeFlowPublisherTest.java @@ -0,0 +1,177 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck.flow; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.*; + +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.*; + +@Test +public class RangeFlowPublisherTest extends FlowPublisherVerification { + + static final Map stacks = new ConcurrentHashMap(); + + static final Map states = new ConcurrentHashMap(); + + static final AtomicInteger id = new AtomicInteger(); + + @AfterClass + public static void afterClass() { + boolean fail = false; + StringBuilder b = new StringBuilder(); + for (Map.Entry t : states.entrySet()) { + if (!t.getValue()) { + b.append("\r\n-------------------------------"); + for (Object o : stacks.get(t.getKey())) { + b.append("\r\nat ").append(o); + } + fail = true; + } + } + if (fail) { + throw new AssertionError("Cancellations were missing:" + b); + } + } + + public RangeFlowPublisherTest() { + super(new TestEnvironment()); + } + + @Override + public Flow.Publisher createFlowPublisher(long elements) { + return new RangeFlowPublisher(1, elements); + } + + @Override + public Flow.Publisher createFailedFlowPublisher() { + return null; + } + + static final class RangeFlowPublisher + implements Flow.Publisher { + + final StackTraceElement[] stacktrace; + + final long start; + + final long count; + + RangeFlowPublisher(long start, long count) { + this.stacktrace = Thread.currentThread().getStackTrace(); + this.start = start; + this.count = count; + } + + @Override + public void subscribe(Flow.Subscriber s) { + if (s == null) { + throw new NullPointerException(); + } + + int ids = id.incrementAndGet(); + + RangeFlowSubscription parent = new RangeFlowSubscription(s, ids, start, start + count); + stacks.put(ids, stacktrace); + states.put(ids, false); + s.onSubscribe(parent); + } + + static final class RangeFlowSubscription extends AtomicLong implements Flow.Subscription { + + private static final long serialVersionUID = 9066221863682220604L; + + final Flow.Subscriber actual; + + final int ids; + + final long end; + + long index; + + volatile boolean cancelled; + + RangeFlowSubscription(Flow.Subscriber actual, int ids, long start, long end) { + this.actual = actual; + this.ids = ids; + this.index = start; + this.end = end; + } + + @Override + public void request(long n) { + if (!cancelled) { + if (n <= 0L) { + cancelled = true; + states.put(ids, true); + actual.onError(new IllegalArgumentException("§3.9 violated")); + return; + } + + for (;;) { + long r = get(); + long u = r + n; + if (u < 0L) { + u = Long.MAX_VALUE; + } + if (compareAndSet(r, u)) { + if (r == 0) { + break; + } + return; + } + } + + long idx = index; + long f = end; + + for (;;) { + long e = 0; + while (e != n && idx != f) { + if (cancelled) { + return; + } + + actual.onNext((int)idx); + + idx++; + e++; + } + + if (idx == f) { + if (!cancelled) { + states.put(ids, true); + actual.onComplete(); + } + return; + } + + index = idx; + n = addAndGet(-n); + if (n == 0) { + break; + } + } + } + } + + @Override + public void cancel() { + cancelled = true; + states.put(ids, true); + } + } + } +} diff --git a/tck-flow/src/test/java/org/reactivestreams/tck/flow/SingleElementFlowPublisherTest.java b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SingleElementFlowPublisherTest.java new file mode 100644 index 00000000..a58ee2ad --- /dev/null +++ b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SingleElementFlowPublisherTest.java @@ -0,0 +1,57 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck.flow; + +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Publisher; + +import org.reactivestreams.ReactiveStreamsFlowBridge; +import org.reactivestreams.example.unicast.AsyncIterablePublisher; +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Test +public class SingleElementFlowPublisherTest extends FlowPublisherVerification { + + private ExecutorService ex; + + public SingleElementFlowPublisherTest() { + super(new TestEnvironment()); + } + + @BeforeClass + void before() { ex = Executors.newFixedThreadPool(4); } + + @AfterClass + void after() { if (ex != null) ex.shutdown(); } + + @Override + public Flow.Publisher createFlowPublisher(long elements) { + return ReactiveStreamsFlowBridge.toFlowPublisher(new AsyncIterablePublisher(Collections.singleton(1), ex)); + } + + @Override + public Publisher createFailedFlowPublisher() { + return null; + } + + @Override + public long maxElementsFromPublisher() { + return 1; + } +} diff --git a/tck-flow/src/test/java/org/reactivestreams/tck/flow/SyncTriggeredDemandSubscriberTest.java b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SyncTriggeredDemandSubscriberTest.java new file mode 100644 index 00000000..e9fa620b --- /dev/null +++ b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SyncTriggeredDemandSubscriberTest.java @@ -0,0 +1,57 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck.flow; + +import org.reactivestreams.tck.TestEnvironment; +import org.reactivestreams.tck.flow.FlowSubscriberBlackboxVerification; +import org.reactivestreams.tck.flow.support.SyncTriggeredDemandFlowSubscriber; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Flow; + +@Test // Must be here for TestNG to find and run this, do not remove +public class SyncTriggeredDemandSubscriberTest extends FlowSubscriberBlackboxVerification { + + private ExecutorService e; + @BeforeClass void before() { e = Executors.newFixedThreadPool(4); } + @AfterClass void after() { if (e != null) e.shutdown(); } + + public SyncTriggeredDemandSubscriberTest() { + super(new TestEnvironment()); + } + + @Override + public void triggerFlowRequest(Flow.Subscriber subscriber) { + ((SyncTriggeredDemandFlowSubscriber) subscriber).triggerDemand(1); + } + + @Override public Flow.Subscriber createFlowSubscriber() { + return new SyncTriggeredDemandFlowSubscriber() { + private long acc; + @Override protected long foreach(final Integer element) { + acc += element; + return 1; + } + + @Override public void onComplete() { + } + }; + } + + @Override public Integer createElement(int element) { + return element; + } +} diff --git a/tck-flow/src/test/java/org/reactivestreams/tck/flow/SyncTriggeredDemandSubscriberWhiteboxTest.java b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SyncTriggeredDemandSubscriberWhiteboxTest.java new file mode 100644 index 00000000..4b1d6284 --- /dev/null +++ b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SyncTriggeredDemandSubscriberWhiteboxTest.java @@ -0,0 +1,86 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck.flow; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.reactivestreams.tck.TestEnvironment; +import org.reactivestreams.tck.flow.support.SyncTriggeredDemandFlowSubscriber; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Flow; + +@Test // Must be here for TestNG to find and run this, do not remove +public class SyncTriggeredDemandSubscriberWhiteboxTest extends FlowSubscriberWhiteboxVerification { + + private ExecutorService e; + @BeforeClass void before() { e = Executors.newFixedThreadPool(4); } + @AfterClass void after() { if (e != null) e.shutdown(); } + + public SyncTriggeredDemandSubscriberWhiteboxTest() { + super(new TestEnvironment()); + } + + @Override + public Flow.Subscriber createFlowSubscriber(final WhiteboxSubscriberProbe probe) { + return new SyncTriggeredDemandFlowSubscriber() { + @Override + public void onSubscribe(final Flow.Subscription s) { + super.onSubscribe(s); + + probe.registerOnSubscribe(new SubscriberPuppet() { + @Override + public void triggerRequest(long elements) { + s.request(elements); + } + + @Override + public void signalCancel() { + s.cancel(); + } + }); + } + + @Override + public void onNext(Integer element) { + super.onNext(element); + probe.registerOnNext(element); + } + + @Override + public void onError(Throwable cause) { + super.onError(cause); + probe.registerOnError(cause); + } + + @Override + public void onComplete() { + super.onComplete(); + probe.registerOnComplete(); + } + + @Override + protected long foreach(Integer element) { + return 1; + } + }; + } + + @Override public Integer createElement(int element) { + return element; + } + +} diff --git a/tck-flow/src/test/java/org/reactivestreams/tck/flow/support/SyncTriggeredDemandFlowSubscriber.java b/tck-flow/src/test/java/org/reactivestreams/tck/flow/support/SyncTriggeredDemandFlowSubscriber.java new file mode 100644 index 00000000..eccd0b2a --- /dev/null +++ b/tck-flow/src/test/java/org/reactivestreams/tck/flow/support/SyncTriggeredDemandFlowSubscriber.java @@ -0,0 +1,134 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck.flow.support; + +import java.util.concurrent.Flow; + +/** + * SyncTriggeredDemandSubscriber is an implementation of Reactive Streams `Subscriber`, + * it runs synchronously (on the Publisher's thread) and requests demand triggered from + * "the outside" using its `triggerDemand` method and from "the inside" using the return + * value of its user-defined `whenNext` method which is invoked to process each element. + * + * NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden. + */ +// FIXME, depend on the reactive streams version? but that's in test scope... +public abstract class SyncTriggeredDemandFlowSubscriber implements Flow.Subscriber { + private Flow.Subscription subscription; // Obeying rule 3.1, we make this private! + private boolean done = false; + + @Override public void onSubscribe(final Flow.Subscription s) { + // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null` + if (s == null) throw null; + + if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully + try { + s.cancel(); // Cancel the additional subscription + } catch(final Throwable t) { + //Subscription.cancel is not allowed to throw an exception, according to rule 3.15 + (new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err); + } + } else { + // We have to assign it locally before we use it, if we want to be a synchronous `Subscriber` + // Because according to rule 3.10, the Subscription is allowed to call `onNext` synchronously from within `request` + subscription = s; + } + } + + /** + * Requests the provided number of elements from the `Subscription` of this `Subscriber`. + * NOTE: This makes no attempt at thread safety so only invoke it once from the outside to initiate the demand. + * @return `true` if successful and `false` if not (either due to no `Subscription` or due to exceptions thrown) + */ + public boolean triggerDemand(final long n) { + final Flow.Subscription s = subscription; + if (s == null) return false; + else { + try { + s.request(n); + } catch(final Throwable t) { + // Subscription.request is not allowed to throw according to rule 3.16 + (new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err); + return false; + } + return true; + } + } + + @Override public void onNext(final T element) { + if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec + (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onNext prior to onSubscribe.")).printStackTrace(System.err); + } else { + // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null` + if (element == null) throw null; + + if (!done) { // If we aren't already done + try { + final long need = foreach(element); + if (need > 0) triggerDemand(need); + else if (need == 0) {} + else { + done(); + } + } catch (final Throwable t) { + done(); + try { + onError(t); + } catch (final Throwable t2) { + //Subscriber.onError is not allowed to throw an exception, according to rule 2.13 + (new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err); + } + } + } + } + } + + // Showcases a convenience method to idempotently marking the Subscriber as "done", so we don't want to process more elements + // herefor we also need to cancel our `Subscription`. + private void done() { + //On this line we could add a guard against `!done`, but since rule 3.7 says that `Subscription.cancel()` is idempotent, we don't need to. + done = true; // If we `whenNext` throws an exception, let's consider ourselves done (not accepting more elements) + try { + subscription.cancel(); // Cancel the subscription + } catch(final Throwable t) { + //Subscription.cancel is not allowed to throw an exception, according to rule 3.15 + (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err); + } + } + + // This method is left as an exercise to the reader/extension point + // Don't forget to call `triggerDemand` at the end if you are interested in more data, + // a return value of < 0 indicates that the subscription should be cancelled, + // a value of 0 indicates that there is no current need, + // a value of > 0 indicates the current need. + protected abstract long foreach(final T element); + + @Override public void onError(final Throwable t) { + if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec + (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err); + } else { + // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null` + if (t == null) throw null; + // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 + // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 + } + } + + @Override public void onComplete() { + if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec + (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err); + } else { + // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 + // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 + } + } +} diff --git a/tck-flow/src/test/java/org/reactivestreams/tck/flow/support/TCKVerificationSupport.java b/tck-flow/src/test/java/org/reactivestreams/tck/flow/support/TCKVerificationSupport.java new file mode 100644 index 00000000..e69de29b diff --git a/tck-flow/src/test/resources/testng.yaml b/tck-flow/src/test/resources/testng.yaml new file mode 100644 index 00000000..854a0ba0 --- /dev/null +++ b/tck-flow/src/test/resources/testng.yaml @@ -0,0 +1,10 @@ +name: TCKSuite +threadCount: 1 + +tests: + - name: TCK + classes: + - org.reactivestreams.tck.IdentityProcessorVerificationDelegationTest + - org.reactivestreams.tck.PublisherVerificationTest + - org.reactivestreams.tck.SubscriberBlackboxVerificationTest + - org.reactivestreams.tck.SubscriberWhiteboxVerificationTest diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 5efe6e27..2fac7fe7 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -19,9 +19,9 @@ import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; import org.reactivestreams.tck.TestEnvironment.Promise; -import org.reactivestreams.tck.support.Function; -import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules; -import org.reactivestreams.tck.support.PublisherVerificationRules; +import org.reactivestreams.tck.flow.support.Function; +import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules; +import org.reactivestreams.tck.flow.support.PublisherVerificationRules; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index eb6f9344..54badd61 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -18,9 +18,9 @@ import org.reactivestreams.tck.TestEnvironment.Latch; import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; -import org.reactivestreams.tck.support.Function; -import org.reactivestreams.tck.support.Optional; -import org.reactivestreams.tck.support.PublisherVerificationRules; +import org.reactivestreams.tck.flow.support.Function; +import org.reactivestreams.tck.flow.support.Optional; +import org.reactivestreams.tck.flow.support.PublisherVerificationRules; import org.testng.SkipException; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java index d29e9bb0..8931ec93 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java @@ -16,9 +16,9 @@ import org.reactivestreams.Subscription; import org.reactivestreams.tck.TestEnvironment.ManualPublisher; import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; -import org.reactivestreams.tck.support.Optional; -import org.reactivestreams.tck.support.SubscriberBlackboxVerificationRules; -import org.reactivestreams.tck.support.TestException; +import org.reactivestreams.tck.flow.support.Optional; +import org.reactivestreams.tck.flow.support.SubscriberBlackboxVerificationRules; +import org.reactivestreams.tck.flow.support.TestException; import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -42,7 +42,7 @@ * @see org.reactivestreams.Subscriber * @see org.reactivestreams.Subscription */ -public abstract class SubscriberBlackboxVerification extends WithHelperPublisher +public abstract class SubscriberBlackboxVerification extends WithHelperPublisher implements SubscriberBlackboxVerificationRules { protected final TestEnvironment env; @@ -66,7 +66,7 @@ protected SubscriberBlackboxVerification(TestEnvironment env) { * By default this method does nothing. */ public void triggerRequest(final Subscriber subscriber) { - + // this method is intentionally left blank } // ENV SETUP @@ -101,14 +101,14 @@ public void run(BlackboxTestStage stage) throws InterruptedException { // should cope with up to requested number of elements for (int i = 0; i < signalsToEmit && sampleIsCancelled(stage, i, 10); i++) stage.signalNext(); - - // we complete after `signalsToEmit` (which can be less than `requested`), + + // we complete after `signalsToEmit` (which can be less than `requested`), // which is legal under https://github.com/reactive-streams/reactive-streams-jvm#1.2 stage.sendCompletion(); } - /** - * In order to allow some "skid" and not check state on each iteration, + /** + * In order to allow some "skid" and not check state on each iteration, * only check {@code stage.isCancelled} every {@code checkInterval}'th iteration. */ private boolean sampleIsCancelled(BlackboxTestStage stage, int i, int checkInterval) throws InterruptedException { @@ -284,14 +284,14 @@ public void run(BlackboxTestStage stage) throws Throwable { } }); } - + // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10 @Override @Test public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable { blackboxSubscriberTest(new BlackboxTestStageTestRun() { @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") public void run(BlackboxTestStage stage) throws Throwable { - + stage.sub().onError(new TestException()); stage.subProxy().expectError(Throwable.class); } diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java index a4c7d7e2..a22354a4 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java @@ -15,9 +15,9 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.reactivestreams.tck.TestEnvironment.*; -import org.reactivestreams.tck.support.Optional; -import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules; -import org.reactivestreams.tck.support.TestException; +import org.reactivestreams.tck.flow.support.Optional; +import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules; +import org.reactivestreams.tck.flow.support.TestException; import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -30,7 +30,8 @@ import static org.testng.Assert.assertTrue; /** - * Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription} specification rules. + * Provides whitebox style tests for verifying {@link org.reactivestreams.Subscriber} + * and {@link org.reactivestreams.Subscription} specification rules. * * @see org.reactivestreams.Subscriber * @see org.reactivestreams.Subscription @@ -369,7 +370,7 @@ public void run(WhiteboxTestStage stage) throws Throwable { sub.onSubscribe(null); } catch (final NullPointerException expected) { gotNPE = true; - } + } assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException"); env.verifyNoAsyncErrorsNoDelay(); @@ -747,7 +748,7 @@ private SubscriberPuppet puppet() { public void registerOnSubscribe(SubscriberPuppet p) { if (!puppet.isCompleted()) { puppet.complete(p); - } + } } } @@ -780,9 +781,21 @@ public interface SubscriberProbe { } + /** + * Implement this puppet in your Whitebox style tests. + * The test suite will invoke the specific trigger/signal methods requesting you to execute the specific action. + * Since this is a whitebox style test, you're allowed and expected to use knowladge about your implementation to + * make implement these calls. + */ public interface SubscriberPuppet { + /** + * Trigger {@code request(elements)} on your {@link Subscriber} + */ void triggerRequest(long elements); + /** + * Trigger {@code cancel()} on your {@link Subscriber} + */ void signalCancel(); } diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index e7474d59..1d8cf062 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -14,8 +14,8 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.reactivestreams.tck.support.SubscriberBufferOverflowException; -import org.reactivestreams.tck.support.Optional; +import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException; +import org.reactivestreams.tck.flow.support.Optional; import java.util.Collections; import java.util.LinkedList; diff --git a/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java index e23780e2..6deb052d 100644 --- a/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java +++ b/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java @@ -12,9 +12,9 @@ package org.reactivestreams.tck; import org.reactivestreams.Publisher; -import org.reactivestreams.tck.support.Function; -import org.reactivestreams.tck.support.HelperPublisher; -import org.reactivestreams.tck.support.InfiniteHelperPublisher; +import org.reactivestreams.tck.flow.support.Function; +import org.reactivestreams.tck.flow.support.HelperPublisher; +import org.reactivestreams.tck.flow.support.InfiniteHelperPublisher; import java.util.concurrent.ExecutorService; diff --git a/tck/src/main/java/org/reactivestreams/tck/support/Function.java b/tck/src/main/java/org/reactivestreams/tck/flow/support/Function.java similarity index 94% rename from tck/src/main/java/org/reactivestreams/tck/support/Function.java rename to tck/src/main/java/org/reactivestreams/tck/flow/support/Function.java index 7aa9d26b..0b6724f9 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/Function.java +++ b/tck/src/main/java/org/reactivestreams/tck/flow/support/Function.java @@ -9,7 +9,7 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.tck.support; +package org.reactivestreams.tck.flow.support; public interface Function { public Out apply(In in) throws Throwable; diff --git a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/flow/support/HelperPublisher.java similarity index 92% rename from tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java rename to tck/src/main/java/org/reactivestreams/tck/flow/support/HelperPublisher.java index 5f892885..1fd96359 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java +++ b/tck/src/main/java/org/reactivestreams/tck/flow/support/HelperPublisher.java @@ -9,14 +9,12 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.tck.support; +package org.reactivestreams.tck.flow.support; import java.util.Collections; import java.util.Iterator; import java.util.concurrent.Executor; -import org.reactivestreams.Subscription; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Publisher; + import org.reactivestreams.example.unicast.AsyncIterablePublisher; public class HelperPublisher extends AsyncIterablePublisher { diff --git a/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/flow/support/InfiniteHelperPublisher.java similarity index 97% rename from tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java rename to tck/src/main/java/org/reactivestreams/tck/flow/support/InfiniteHelperPublisher.java index 7237c0a1..93227526 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java +++ b/tck/src/main/java/org/reactivestreams/tck/flow/support/InfiniteHelperPublisher.java @@ -9,7 +9,7 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.tck.support; +package org.reactivestreams.tck.flow.support; import org.reactivestreams.example.unicast.AsyncIterablePublisher; diff --git a/tck/src/main/java/org/reactivestreams/tck/support/NonFatal.java b/tck/src/main/java/org/reactivestreams/tck/flow/support/NonFatal.java similarity index 97% rename from tck/src/main/java/org/reactivestreams/tck/support/NonFatal.java rename to tck/src/main/java/org/reactivestreams/tck/flow/support/NonFatal.java index 6b3a215f..13fbc0d3 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/NonFatal.java +++ b/tck/src/main/java/org/reactivestreams/tck/flow/support/NonFatal.java @@ -9,7 +9,7 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.tck.support; +package org.reactivestreams.tck.flow.support; /** diff --git a/tck/src/main/java/org/reactivestreams/tck/support/Optional.java b/tck/src/main/java/org/reactivestreams/tck/flow/support/Optional.java similarity index 97% rename from tck/src/main/java/org/reactivestreams/tck/support/Optional.java rename to tck/src/main/java/org/reactivestreams/tck/flow/support/Optional.java index e533c65e..b1c53287 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/Optional.java +++ b/tck/src/main/java/org/reactivestreams/tck/flow/support/Optional.java @@ -9,7 +9,7 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.tck.support; +package org.reactivestreams.tck.flow.support; import java.util.NoSuchElementException; diff --git a/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java b/tck/src/main/java/org/reactivestreams/tck/flow/support/PublisherVerificationRules.java similarity index 99% rename from tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java rename to tck/src/main/java/org/reactivestreams/tck/flow/support/PublisherVerificationRules.java index e429ec30..7ef44b2b 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java +++ b/tck/src/main/java/org/reactivestreams/tck/flow/support/PublisherVerificationRules.java @@ -9,7 +9,7 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.tck.support; +package org.reactivestreams.tck.flow.support; /** * Internal TCK use only. diff --git a/tck/src/main/java/org/reactivestreams/tck/support/SubscriberBlackboxVerificationRules.java b/tck/src/main/java/org/reactivestreams/tck/flow/support/SubscriberBlackboxVerificationRules.java similarity index 99% rename from tck/src/main/java/org/reactivestreams/tck/support/SubscriberBlackboxVerificationRules.java rename to tck/src/main/java/org/reactivestreams/tck/flow/support/SubscriberBlackboxVerificationRules.java index 8e33497a..e240c780 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/SubscriberBlackboxVerificationRules.java +++ b/tck/src/main/java/org/reactivestreams/tck/flow/support/SubscriberBlackboxVerificationRules.java @@ -9,7 +9,7 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.tck.support; +package org.reactivestreams.tck.flow.support; import org.reactivestreams.tck.SubscriberBlackboxVerification; diff --git a/tck/src/main/java/org/reactivestreams/tck/support/SubscriberBufferOverflowException.java b/tck/src/main/java/org/reactivestreams/tck/flow/support/SubscriberBufferOverflowException.java similarity index 96% rename from tck/src/main/java/org/reactivestreams/tck/support/SubscriberBufferOverflowException.java rename to tck/src/main/java/org/reactivestreams/tck/flow/support/SubscriberBufferOverflowException.java index ebbe0f0b..9cb085d8 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/SubscriberBufferOverflowException.java +++ b/tck/src/main/java/org/reactivestreams/tck/flow/support/SubscriberBufferOverflowException.java @@ -9,7 +9,7 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.tck.support; +package org.reactivestreams.tck.flow.support; public final class SubscriberBufferOverflowException extends RuntimeException { public SubscriberBufferOverflowException() { diff --git a/tck/src/main/java/org/reactivestreams/tck/support/SubscriberWhiteboxVerificationRules.java b/tck/src/main/java/org/reactivestreams/tck/flow/support/SubscriberWhiteboxVerificationRules.java similarity index 98% rename from tck/src/main/java/org/reactivestreams/tck/support/SubscriberWhiteboxVerificationRules.java rename to tck/src/main/java/org/reactivestreams/tck/flow/support/SubscriberWhiteboxVerificationRules.java index 3d812626..6c49c216 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/SubscriberWhiteboxVerificationRules.java +++ b/tck/src/main/java/org/reactivestreams/tck/flow/support/SubscriberWhiteboxVerificationRules.java @@ -9,7 +9,7 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.tck.support; +package org.reactivestreams.tck.flow.support; /** * Internal TCK use only. diff --git a/tck/src/main/java/org/reactivestreams/tck/support/TestException.java b/tck/src/main/java/org/reactivestreams/tck/flow/support/TestException.java similarity index 95% rename from tck/src/main/java/org/reactivestreams/tck/support/TestException.java rename to tck/src/main/java/org/reactivestreams/tck/flow/support/TestException.java index fb395111..17ac5cda 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/TestException.java +++ b/tck/src/main/java/org/reactivestreams/tck/flow/support/TestException.java @@ -9,7 +9,7 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.tck.support; +package org.reactivestreams.tck.flow.support; /** * Exception used by the TCK to signal failures. diff --git a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java index 76472120..45701483 100644 --- a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java @@ -15,7 +15,7 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.reactivestreams.tck.support.TCKVerificationSupport; +import org.reactivestreams.tck.flow.support.TCKVerificationSupport; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; diff --git a/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java index 52270518..ee4dc9b6 100644 --- a/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java @@ -14,8 +14,8 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.reactivestreams.tck.support.TCKVerificationSupport; -import org.reactivestreams.tck.support.TestException; +import org.reactivestreams.tck.flow.support.TCKVerificationSupport; +import org.reactivestreams.tck.flow.support.TestException; import org.testng.Assert; import org.testng.annotations.Test; diff --git a/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java index 7d53f1db..9f5055bf 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java @@ -13,7 +13,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.reactivestreams.tck.support.TCKVerificationSupport; +import org.reactivestreams.tck.flow.support.TCKVerificationSupport; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; diff --git a/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java index 4a0b9c0c..bd0737d7 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java @@ -15,8 +15,8 @@ import org.reactivestreams.Subscription; import org.reactivestreams.tck.SubscriberWhiteboxVerification.SubscriberPuppet; import org.reactivestreams.tck.SubscriberWhiteboxVerification.WhiteboxSubscriberProbe; -import org.reactivestreams.tck.support.Function; -import org.reactivestreams.tck.support.TCKVerificationSupport; +import org.reactivestreams.tck.flow.support.Function; +import org.reactivestreams.tck.flow.support.TCKVerificationSupport; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; diff --git a/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java b/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java index f04e2f2b..88ba57e3 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java @@ -12,14 +12,11 @@ package org.reactivestreams.tck; import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.reactivestreams.tck.SubscriberBlackboxVerification; -import org.reactivestreams.tck.TestEnvironment; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import org.reactivestreams.tck.support.SyncTriggeredDemandSubscriber; +import org.reactivestreams.tck.flow.support.SyncTriggeredDemandSubscriber; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; diff --git a/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberWhiteboxTest.java b/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberWhiteboxTest.java index d14f4488..d4c03fc9 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberWhiteboxTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberWhiteboxTest.java @@ -13,14 +13,11 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.reactivestreams.tck.SubscriberBlackboxVerification; -import org.reactivestreams.tck.SubscriberWhiteboxVerification; -import org.reactivestreams.tck.TestEnvironment; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import org.reactivestreams.tck.support.SyncTriggeredDemandSubscriber; +import org.reactivestreams.tck.flow.support.SyncTriggeredDemandSubscriber; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; diff --git a/tck/src/test/java/org/reactivestreams/tck/support/SyncTriggeredDemandSubscriber.java b/tck/src/test/java/org/reactivestreams/tck/flow/support/SyncTriggeredDemandSubscriber.java similarity index 99% rename from tck/src/test/java/org/reactivestreams/tck/support/SyncTriggeredDemandSubscriber.java rename to tck/src/test/java/org/reactivestreams/tck/flow/support/SyncTriggeredDemandSubscriber.java index eb9fd7a3..0df6b3bf 100644 --- a/tck/src/test/java/org/reactivestreams/tck/support/SyncTriggeredDemandSubscriber.java +++ b/tck/src/test/java/org/reactivestreams/tck/flow/support/SyncTriggeredDemandSubscriber.java @@ -9,7 +9,7 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.tck.support; +package org.reactivestreams.tck.flow.support; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; diff --git a/tck/src/test/java/org/reactivestreams/tck/support/TCKVerificationSupport.java b/tck/src/test/java/org/reactivestreams/tck/flow/support/TCKVerificationSupport.java similarity index 99% rename from tck/src/test/java/org/reactivestreams/tck/support/TCKVerificationSupport.java rename to tck/src/test/java/org/reactivestreams/tck/flow/support/TCKVerificationSupport.java index b5067da7..5251d231 100644 --- a/tck/src/test/java/org/reactivestreams/tck/support/TCKVerificationSupport.java +++ b/tck/src/test/java/org/reactivestreams/tck/flow/support/TCKVerificationSupport.java @@ -9,7 +9,7 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.tck.support; +package org.reactivestreams.tck.flow.support; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; diff --git a/tck/src/test/resources/testng.yaml b/tck/src/test/resources/testng.yaml index 854a0ba0..cb773852 100644 --- a/tck/src/test/resources/testng.yaml +++ b/tck/src/test/resources/testng.yaml @@ -6,5 +6,6 @@ tests: classes: - org.reactivestreams.tck.IdentityProcessorVerificationDelegationTest - org.reactivestreams.tck.PublisherVerificationTest + - org.reactivestreams.tck.BokenExampleTest - org.reactivestreams.tck.SubscriberBlackboxVerificationTest - org.reactivestreams.tck.SubscriberWhiteboxVerificationTest