Subscription to a behavior subject can miss the value of a parallel occurring on_next #512

@PKobold

I encountered a strange behavior when using a behavior subject (no pun intended :D) to spread a state change. Sometimes a subscriber does not receive a state change delivered by an on_next call, even though it did receive the value from before that call when it subscribed.

This results in highly unwanted behavior: in my example you could wait endlessly for the state to change to a certain value, which by design you should always receive.

I tracked this down and concluded that it can happen because of the implementation of behavior::get_observable() in rx-behavior.hpp:

    observable<T> get_observable() const {
        auto keepAlive = s;
        return make_observable_dynamic<T>([=](subscriber<T> o){
            if (keepAlive.get_subscription().is_subscribed()) {
                o.on_next(get_value());
            }
            keepAlive.add(s.get_subscriber(), std::move(o));
        });
    }

After get_value() is called, there is a gap before the subscriber is actually added. During that gap, a parallel on_next can update the state of the behavior_observer, and the new subscriber o will then not receive the new value.
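To make the gap concrete, here is a minimal sketch that replays the problematic interleaving on a single thread, so the outcome is deterministic. The class toy_behavior and the function missed_update_demo are hypothetical stand-ins invented for this illustration. They are not the rxcpp implementation and only mimic the "replay first, register second" order of get_observable().

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical toy stand-in for a behavior subject; NOT the rxcpp classes.
class toy_behavior {
    mutable std::mutex lock;
    int current;
    std::vector<std::function<void(int)>> observers;

public:
    explicit toy_behavior(int initial) : current(initial) {}

    // Reads the current value under the lock, then releases the lock.
    int get_value() const {
        std::lock_guard<std::mutex> guard(lock);
        return current;
    }

    // Registers an observer for future values only.
    void add(std::function<void(int)> o) {
        std::lock_guard<std::mutex> guard(lock);
        observers.push_back(std::move(o));
    }

    // Stores the new value and forwards it to the observers known right now.
    void on_next(int v) {
        std::vector<std::function<void(int)>> snapshot;
        {
            std::lock_guard<std::mutex> guard(lock);
            current = v;
            snapshot = observers;
        }
        for (auto& o : snapshot) o(v);
    }
};

// Replays the interleaving from the report step by step:
// replay, then an on_next in the gap, then the late registration.
std::vector<int> missed_update_demo() {
    toy_behavior b(1);
    std::vector<int> seen;
    auto o = [&seen](int v) { seen.push_back(v); };

    o(b.get_value());  // step 1: the new subscriber gets the current value (1)
    b.on_next(2);      // a "parallel" on_next lands in the gap
    b.add(o);          // step 2: registered too late to be told about 2

    return seen;       // contains only 1, although the current value is now 2
}
```

Calling missed_update_demo() returns a vector holding only the value 1. The subscriber would learn about 2 only if another on_next followed, which matches the endless wait described above.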

A quick fix that works for me and stays local is to change the mutex of behavior_observer_state into a recursive mutex, make it accessible, and lock it in the lambda function above. See the following pseudo sample implementation:

class behavior_observer : public detail::multicast_observer<T>
{
    ...
    class behavior_observer_state
    {
        mutable std::recursive_mutex lock;
    public:
        ....
        std::recursive_mutex& mutex() const
        {
            return lock;
        }
    };
    ....
public:
    ....
    std::recursive_mutex& mutex() const
    {
        return state->mutex();
    }
};

class behavior
{
 ...
public:
....
    observable<T> get_observable() const {
        auto keepAlive = s;
        return make_observable_dynamic<T>([=](subscriber<T> o) {
            // Hold the state lock so that a parallel on_next cannot slip in between the replay and the registration.
            std::lock_guard<std::recursive_mutex> guard(keepAlive.mutex());
            if (keepAlive.get_subscription().is_subscribed()) {
                o.on_next(get_value());
            }
            keepAlive.add(s.get_subscriber(), std::move(o));
        });
    }
};
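The idea behind this proposal, replaying the current value and registering the subscriber under one lock, can be tried out in isolation with a small sketch. The class locked_behavior and the helper late_subscriber_sees_final_value are hypothetical names made up for this example. The sketch also simplifies by delivering values while holding the lock, which I am not claiming rxcpp does. The recursive mutex is needed here because subscribe() calls get_value(), which takes the same lock again.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical toy subject; NOT the rxcpp implementation.
class locked_behavior {
    mutable std::recursive_mutex lock;
    int current;
    std::vector<std::function<void(int)>> observers;

public:
    explicit locked_behavior(int initial) : current(initial) {}

    int get_value() const {
        std::lock_guard<std::recursive_mutex> guard(lock);
        return current;
    }

    // Replay and registration happen under one lock, so no on_next can
    // run between them. get_value() re-locks, hence the recursive mutex.
    void subscribe(std::function<void(int)> o) {
        std::lock_guard<std::recursive_mutex> guard(lock);
        o(get_value());
        observers.push_back(std::move(o));
    }

    // Simplification: values are delivered while the lock is held.
    void on_next(int v) {
        std::lock_guard<std::recursive_mutex> guard(lock);
        current = v;
        for (auto& o : observers) o(v);
    }
};

// Subscribes while another thread emits 1..emissions and checks that the
// subscriber saw a gap-free run of values ending with the last emission.
bool late_subscriber_sees_final_value(int emissions) {
    locked_behavior b(0);
    std::vector<int> seen;  // only written while b's lock is held

    std::thread producer([&b, emissions] {
        for (int i = 1; i <= emissions; ++i) b.on_next(i);
    });
    b.subscribe([&seen](int v) { seen.push_back(v); });
    producer.join();

    if (seen.empty() || seen.back() != emissions) return false;
    for (std::size_t i = 1; i < seen.size(); ++i) {
        if (seen[i] != seen[i - 1] + 1) return false;
    }
    return true;
}
```

Wherever the subscription lands relative to the producer thread, the subscriber first receives the value that is current at that moment and then every later value, so its last observed value is always the final one. Delivering under the lock keeps the sketch short, but it would be a questionable choice for real observers that call back into the subject.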

But maybe some of you have a better solution for this.
If desired, I can provide a small test example that reproduces the problem.

Some side questions that came up while taking a deeper look at the implementation in rx-behavior.hpp:

  1. Why does behavior_observer_state use unique_lock instead of lock_guard? Isn't lock_guard generally preferable in this case, since unique_lock is the more complex object? The difference may be optimized away, but that is not guaranteed.

  2. In the current implementation of behavior::get_observable(), the keepAlive object seems redundant. Wouldn't the following code do the same, or am I missing something? (Compare with the first code section.)

observable<T> get_observable() const
            {
                return make_observable_dynamic<T>([=](subscriber<T> o) {
                    if (s.get_subscription().is_subscribed())
                    {
                        o.on_next(get_value());
                    }
                    s.add(s.get_subscriber(), std::move(o));
                });
            }
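One language-level detail that bears on question 2: inside a member function, a [=] lambda does not copy a member such as s. It captures this, and s then means this->s, whereas a local like keepAlive really is copied into the lambda. The sketch below shows only that difference. The struct holder and the function capture_demo are hypothetical, and nothing here is a statement about the intent of the rxcpp authors. Note that the original lambda also reaches get_value() and s.get_subscriber() through this. Newer language standards deprecate the implicit capture of this via [=], so a compiler may warn about it.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical stand-in; the int member `s` plays the role of behavior's `s`.
struct holder {
    int s = 1;

    // Returns a callable yielding {s as seen through this, the copied local}.
    std::function<std::pair<int, int>()> make_reader() const {
        auto keepAlive = s;  // local copy, stored inside the lambda by value
        return [=]() {
            // `s` is really this->s: only the pointer `this` was captured.
            return std::make_pair(s, keepAlive);
        };
    }
};

std::pair<int, int> capture_demo() {
    holder h;
    auto reader = h.make_reader();
    h.s = 2;          // change the member after the lambda was created
    return reader();  // first reflects the change, second does not
}
```

capture_demo() yields the pair (2, 1): the member read follows the live object, while the copied local stays at its old value. If the lambda is meant to remain usable independently of the object that created it, a by-value local like keepAlive is therefore not equivalent to naming the member directly.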

BR
Paul
