Backpressure and gc #2001
This was originally thought to be the correct behavior. The issue that you'll need solved for backpressure to work here is #1828 (Backpressure: Window by Size).
I switched to using buffer and the problem still happens (I assume buffer is backpressure enabled).

Observable<String> filenames = ...

filenames
    // batch the filenames so we don't overload merge
    .buffer(availableProcessors - 2)
    .map(list -> Observable.from(list))
    // get positions for each window
    .concatMap(windowed -> windowed.flatMap(
        f -> positions(f).subscribeOn(Schedulers.computation())))
    .subscribe();
I've tried these JVM args:

and the problem is delayed slightly but still happens. Overall, GC is slower than object creation, so I run out of memory, and the operators are none the wiser in terms of slowing down requests.
So now we look for bugs if this is not actually working. This code seems to be functioning, but I have not looked at memory yet. I want to confirm this is functioning correctly:

import java.util.List;

import rx.Observable;
import rx.schedulers.Schedulers;

public class BufferBackpressure {

    public static void main(String... args) {
        Observable.range(0, 10000000)
            .doOnRequest(r -> System.out.println("requested: " + r))
            .doOnNext(i -> System.out.println("raw emission: " + i))
            // batch the values so we don't overload merge
            .buffer(Runtime.getRuntime().availableProcessors() - 2)
            .map(list -> Observable.from(list))
            // get positions for each window
            .concatMap(windowed -> windowed.flatMap(
                f -> positions(f).subscribeOn(Schedulers.computation())))
            .toBlocking().forEach(System.out::println);
    }

    private static Observable<String> positions(int i) {
        return Observable.just(i).map(n -> {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "Processed => " + n;
        });
    }
}

This emits the following, which shows it is requesting the chunks correctly:
Is this what you see when running your version? The assumption here is that your …
I think backpressure is working fine; it's just that GC is not keeping up with object creation. Note that no objects are retained by this stream, so GC should be able to clean up virtually everything given the opportunity. This is the log with memory usage (scroll right to see), messagePerSecond since start (not useful from line to line), and GC logging. By the last line the process has hung.
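The point being debated here (that demand-bounded production caps live objects regardless of GC timing) can be illustrated outside RxJava with a plain bounded queue. This is a hypothetical standalone sketch, not the reporter's code: `put()` blocks when the consumer falls behind, so at most the queue's capacity worth of batches is ever alive at once.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedDemandSketch {
    public static void main(String[] args) throws InterruptedException {
        // A bounded queue models backpressure: put() blocks when the
        // consumer falls behind, capping live objects at the capacity.
        BlockingQueue<int[]> queue = new ArrayBlockingQueue<>(16);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                try {
                    queue.put(new int[256]); // allocation is throttled here
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        producer.start();

        int consumed = 0;
        while (consumed < 1000) {
            queue.take();
            consumed++;
        }
        producer.join();
        // At no point were more than 16 batches alive at once, so the
        // heap footprint stays flat no matter how slow GC is.
        System.out.println("consumed = " + consumed);
    }
}
```

With demand bounded like this, allocation rate can never outrun collection for long, because the producer simply stops allocating until space frees up.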
Ah, my logging method (a custom …
Spoke too soon. I removed the logging calls and replaced them with a simple doOnNext logging line, and the problem still happened, just took longer.
I think it's on me to knock up a load test for you people to run. Might be tomorrow AEST though, as it's home time.
Memory profile would be great. Otherwise, my tip is on concatMap piling up filenames.
Hah, sorted it out. 'Twas a ring buffer size override that had been set to some enormous value as a test earlier. Sorry to have wasted your time; thanks for the responses.
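For context on why such an override defeats backpressure: RxJava 1.x sizes its internal ring buffers from the `rx.ring-buffer.size` system property, which is the kind of knob described above. A minimal sketch, assuming that property name (the value below is purely illustrative, not a recommendation):

```java
public class RingBufferSizeDemo {
    public static void main(String[] args) {
        // RxJava 1.x reads the rx.ring-buffer.size system property to size
        // the buffers behind operators like merge and observeOn.
        // Illustrative override only:
        System.setProperty("rx.ring-buffer.size", "8192");
        int size = Integer.getInteger("rx.ring-buffer.size", 128);
        // Each subscriber pre-allocates and requests 'size' elements up
        // front, so an enormous override multiplies heap usage and removes
        // the memory bound that backpressure is supposed to provide.
        System.out.println("ring buffer size = " + size);
    }
}
```

With the default (128 on the standard JVM), in-flight objects per subscriber stay small; an enormous override lets every operator request and hold huge batches, which looks exactly like "GC not keeping up".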
Glad you found it! That's a fun one :-)
Further to the discussion in #1941: I'm processing records at about 50,000 messages/s using code like this with Java 1.7u72.

The positions are read from a file using backpressure-enabled operators (no onBackpressureBuffer is in use), so everything goes nicely for a minute or two, at which point a full GC is required. While the full GC is running, the stream is still running full bore, creating objects and consuming memory, so much so that by the time the full GC is finished, heap usage is so high that another GC is required, and before you know it everything's hung due to GC not keeping up with object creation.

Any suggestions for how to induce backpressure when GC is not keeping up?
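The mechanism the question is about can be sketched in plain Java. This is a hypothetical, minimal model of an Rx-style request(n) protocol (class and method names are mine, not RxJava's): the producer emits only what the consumer has explicitly requested, so the consumer controls the allocation rate.

```java
import java.util.function.IntConsumer;

public class RequestNSketch {
    // Minimal model of Rx-style backpressure: the producer emits only
    // as many items as the consumer has requested via request(n).
    static class Producer {
        private final IntConsumer downstream;
        private int next = 0;

        Producer(IntConsumer downstream) {
            this.downstream = downstream;
        }

        void request(int n) {
            for (int i = 0; i < n; i++) {
                downstream.accept(next++); // emit strictly on demand
            }
        }
    }

    public static void main(String[] args) {
        int[] received = {0};
        Producer p = new Producer(i -> received[0]++);
        p.request(4);   // consumer asks for a small batch...
        // ...processes it (giving GC a chance to run between batches)...
        p.request(4);   // ...then asks for the next batch
        System.out.println("received = " + received[0]);
    }
}
```

When every operator in the chain honors request(n) like this, the number of un-consumed objects in flight is bounded by the outstanding requests, which is what keeps allocation from outrunning GC.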