Thread BLOCKED issues of DestinationCache in DefaultSubscriptionRegistry #24395

Closed
liheyuan opened this issue Jan 19, 2020 · 6 comments
Labels
in: messaging Issues in messaging modules (jms, messaging) status: superseded An issue that has been superseded by another

Comments

liheyuan commented Jan 19, 2020

We are using spring-messaging to implement a STOMP server (using SimpleBrokerMessageHandler).
Each client subscribes to 5 channels, and everything is fine when there are only a few users.
However, when the number of online users exceeds ~700, the WebSocket channel stops responding.

After analysis, I found that many threads were BLOCKED on DestinationCache, as follows:

"pk-ws-worker-100-thread-78" #560 prio=5 os_prio=0 tid=0x00007ff19c182000 nid=0x1252 waiting for monitor entry [0x00007ff0a8d9f000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry$DestinationCache.getSubscriptions(DefaultSubscriptionRegistry.java:269)
        - waiting to lock <0x00000004c007ec20> (a org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry$DestinationCache$1)

The relevant part of the code is as follows:

public LinkedMultiValueMap<String, String> getSubscriptions(String destination, Message<?> message) {
  LinkedMultiValueMap<String, String> result = this.accessCache.get(destination);
  if (result == null) {
    synchronized (this.updateCache) {
      result = new LinkedMultiValueMap<>();
      for (SessionSubscriptionInfo info : subscriptionRegistry.getAllSubscriptions()) {
        for (String destinationPattern : info.getDestinations()) {
          if (getPathMatcher().match(destinationPattern, destination)) {
            for (Subscription sub : info.getSubscriptions(destinationPattern)) {
              result.add(info.sessionId, sub.getId());
            }
          }
        }
      }
      if (!result.isEmpty()) {
        this.updateCache.put(destination, result.deepCopy());
        this.accessCache.put(destination, result);
      }
    }
  }
  return result;
}

As you can see, the code inside the synchronized block traverses all subscriptions, which takes too long and blocks other threads.

Also, the accessCache / updateCache does not help if the client has not successfully made the subscription, because an empty result is never cached. This makes the situation worse.

We tried increasing the cache limit, but it doesn't help in our case.

To solve the problem, we removed the DestinationCache and
reimplemented it as a Map of <sessionId -> subsId> inside SessionSubscriptionRegistry
(in our own codebase, of course).

After these changes, the server can handle more than 5K online users with no problem.

Meanwhile, I noticed that DefaultSubscriptionRegistry and DestinationCache have been there for many years.

So I wonder: is it OK to open a PR?
Or is the existing DestinationCache designed this way for some other reason?

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged or decided on label Jan 19, 2020
@rstoyanchev
Contributor

Thanks for the report. The use of DestinationCache could be abstracted so that it can be swapped out for a different implementation. I can imagine, for example, an implementation based on parsed patterns.

> As you can see, the code inside synchronized will traverse all subscription, which will cost too much time and block other Thread.

That said, I am curious: how is your implementation different?

@rstoyanchev rstoyanchev self-assigned this Jan 22, 2020
@rstoyanchev rstoyanchev added the status: waiting-for-feedback We need additional information before we can continue label Jan 22, 2020

liheyuan commented Jan 29, 2020

@rstoyanchev
Thanks for the reply.
My implementation maintains a Map, with <key, value> as <sessionId -> subsId>, inside SessionSubscriptionRegistry.
The map is updated on every change to SessionSubscriptionRegistry.
Because this map is always in sync with SessionSubscriptionRegistry, we can use it directly for non-fuzzy destinations (destinations without {} or *), which is very fast.
For fuzzy destinations (destinations with {}, *, ...), we use the same approach as getSubscriptions in DestinationCache, but without the query cache. (Yes, this scans all subscriptions on every query, which makes it a CPU-intensive task, but one that needs no lock.)
Also, fuzzy destinations make up no more than 5% of normal STOMP usage.
So overall performance improves.
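
To make the split above concrete, here is a minimal sketch of the idea in plain Java. It is not the code from our codebase and not Spring's API: the class name SplitSubscriptionIndex, its methods, and the simplified '*' matcher are all made up for illustration (a real registry would use the configured PathMatcher and also handle {} patterns).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Pattern;

// Hypothetical sketch: exact destinations use a direct index, patterns are scanned.
final class SplitSubscriptionIndex {

    // destination -> list of (sessionId, subscriptionId) pairs, for exact destinations
    private final Map<String, List<Map.Entry<String, String>>> exact = new ConcurrentHashMap<>();

    // destination pattern -> list of (sessionId, subscriptionId) pairs
    private final Map<String, List<Map.Entry<String, String>>> patterns = new ConcurrentHashMap<>();

    // Assumption: only '*' marks a pattern in this sketch.
    static boolean isPattern(String destination) {
        return destination.indexOf('*') >= 0;
    }

    private Map<String, List<Map.Entry<String, String>>> indexFor(String destination) {
        return isPattern(destination) ? patterns : exact;
    }

    void subscribe(String sessionId, String subscriptionId, String destination) {
        // compute() makes "create the list if missing, then add" atomic per key
        indexFor(destination).compute(destination, (d, subs) -> {
            List<Map.Entry<String, String>> list = (subs != null) ? subs : new CopyOnWriteArrayList<>();
            list.add(Map.entry(sessionId, subscriptionId));
            return list;
        });
    }

    void unsubscribe(String sessionId, String subscriptionId, String destination) {
        indexFor(destination).computeIfPresent(destination, (d, subs) -> {
            subs.remove(Map.entry(sessionId, subscriptionId));
            return subs.isEmpty() ? null : subs; // returning null drops the key
        });
    }

    List<Map.Entry<String, String>> find(String destination) {
        // Fast path: direct lookup for subscriptions to the exact destination
        List<Map.Entry<String, String>> result =
                new ArrayList<>(exact.getOrDefault(destination, List.of()));
        // Slow path: test every pattern subscription, with no cache and no shared lock
        patterns.forEach((pattern, subs) -> {
            if (matches(pattern, destination)) {
                result.addAll(subs);
            }
        });
        return result;
    }

    // Simplified stand-in for a path matcher: '*' matches any run of non-'/' characters.
    static boolean matches(String pattern, String destination) {
        StringBuilder regex = new StringBuilder();
        boolean first = true;
        for (String literal : pattern.split("\\*", -1)) {
            if (!first) {
                regex.append("[^/]*");
            }
            regex.append(Pattern.quote(literal));
            first = false;
        }
        return Pattern.matches(regex.toString(), destination);
    }
}
```

The lookup cost for an exact destination no longer depends on the total number of subscriptions, only on the number of pattern subscriptions that have to be tested.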

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Jan 29, 2020

trim09 commented Mar 26, 2020

We observed the same. Threads were BLOCKED on synchronized blocks in DefaultSubscriptionRegistry, causing a bottleneck. We were lucky that we had no subscription pattern matching, so we were able to solve it by reimplementing DefaultSubscriptionRegistry. We used two ConcurrentMaps and their compute*() methods:

public class CustomSubscriptionRegistry extends AbstractSubscriptionRegistry {

    private static final MultiValueMap<String, String> EMPTY_MAP = CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap<>());

    // sessionId -> (subscriptionId -> destination)
    private final ConcurrentMap<String, Map<String, String>> sessions = new ConcurrentHashMap<>();

    // destination -> (session id -> List of subscriptionIds)
    private final ConcurrentMap<String, MultiValueMap<String, String>> destinationLookup = new ConcurrentHashMap<>();

    @Override
    protected void addSubscriptionInternal(@NonNull String sessionId, @NonNull String subscriptionId,
                                           @NonNull String destination, @NonNull Message<?> message) {
        sessions.compute(sessionId, (s, map) -> {
            if (map == null) {
                map = new HashMap<>();
            }
            map.put(subscriptionId, destination);
            addToDestinationLookup(sessionId, subscriptionId, destination);
            return map;
        });
    }

    @Override
    protected void removeSubscriptionInternal(@NonNull String sessionId, @NonNull String subscriptionId, @NonNull Message<?> message) {
        sessions.computeIfPresent(sessionId, (s, map) -> {
            String destination = map.remove(subscriptionId);

            if (destination != null) {
                removeFromDestinationLookup(sessionId, subscriptionId, destination);
            } else {
                log.trace("Could not remove websocket subscription. Subscription '{}' was not found for session '{}'",
                    subscriptionId, sessionId);
            }

            return emptyMapToNull(map);
        });
    }

    @Override
    public void unregisterAllSubscriptions(@NonNull String sessionId) {
        Map<String, String> map = sessions.remove(sessionId);

        if (map == null) {
            log.error("Could not unregister websocket session. Session '{}' was not found.", sessionId);
            return;
        }

        map.values().forEach(destination ->
            removeFromDestinationLookup(sessionId, destination));
    }

    @Override
    protected @NonNull MultiValueMap<String, String> findSubscriptionsInternal(@NonNull String destination, @NonNull Message<?> message) {
        return destinationLookup.getOrDefault(destination, EMPTY_MAP);
    }

    private void addToDestinationLookup(@NonNull String sessionId, @NonNull String subscriptionId, @NonNull String destination) {
        destinationLookup.compute(destination, (s, map) -> {
            if (map == null) {
                map = new LinkedMultiValueMap<>();
            }
            map.add(sessionId, subscriptionId);
            return map;
        });
    }

    private void removeFromDestinationLookup(@NonNull String sessionId, @NonNull String subscriptionId, String destination) {
        destinationLookup.computeIfPresent(destination, (dest, map) -> {
            map.computeIfPresent(sessionId, (s, subscriptions) -> {
                subscriptions.remove(subscriptionId);
                if (subscriptions.isEmpty()) {
                    return null;
                } else {
                    return subscriptions;
                }
            });

            return emptyMapToNull(map);
        });
    }

    private void removeFromDestinationLookup(@NonNull String sessionId, String destination) {
        destinationLookup.computeIfPresent(destination, (dest, map) -> {
            map.remove(sessionId);
            return emptyMapToNull(map);
        });
    }

    private <V, K, T extends Map<V, K>> T emptyMapToNull(T map) {
        return map.isEmpty() ? null : map;
    }
}

And registered it like this:

@Configuration
public class CustomBrokerMessageHandlerConfiguration extends DelegatingWebSocketMessageBrokerConfiguration {

    @Override
    @Bean
    public AbstractBrokerMessageHandler simpleBrokerMessageHandler() {
        SimpleBrokerMessageHandler handler = (SimpleBrokerMessageHandler) super.simpleBrokerMessageHandler();

        if (handler != null) {
            handler.setSubscriptionRegistry(new CustomSubscriptionRegistry());
        }

        return handler;
    }
}

It more than doubled the performance of a simple broker.


rstoyanchev commented Jul 2, 2020

@liheyuan, #25298 is now scheduled for 5.3 and should be a significant improvement. It not only avoids pattern matching for non-pattern destinations, which I believe was your main concern, but also reduces lock contention. If you are able to, once the PR is processed, it would be very helpful to hear whether it works for you.

@rstoyanchev rstoyanchev added status: superseded An issue that has been superseded by another in: messaging Issues in messaging modules (jms, messaging) and removed status: feedback-provided Feedback has been provided status: waiting-for-triage An issue we've not yet triaged or decided on labels Jul 2, 2020
trim09 pushed a commit to trim09/spring-framework that referenced this issue Jul 8, 2020
* DestinationCache is now synchronized on multiple 'destination' locks
 (previously a single shared lock)
* DestinationCache keeps destinations without any subscriptions
 (previously such destinations were recomputed over and over)
* SessionSubscriptionRegistry is now a
 'sessionId -> subscriptionId -> (destination,selector)' map
 for faster lookups
 (previously 'sessionId -> destination -> set of (subscriptionId,selector)')

closes spring-projectsgh-24395

trim09 commented Jul 9, 2020

The part that I could optimize the most was cache recalculation for destinations without any subscribers.
E.g. if you publish messages to /topic/PRICE.STOCK.NASDAQ.IBM and no one is listening on that topic, the server never populates a cache entry for it and recomputes the result with every message you publish.
I think this simple (but hard to spot) fix could solve the issue.
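
A rough sketch of that idea, outside of Spring: cache the result of the scan even when it is empty, so that a destination with no subscribers is scanned once instead of on every message. The class NegativeCachingLookup, the scan function, and the scans counter are hypothetical names for illustration only, and this is not the code from the commit. The sketch is also unbounded and invalidates only by exact destination; a real cache needs a size limit and has to handle pattern subscriptions on invalidation.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch of a lookup cache that also remembers empty results.
final class NegativeCachingLookup {

    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

    // Stand-in for the expensive "traverse all subscriptions" step
    private final Function<String, List<String>> scan;

    // Counts how often the expensive scan actually runs (for demonstration only)
    final AtomicInteger scans = new AtomicInteger();

    NegativeCachingLookup(Function<String, List<String>> scan) {
        this.scan = scan;
    }

    List<String> subscribers(String destination) {
        // computeIfAbsent stores any non-null result, including an empty list,
        // and runs the function at most once per key while the entry is absent.
        // Other keys are generally not blocked behind one shared monitor.
        return cache.computeIfAbsent(destination, d -> {
            scans.incrementAndGet();
            return List.copyOf(scan.apply(d));
        });
    }

    // Call when a subscription for this destination is added or removed.
    void invalidate(String destination) {
        cache.remove(destination);
    }
}
```

With a scan that finds nothing, calling subscribers("/topic/PRICE.STOCK.NASDAQ.IBM") repeatedly runs the scan only on the first call, until invalidate is called for that destination.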

rstoyanchev pushed a commit that referenced this issue Jul 16, 2020
* DestinationCache is now synchronized on multiple 'destination' locks
 (previously a single shared lock)
* DestinationCache keeps destinations without any subscriptions
 (previously such destinations were recomputed over and over)
* SessionSubscriptionRegistry is now a
 'sessionId -> subscriptionId -> (destination,selector)' map
 for faster lookups
 (previously 'sessionId -> destination -> set of (subscriptionId,selector)')

closes gh-24395
@alienisty

Apparently you can have multiple subscriptions with the same subscription-id, and this change breaks that requirement. Specifically, UserDestinationMessageHandler reuses the same topic subscription message to create the session-specific subscription to that topic, which means that whichever thread inserts its subscription first wins.
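
To illustrate the collision in isolation, here is a small sketch of a 'sessionId -> subscriptionId -> destination' map (selector left out). It is plain Java, not Spring's SessionSubscriptionRegistry; the class name and the destination strings in the usage note are made up. It uses put, so the last writer wins, whereas the behaviour described above is first-writer-wins; either way only one of the two subscriptions survives.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a registry keyed by subscription id keeps one entry per id.
final class SubscriptionIdCollisionDemo {

    // sessionId -> (subscriptionId -> destination)
    private final Map<String, Map<String, String>> bySession = new HashMap<>();

    // Returns the destination that was displaced, or null if the id was new.
    String subscribe(String sessionId, String subscriptionId, String destination) {
        return bySession
                .computeIfAbsent(sessionId, s -> new HashMap<>())
                .put(subscriptionId, destination);
    }

    // Number of subscriptions the registry still knows for this session
    int count(String sessionId) {
        return bySession.getOrDefault(sessionId, Map.of()).size();
    }
}
```

Registering "/user/queue/updates" and then "/queue/updates-user123" under the same session and the same id "sub-0" leaves count at 1: the second call returns the first destination, which is no longer tracked.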
