-
Notifications
You must be signed in to change notification settings - Fork 7.6k
2.x: Add MulticastProcessor #6002
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report
@@ Coverage Diff @@
## 2.x #6002 +/- ##
============================================
+ Coverage 98.25% 98.27% +0.01%
- Complexity 6062 6158 +96
============================================
Files 656 659 +3
Lines 44101 44514 +413
Branches 6118 6201 +83
============================================
+ Hits 43333 43745 +412
- Misses 230 231 +1
Partials 538 538
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Somehow I missed this PR
* upstream when all {@link Subscriber}s have cancelled. Late {@code Subscriber}s will then be | ||
* immediately completed. | ||
* <p> | ||
* Because of {@code MulticastProcessor} implements the {@link Subscriber} interface, calling |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd nuke the of
here
* {@code onSubscribe} is mandatory (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>). | ||
* If {@code MulticastProcessor} shoud run standalone, i.e., without subscribing the {@code MulticastProcessor} to another {@link Publisher}, | ||
* use {@link #start()} or {@link #startUnbounded()} methods to initialize the internal buffer. | ||
* Failing to do so will lead to {@link NullPointerException} at runtime. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lead to a
public static <T> MulticastProcessor<T> create() { | ||
return new MulticastProcessor<T>(bufferSize(), false); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: double new line
* @param <T> the input and output value type | ||
* @return the new MulticastProcessor instance | ||
*/ | ||
public static <T> MulticastProcessor<T> create(int bufferSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add the @CheckReturnValue
annotation here
* is cancelled | ||
* @return the new MulticastProcessor instance | ||
*/ | ||
public static <T> MulticastProcessor<T> create(boolean refCount) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add the @CheckReturnValue
annotation here
* @param <T> the input and output value type | ||
* @return the new MulticastProcessor instance | ||
*/ | ||
public static <T> MulticastProcessor<T> create(int bufferSize, boolean refCount) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add the @CheckReturnValue
annotation here
} | ||
|
||
/** | ||
* Constructs a fresh instance with the given prefetch amount and the optional |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no real need of having the documentation here, is there? It's not a public method and the create method already cover everything
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may help those who dig into the source code for some reason (learning, debugging, etc.). I'll leave it there.
@Override | ||
public void onError(Throwable t) { | ||
if (t == null) { | ||
throw new NullPointerException("t is null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't we also have another second sentence that was stating that nulls are not allowed per definition?
|
||
mp.test().assertResult(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: double new line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, removed.
.assertEmpty() | ||
.requestMore(1) | ||
.assertValues(0) | ||
.assertNotComplete() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you just use assertValuesOnly
and then save the assertNotComplete
call?
This PR adds the
MulticastProcessor
from the extensions project to be a standard processor option.This type of processor fills the gap of having a backpressure-coordinating processor type as
PublishProcessor
doesn't coordinate backpressure on its own andFlowable.publish()
often can't be used because the upstream may not yet exist when the dowstream consumers are setup.Example:
Resolves: #5999