Skip to content

PublishSubject's HashMap makes groupBy and such non-deterministic. #282

Closed
@Treora

Description

@Treora

In a few cases already I noticed unexpected and non-deterministic behaviour occurring when subscribing (directly or via a chain of observables) to a PublishSubject source. This happens when subscribing from within an onNext method that is directly or indirectly triggered by that same source. The newly subscribed observer may or may not directly receive the onNext call from the same event. An example where this is annoying:

connectToEventStream()
    .publish()
    .groupBy(event -> event.source).subscribe(eventStream -> {
        LogFile log = new LogFile(eventStream.getKey() + ".log");
        eventStream.subscribe(event -> log.write(event.getMessage()));
    });

In this example the first event of each source might be skipped and not be logged, but other times it will work fine. To me this seems undesired behaviour. There may be cases where it actually is preferable that the current item will be skipped when subscribing from an onNext method, but this happening unpredictably is never a good idea.

The cause of the unpredictability is the implementation of Map that is used in PublishSubject, which when iterating on the map's values sometimes includes new items:

Similarly, Iterators and Enumerations return elements reflecting the state of the hash table at some point at or since the creation of the iterator/enumeration.

And supposedly whether an item will be iterated over or not depends on the hashes of the subscriptions, thus totally unpredictable. A sensible option would be to use a different implementation of Map that does iterate over any items that are added during the iterating loop, but it looks like this implementation then has to be written or found first as java seems not to provide anything like this.

As a quick hack I added this code to the PublishSubject, but I considered this solution too ugly to be worth a pull request (commit a7fc861):

public void onNext(T args) {
    int observersSize;
    Set<Observer<T>> notifiedObservers = new HashSet<Observer<T>>();
    do {
        observersSize = observers.size();
        for (Observer<T> observer : observers.values()) {
            if (!notifiedObservers.contains(observer)) {
                observer.onNext(args);
                notifiedObservers.add(observer);
            }
        }
    } while (observers.size() > observersSize);
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions