-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Register TcpSenders on delegated TcpConnection with interceptor instance #3514
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Fixes spring-projects#3509 Dead connections are not being removed on `TcpSender` when using a `TcpConnectionInterceptor` May cause a memory leak
# Conflicts: # spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java
...n-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java
Outdated
Show resolved
Hide resolved
@@ -101,7 +101,7 @@ public void registerSender(TcpSender sender) { | |||
|
|||
@Override | |||
public void registerSenders(List<TcpSender> sendersToRegister) { | |||
this.theConnection.registerSenders(sendersToRegister); | |||
this.theConnection.registerSenders(sendersToRegister, this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is indeed confusing that interceptor abstraction implements a connection one.
So, we have such a strange code flow...
I wish @garyrussell will come back to us one day to revise this architecture 😄
Nevertheless would you mind, @gigermocas , to double check what we have so far with the TcpConnectionSupport.close()
:
public void close() {
for (TcpSender sender : this.senders) {
sender.removeDeadConnection(this);
}
...
After this fix it doesn't look like we are going to clean up properly...
I'll debug it locally meanwhile, too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wish @garyrussell will come back to us one day to revise this architecture 😄
Yes, this is a bit of a mess; dates back to very early code and has caused headaches since then.
Maybe in 6.0 @artembilan if you want to raise an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Issue created: #3515, but it doesn't mean we have to make it in 6.0
. Probably one day in the future when it causes more headaches 😉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ouch, I was too slow writing this, but these were my observations.
TcpConnectionSupport.close()
can be called multiple times and TcpConnectionInterceptorSupport.close()
will ultimately call it too. TcpConnectionSupport.closeConnection()
(through close()
) can call both again.
TcpConnectionSupport.close()
with also publish a close event, which must be done once.
Keeping the senders list in TcpConnectionSupport seemed like a better fit looking at the rest of the API (and parity with the other register methods)
I hoped it was cleaner, I don't particularly like it, but I couldn't find an alternative without worrying about breaking the API or polluting the interceptor by re-implementing some behavior from TcpConnectionSupport
😟
…eptedConnection()
Since you are already here, @garyrussell , WDYT about the fix? |
I am not really "here" 😄 I was just responding to your mention. But I will take a look when a get a few minutes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, we have to use the same object for both addNewConnection()
and removeDeadConnection()
.
The key to the connections maps is the connection id. While this is the same for the HelloWorldInterceptor
, that is not the case for other framework interceptors.
e.g.
Lines 388 to 391 in be2a698
@Override | |
public String getConnectionId() { | |
return this.connectionId + ":" + this.epoch; | |
} |
If I change the HelloWorldInterceptor
thus:
@Override
public String getConnectionId() {
return "somethingElse";
}
and add this to the new test...
assertThat(TestUtils.getPropertyValue(handler, "connections", Map.class)).isEmpty();
...it fails.
I see your point @garyrussell, but the only framework class that implements I guess I've only managed to reduce the problem scope to those users that override their interceptor's I'm not seeing a full fix for this without requiring some major rework, sorry for the mess guys 😞 |
I can't find any solution for this at the moment. |
We could add this hack to the 3 implementations of TcpSender, at least for today's release, to give us some time for a better solution. private static final Pattern FAILOVER_ID = Pattern.compile("-[a-f0-9]{12}:[0-9]+$");
private final Map<String, String> failoverConnections = new ConcurrentHashMap<>();
...
@Override
public void addNewConnection(TcpConnection connection) {
String id = connection.getConnectionId();
if (FAILOVER_ID.matcher(id).find()) {
this.failoverConnections.put(id.substring(0, id.lastIndexOf(':')), id);
}
this.connections.put(id, connection);
}
@Override
public void removeDeadConnection(TcpConnection connection) {
String id = connection.getConnectionId();
this.connections.remove(id);
String mapped = this.failoverConnections.remove(id);
if (mapped != null) {
this.connections.remove(mapped);
}
} I tested it by adding this to the Hello World interceptor @Override
public String getConnectionId() {
return super.getConnectionId() + ":1";
} |
Since the original PR was back-ported, we can't leave the regression in place. |
I thought about something like that, but based on the Gary, if you see some intermediate solution, please, go ahead for today's release. Thanks |
I mean it is OK to take this PR and commit some fix with back-port. |
Either that or we have to back out the memory leak commit (at least in 5.4.x), given that we are pressed for time. |
Or, defer 5.4.5 - we can live with the regression on master. |
OK. Reverting the change in the |
I think I might have a better fix for the original memory leak - testing now. |
Replaced by #3518 |
Follow up to #3510
Regrettably my previous pull request didn't address the whole underlying issue, although the memory leak was fixed, it introduced a regression as messages to the client were not being sent through the configured interceptor.
TcpConnectionSupport.registerSenders()
with aTcpConnection
instanceTcpConnectionInterceptorSupport.registerSenders()
to use introducedregisterSenders()
on delegated connection