Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -615,12 +615,13 @@ protected TcpConnectionSupport wrapConnection(TcpConnectionSupport connectionArg
if (this.senders.size() == 0) {
connection.registerSender(wrapper);
}
connection.setWrapped(true);
connection = wrapper;
}
return connection;
}
finally {
this.addConnection(connection);
addConnection(connection);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@

/**
* @author Gary Russell
* @author Artem Bilan
* @since 2.0
*
*/
Expand All @@ -38,4 +39,8 @@ public void setInterceptors(TcpConnectionInterceptorFactory[] interceptorFactori
this.interceptorFactories = Arrays.copyOf(interceptorFactories, interceptorFactories.length);
}

public void setInterceptor(TcpConnectionInterceptorFactory... interceptorFactories) {
this.interceptorFactories = Arrays.copyOf(interceptorFactories, interceptorFactories.length);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2001-2020 the original author or authors.
* Copyright 2001-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -96,6 +96,8 @@ public abstract class TcpConnectionSupport implements TcpConnection {

private boolean manualListenerRegistration;

private boolean wrapped;

/*
* This boolean is to avoid looking for a temporary listener when not needed
* to avoid a CPU cache flush. This does not have to be volatile because it
Expand Down Expand Up @@ -164,8 +166,10 @@ void setTestFailed(boolean testFailed) {
*/
@Override
public void close() {
for (TcpSender sender : this.senders) {
sender.removeDeadConnection(this);
if (!this.wrapped) {
for (TcpSender sender : this.senders) {
sender.removeDeadConnection(this);
}
}
// close() may be called multiple times; only publish once
if (!this.closePublished.getAndSet(true)) {
Expand Down Expand Up @@ -194,6 +198,9 @@ protected void closeConnection(boolean isException) {
outerListener = nextListener;
}
outerListener.close();
for (TcpSender sender : getSenders()) {
sender.removeDeadConnection(outerListener);
}
if (isException) {
// ensure physical close in case the interceptor did not close
this.close();
Expand Down Expand Up @@ -264,6 +271,10 @@ public void setNeedsTest(boolean needsTest) {
this.needsTest = needsTest;
}

void setSenders(List<TcpSender> senders) {
this.senders.addAll(senders);
}

/**
* Set the listener that will receive incoming Messages.
* @param listener The listener.
Expand Down Expand Up @@ -401,6 +412,14 @@ public SocketInfo getSocketInfo() {
return this.socketInfo;
}

/**
* Set to true if intercepted.
* @param wrapped true if wrapped.
*/
public void setWrapped(boolean wrapped) {
this.wrapped = wrapped;
}

public String getConnectionFactoryName() {
return this.connectionFactoryName;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,7 +56,11 @@ protected TcpConnectionSupport buildNewConnection() {
TcpConnectionSupport connection =
this.tcpNetConnectionSupport.createNewConnection(socket, false, isLookupHost(),
getApplicationEventPublisher(), getComponentName());
connection = wrapConnection(connection);
TcpConnectionSupport wrapped = wrapConnection(connection);
if (wrapped.equals(connection)) {
connection.setSenders(getSenders());
connection = wrapped;
}
initializeConnection(connection, socket);
this.getTaskExecutor().execute(connection);
this.harvestClosedConnections();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -143,7 +143,11 @@ public void run() {
setSocketAttributes(socket);
TcpConnectionSupport connection = this.tcpNetConnectionSupport.createNewConnection(socket, true,
isLookupHost(), getApplicationEventPublisher(), getComponentName());
connection = wrapConnection(connection);
TcpConnectionSupport wrapped = wrapConnection(connection);
if (!wrapped.equals(connection)) {
connection.setSenders(getSenders());
connection = wrapped;
}
initializeConnection(connection, socket);
getTaskExecutor().execute(connection);
harvestClosedConnections();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,6 +98,9 @@ protected TcpConnectionSupport buildNewConnection() {
((TcpNioSSLConnection) connection).setHandshakeTimeout(sslHandshakeTimeout);
}
TcpConnectionSupport wrappedConnection = wrapConnection(connection);
if (!wrappedConnection.equals(connection)) {
connection.setSenders(getSenders());
}
initializeConnection(wrappedConnection, socketChannel.socket());
if (getSoTimeout() > 0) {
connection.setLastRead(System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -277,6 +277,9 @@ private TcpNioConnection createTcpNioConnection(SocketChannel socketChannel) {
isLookupHost(), getApplicationEventPublisher(), getComponentName());
connection.setUsingDirectBuffers(this.usingDirectBuffers);
TcpConnectionSupport wrappedConnection = wrapConnection(connection);
if (!wrappedConnection.equals(connection)) {
connection.setSenders(getSenders());
}
initializeConnection(wrappedConnection, socketChannel.socket());
return connection;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2019 the original author or authors.
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,40 +16,34 @@

package org.springframework.integration.ip.tcp;

import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory;
import org.springframework.integration.ip.tcp.connection.HelloWorldInterceptorFactory;

/**
* @author Gary Russell
* @author Mário Dias
* @author Artem Bilan
*
* @since 3.0
*
*/
public class AbstractTcpChannelAdapterTests {

private static final ApplicationEventPublisher NOOP_PUBLISHER = new ApplicationEventPublisher() {

@Override
public void publishEvent(ApplicationEvent event) {
}

@Override
public void publishEvent(Object event) {

}

};
private static final ApplicationEventPublisher NOOP_PUBLISHER = event -> { };

protected HelloWorldInterceptorFactory newInterceptorFactory() {
return newInterceptorFactory(NOOP_PUBLISHER);
}

protected HelloWorldInterceptorFactory newInterceptorFactory(ApplicationEventPublisher applicationEventPublisher) {
HelloWorldInterceptorFactory factory = new HelloWorldInterceptorFactory();
factory.setApplicationEventPublisher(NOOP_PUBLISHER);
factory.setApplicationEventPublisher(applicationEventPublisher);
return factory;
}

protected void noopPublisher(AbstractConnectionFactory connectionFactory) {
connectionFactory.setApplicationEventPublisher(NOOP_PUBLISHER);
}


}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
Expand All @@ -39,6 +40,7 @@
import java.util.concurrent.atomic.AtomicReference;

import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -57,15 +59,21 @@
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.HelloWorldInterceptor;
import org.springframework.integration.ip.tcp.connection.TcpConnection;
import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain;
import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory;
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
import org.springframework.integration.ip.tcp.serializer.ByteArrayLengthHeaderSerializer;
import org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer;
import org.springframework.integration.ip.util.TestingUtilities;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
Expand All @@ -77,14 +85,15 @@
/**
* @author Gary Russell
* @author Artem Bilan
* @author Mário Dias
*
* @since 2.0
*/
public class TcpSendingMessageHandlerTests extends AbstractTcpChannelAdapterTests {

private static final Log logger = LogFactory.getLog(TcpSendingMessageHandlerTests.class);

private AsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
private final AsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();

private void readFully(InputStream is, byte[] buff) throws IOException {
for (int i = 0; i < buff.length; i++) {
Expand Down Expand Up @@ -1191,4 +1200,67 @@ public void testConnectionException() throws Exception {
}
}

@SuppressWarnings("unchecked")
@Test
public void testInterceptedConnection() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0);
ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer();
scf.setSerializer(serializer);
scf.setDeserializer(serializer);
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(scf);
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
handler.setConnectionFactory(scf);
final AtomicReference<TcpConnection> connection = new AtomicReference<>();
scf.setApplicationEventPublisher(event -> {
if (event instanceof TcpConnectionOpenEvent) {
connection.set(handler.getConnections()
.get(((TcpConnectionOpenEvent) event).getConnectionId()));
latch.countDown();
}
});
TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain();
fc.setInterceptor(newInterceptorFactory(scf.getApplicationEventPublisher()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it work now if we override a getConnectionId() in the HelloWorldInterceptor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because the same object (the outermost interceptor) is always used for addNewConnection an removeDeadConnection.

scf.setInterceptorFactoryChain(fc);
scf.start();
TestingUtilities.waitListening(scf, null);
int port = scf.getPort();
Socket socket = SocketFactory.getDefault().createSocket("localhost", port);
socket.close();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(connection.get()).isInstanceOf(HelloWorldInterceptor.class);
assertThat(TestUtils.getPropertyValue(handler, "connections", Map.class)).isEmpty();
scf.stop();
}

@Test
public void testInterceptedCleanup() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0);
ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer();
scf.setSerializer(serializer);
scf.setDeserializer(serializer);
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(scf);
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
handler.setConnectionFactory(scf);
scf.setApplicationEventPublisher(event -> {
if (event instanceof TcpConnectionCloseEvent) {
latch.countDown();
}
});
TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain();
fc.setInterceptor(newInterceptorFactory(scf.getApplicationEventPublisher()));
scf.setInterceptorFactoryChain(fc);
scf.start();
TestingUtilities.waitListening(scf, null);
int port = scf.getPort();
Socket socket = SocketFactory.getDefault().createSocket("localhost", port);
socket.close();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(handler.getConnections().isEmpty()).isTrue();
scf.stop();
}

}