From 9b0a4b8c182bbe3f384d8ee8ede54eb754900bf5 Mon Sep 17 00:00:00 2001 From: Davidiusdadi Date: Tue, 3 Sep 2013 22:47:33 +0200 Subject: [PATCH 1/9] made websocket client use java.net instead of java.nio (addresses #88, #155, #167, #175, #177, #187) (changes not yet fully tested) --- src/main/example/ProxyClientExample.java | 22 +- src/main/example/SSLClientExample.java | 6 +- .../DefaultSSLWebSocketClientFactory.java | 52 --- .../client/DefaultWebSocketClientFactory.java | 39 --- .../client/WebSocketClient.java | 322 ++++++------------ 5 files changed, 120 insertions(+), 321 deletions(-) delete mode 100644 src/main/java/org/java_websocket/client/DefaultSSLWebSocketClientFactory.java delete mode 100644 src/main/java/org/java_websocket/client/DefaultWebSocketClientFactory.java diff --git a/src/main/example/ProxyClientExample.java b/src/main/example/ProxyClientExample.java index 133756fd0..ddff0ea0d 100644 --- a/src/main/example/ProxyClientExample.java +++ b/src/main/example/ProxyClientExample.java @@ -1,26 +1,12 @@ import java.net.InetSocketAddress; +import java.net.Proxy; import java.net.URI; import java.net.URISyntaxException; -import java.nio.channels.ByteChannel; - -public class ProxyClientExample extends ExampleClient { - - public ProxyClientExample( URI serverURI , InetSocketAddress proxy ) { - super( serverURI ); - setProxy( proxy ); - } - - @Override - public ByteChannel createProxyChannel( ByteChannel towrap ) { - /* - * You can create custom proxy handshake here. - * For more infos see: WebSocketClient.DefaultClientProxyChannel and http://tools.ietf.org/html/rfc6455#section-4.1 - */ - return super.createProxyChannel( towrap ); - } +public class ProxyClientExample { public static void main( String[] args ) throws URISyntaxException { - ProxyClientExample c = new ProxyClientExample( new URI( "ws://echo.websocket.org" ), new InetSocketAddress( "proxyaddress", 80 ) );// don't forget to change "proxyaddress" + ExampleClient c = new ExampleClient( new URI( "ws://echo.websocket.org" ) ); + c.setProxy( new Proxy( Proxy.Type.HTTP, new InetSocketAddress( "proxyaddress", 80 ) ) ); c.connect(); } } diff --git a/src/main/example/SSLClientExample.java b/src/main/example/SSLClientExample.java index 2231e5475..e740c9c54 100644 --- a/src/main/example/SSLClientExample.java +++ b/src/main/example/SSLClientExample.java @@ -7,10 +7,10 @@ import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManagerFactory; import org.java_websocket.WebSocketImpl; -import org.java_websocket.client.DefaultSSLWebSocketClientFactory; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; @@ -79,7 +79,9 @@ public static void main( String[] args ) throws Exception { sslContext.init( kmf.getKeyManagers(), tmf.getTrustManagers(), null ); // sslContext.init( null, null, null ); // will use java's default key and trust store which is sufficient unless you deal with self-signed certificates - chatclient.setWebSocketFactory( new DefaultSSLWebSocketClientFactory( sslContext ) ); + SSLSocketFactory factory = sslContext.getSocketFactory();// (SSLSocketFactory) SSLSocketFactory.getDefault(); + + chatclient.setSocket( factory.createSocket() ); chatclient.connectBlocking(); diff --git a/src/main/java/org/java_websocket/client/DefaultSSLWebSocketClientFactory.java b/src/main/java/org/java_websocket/client/DefaultSSLWebSocketClientFactory.java deleted file mode 100644 index 0646cea1a..000000000 --- a/src/main/java/org/java_websocket/client/DefaultSSLWebSocketClientFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.java_websocket.client; -import java.io.IOException; -import java.net.Socket; -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - -import org.java_websocket.SSLSocketChannel2; -import org.java_websocket.WebSocketAdapter; -import org.java_websocket.WebSocketImpl; -import org.java_websocket.client.WebSocketClient.WebSocketClientFactory; -import org.java_websocket.drafts.Draft; - - -public class DefaultSSLWebSocketClientFactory implements WebSocketClientFactory { - protected SSLContext sslcontext; - protected ExecutorService exec; - - public DefaultSSLWebSocketClientFactory( SSLContext sslContext ) { - this( sslContext, Executors.newSingleThreadScheduledExecutor() ); - } - - public DefaultSSLWebSocketClientFactory( SSLContext sslContext , ExecutorService exec ) { - if( sslContext == null || exec == null ) - throw new IllegalArgumentException(); - this.sslcontext = sslContext; - this.exec = exec; - } - - @Override - public ByteChannel wrapChannel( SocketChannel channel, SelectionKey key, String host, int port ) throws IOException { - SSLEngine e = sslcontext.createSSLEngine( host, port ); - e.setUseClientMode( true ); - return new SSLSocketChannel2( channel, e, exec, key ); - } - - @Override - public WebSocketImpl createWebSocket( WebSocketAdapter a, Draft d, Socket c ) { - return new WebSocketImpl( a, d, c ); - } - - @Override - public WebSocketImpl createWebSocket( WebSocketAdapter a, List d, Socket s ) { - return new WebSocketImpl( a, d, s ); - } -} \ No newline at end of file diff --git a/src/main/java/org/java_websocket/client/DefaultWebSocketClientFactory.java b/src/main/java/org/java_websocket/client/DefaultWebSocketClientFactory.java deleted file mode 100644 index 005f8f038..000000000 --- a/src/main/java/org/java_websocket/client/DefaultWebSocketClientFactory.java +++ /dev/null @@ -1,39 +0,0 @@ -package org.java_websocket.client; - -import java.net.Socket; -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.List; - -import org.java_websocket.WebSocket; -import org.java_websocket.WebSocketAdapter; -import org.java_websocket.WebSocketImpl; -import org.java_websocket.drafts.Draft; - -public class DefaultWebSocketClientFactory implements WebSocketClient.WebSocketClientFactory { - /** - * - */ - private final WebSocketClient webSocketClient; - /** - * @param webSocketClient - */ - public DefaultWebSocketClientFactory( WebSocketClient webSocketClient ) { - this.webSocketClient = webSocketClient; - } - @Override - public WebSocket createWebSocket( WebSocketAdapter a, Draft d, Socket s ) { - return new WebSocketImpl( this.webSocketClient, d ); - } - @Override - public WebSocket createWebSocket( WebSocketAdapter a, List d, Socket s ) { - return new WebSocketImpl( this.webSocketClient, d ); - } - @Override - public ByteChannel wrapChannel( SocketChannel channel, SelectionKey c, String host, int port ) { - if( c == null ) - return channel; - return channel; - } -} \ No newline at end of file diff --git a/src/main/java/org/java_websocket/client/WebSocketClient.java b/src/main/java/org/java_websocket/client/WebSocketClient.java index 1aeb64688..635cb17f8 100644 --- a/src/main/java/org/java_websocket/client/WebSocketClient.java +++ b/src/main/java/org/java_websocket/client/WebSocketClient.java @@ -1,27 +1,22 @@ package org.java_websocket.client; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.Socket; import java.net.URI; import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedByInterruptException; import java.nio.channels.NotYetConnectedException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.channels.spi.SelectorProvider; import java.util.Map; import java.util.concurrent.CountDownLatch; -import org.java_websocket.SocketChannelIOHelper; import org.java_websocket.WebSocket; import org.java_websocket.WebSocketAdapter; -import org.java_websocket.WebSocketFactory; import org.java_websocket.WebSocketImpl; -import org.java_websocket.WrappedByteChannel; import org.java_websocket.drafts.Draft; -import org.java_websocket.drafts.Draft_10; +import org.java_websocket.drafts.Draft_17; import org.java_websocket.exceptions.InvalidHandshakeException; import org.java_websocket.framing.CloseFrame; import org.java_websocket.framing.Framedata; @@ -31,14 +26,8 @@ import org.java_websocket.handshake.ServerHandshake; /** - * The WebSocketClient is an abstract class that expects a valid - * "ws://" URI to connect to. When connected, an instance recieves important - * events related to the life of the connection. A subclass must implement - * onOpen, onClose, and onMessage to be - * useful. An instance can send messages to it's connected server via the - * send method. - * - * @author Nathan Rajlich + * A subclass must implement at least onOpen, onClose, and onMessage to be + * useful. At runtime the user is expected to establish a connection via {@link #connect()}, then receive events like {@link #onMessage(String)} via the overloaded methods and to {@link #send(String)} data to the server. */ public abstract class WebSocketClient extends WebSocketAdapter implements Runnable, WebSocket { @@ -47,17 +36,17 @@ public abstract class WebSocketClient extends WebSocketAdapter implements Runnab */ protected URI uri = null; - private WebSocketImpl conn = null; - /** - * The SocketChannel instance this channel uses. - */ - private SocketChannel channel = null; + private WebSocketImpl engine = null; + + private Socket socket = null; + + private InputStream istream; - private ByteChannel wrappedchannel = null; + private OutputStream ostream; - private Thread writethread; + private Proxy proxy = Proxy.NO_PROXY; - private Thread readthread; + private Thread writeThread; private Draft draft; @@ -67,92 +56,77 @@ public abstract class WebSocketClient extends WebSocketAdapter implements Runnab private CountDownLatch closeLatch = new CountDownLatch( 1 ); - private int timeout = 0; - - private WebSocketClientFactory wsfactory = new DefaultWebSocketClientFactory( this ); - - private InetSocketAddress proxyAddress = null; + private int connectTimeout = 0; + /** This open a websocket connection as specified by rfc6455 */ public WebSocketClient( URI serverURI ) { - this( serverURI, new Draft_10() ); + this( serverURI, new Draft_17() ); } /** * Constructs a WebSocketClient instance and sets it to the connect to the - * specified URI. The channel does not attampt to connect automatically. You - * must call connect first to initiate the socket connection. + * specified URI. The channel does not attampt to connect automatically. The connection + * will be established once you call connect. */ public WebSocketClient( URI serverUri , Draft draft ) { this( serverUri, draft, null, 0 ); } - public WebSocketClient( URI serverUri , Draft draft , Map headers , int connecttimeout ) { + public WebSocketClient( URI serverUri , Draft protocolDraft , Map httpHeaders , int connectTimeout ) { if( serverUri == null ) { throw new IllegalArgumentException(); - } - if( draft == null ) { + } else if( protocolDraft == null ) { throw new IllegalArgumentException( "null as draft is permitted for `WebSocketServer` only!" ); } this.uri = serverUri; - this.draft = draft; - this.headers = headers; - this.timeout = connecttimeout; - - try { - channel = SelectorProvider.provider().openSocketChannel(); - channel.configureBlocking( true ); - } catch ( IOException e ) { - channel = null; - onWebsocketError( null, e ); - } - if( channel == null ) { - conn = (WebSocketImpl) wsfactory.createWebSocket( this, draft, null ); - conn.close( CloseFrame.NEVER_CONNECTED, "Failed to create or configure SocketChannel." ); - } else { - conn = (WebSocketImpl) wsfactory.createWebSocket( this, draft, channel.socket() ); - } - + this.draft = protocolDraft; + this.headers = httpHeaders; + this.connectTimeout = connectTimeout; + this.engine = new WebSocketImpl( this, protocolDraft ); } /** - * Gets the URI that this WebSocketClient is connected to. - * - * @return The URI for this WebSocketClient. + * Returns the URI that this WebSocketClient is connected to. */ public URI getURI() { return uri; } - /** Returns the protocol version this channel uses. */ + /** + * Returns the protocol version this channel uses.
+ * For more infos see https://github.com/TooTallNate/Java-WebSocket/wiki/Drafts + */ public Draft getDraft() { return draft; } /** - * Starts a background thread that attempts and maintains a WebSocket - * connection to the URI specified in the constructor or via setURI. - * setURI. + * Initiates the websocket connection. This method does not block. */ public void connect() { - if( writethread != null ) + if( writeThread != null ) throw new IllegalStateException( "WebSocketClient objects are not reuseable" ); - writethread = new Thread( this ); - writethread.start(); + writeThread = new Thread( this ); + writeThread.start(); } /** - * Same as connect but blocks until the websocket connected or failed to do so.
+ * Same as connect but blocks until the websocket connected or failed to do so.
* Returns whether it succeeded or not. **/ public boolean connectBlocking() throws InterruptedException { connect(); connectLatch.await(); - return conn.isOpen(); + return engine.isOpen(); } + /** + * Initiates the websocket close handshake. This method does not block
+ * In oder to make sure the connection is closed use closeBlocking + */ public void close() { - if( writethread != null ) { - conn.close( CloseFrame.NORMAL ); + if( writeThread != null ) { + engine.close( CloseFrame.NORMAL ); } } @@ -162,102 +136,64 @@ public void closeBlocking() throws InterruptedException { } /** - * Sends text to the connected WebSocket server. + * Sends text to the connected websocket server. * * @param text - * The String to send to the WebSocket server. + * The string which will be transmitted. */ public void send( String text ) throws NotYetConnectedException { - conn.send( text ); - } - - public void sendFragment( Framedata f ) { - conn.sendFrame( f ); + engine.send( text ); } /** - * Sends data to the connected WebSocket server. + * Sends binary data to the connected webSocket server. * * @param data - * The Byte-Array of data to send to the WebSocket server. + * The byte-Array of data to send to the WebSocket server. */ public void send( byte[] data ) throws NotYetConnectedException { - conn.send( data ); + engine.send( data ); } - // Runnable IMPLEMENTATION ///////////////////////////////////////////////// public void run() { - if( writethread == null ) - writethread = Thread.currentThread(); - interruptableRun(); - - assert ( !channel.isOpen() ); - - } - - private final void interruptableRun() { - if( channel == null ) { - return;// channel will be initialized in the constructor and only be null if no socket channel could be created or if blocking mode could be established - } - try { - String host; - int port; - - if( proxyAddress != null ) { - host = proxyAddress.getHostName(); - port = proxyAddress.getPort(); - } else { - host = uri.getHost(); - port = getPort(); + if( socket == null ) { + socket = new Socket( proxy ); + } else if( socket.isClosed() ) { + throw new IOException(); } - channel.connect( new InetSocketAddress( host, port ) ); - conn.channel = wrappedchannel = createProxyChannel( wsfactory.wrapChannel( channel, null, host, port ) ); + if( !socket.isBound() ) + socket.connect( new InetSocketAddress( uri.getHost(), getPort() ), connectTimeout ); + istream = socket.getInputStream(); + ostream = socket.getOutputStream(); - timeout = 0; // since connect is over sendHandshake(); - readthread = new Thread( new WebsocketWriteThread() ); - readthread.start(); - } catch ( ClosedByInterruptException e ) { - onWebsocketError( null, e ); - return; - } catch ( /*IOException | SecurityException | UnresolvedAddressException*/Exception e ) {// - onWebsocketError( conn, e ); - conn.closeConnection( CloseFrame.NEVER_CONNECTED, e.getMessage() ); + } catch ( /*IOException | SecurityException | UnresolvedAddressException | InvalidHandshakeException | ClosedByInterruptException | SocketTimeoutException */Exception e ) { + onWebsocketError( engine, e ); + engine.closeConnection( CloseFrame.NEVER_CONNECTED, e.getMessage() ); return; } - ByteBuffer buff = ByteBuffer.allocate( WebSocketImpl.RCVBUF ); - try/*IO*/{ - while ( channel.isOpen() ) { - if( SocketChannelIOHelper.read( buff, this.conn, wrappedchannel ) ) { - conn.decode( buff ); - } else { - conn.eot(); - } + writeThread = new Thread( new WebsocketWriteThread() ); + writeThread.start(); - if( wrappedchannel instanceof WrappedByteChannel ) { - WrappedByteChannel w = (WrappedByteChannel) wrappedchannel; - if( w.isNeedRead() ) { - while ( SocketChannelIOHelper.readMore( buff, conn, w ) ) { - conn.decode( buff ); - } - conn.decode( buff ); - } - } - } + byte[] rawbuffer = new byte[ WebSocketImpl.RCVBUF ]; + int readBytes; - } catch ( CancelledKeyException e ) { - conn.eot(); + try { + while ( ( readBytes = istream.read( rawbuffer ) ) != -1 ) { + engine.decode( ByteBuffer.wrap( rawbuffer, 0, readBytes ) ); + } + engine.eot(); } catch ( IOException e ) { - conn.eot(); + engine.eot(); } catch ( RuntimeException e ) { // this catch case covers internal errors only and indicates a bug in this websocket implementation onError( e ); - conn.closeConnection( CloseFrame.ABNORMAL_CLOSE, e.getMessage() ); + engine.closeConnection( CloseFrame.ABNORMAL_CLOSE, e.getMessage() ); } + assert ( socket.isClosed() ); } - private int getPort() { int port = uri.getPort(); if( port == -1 ) { @@ -294,22 +230,18 @@ private void sendHandshake() throws InvalidHandshakeException { handshake.put( kv.getKey(), kv.getValue() ); } } - conn.startHandshake( handshake ); + engine.startHandshake( handshake ); } /** * This represents the state of the connection. - * You can use this method instead of */ public READYSTATE getReadyState() { - return conn.getReadyState(); + return engine.getReadyState(); } /** * Calls subclass' implementation of onMessage. - * - * @param conn - * @param message */ @Override public final void onWebsocketMessage( WebSocket conn, String message ) { @@ -328,8 +260,6 @@ public void onWebsocketMessageFragment( WebSocket conn, Framedata frame ) { /** * Calls subclass' implementation of onOpen. - * - * @param conn */ @Override public final void onWebsocketOpen( WebSocket conn, Handshakedata handshake ) { @@ -339,22 +269,18 @@ public final void onWebsocketOpen( WebSocket conn, Handshakedata handshake ) { /** * Calls subclass' implementation of onClose. - * - * @param conn */ @Override public final void onWebsocketClose( WebSocket conn, int code, String reason, boolean remote ) { connectLatch.countDown(); closeLatch.countDown(); - if( readthread != null ) - readthread.interrupt(); + if( writeThread != null ) + writeThread.interrupt(); onClose( code, reason, remote ); } /** * Calls subclass' implementation of onIOError. - * - * @param conn */ @Override public final void onWebsocketError( WebSocket conn, Exception ex ) { @@ -383,28 +309,20 @@ public void onClosing( int code, String reason, boolean remote ) { } public WebSocket getConnection() { - return conn; - } - - public final void setWebSocketFactory( WebSocketClientFactory wsf ) { - this.wsfactory = wsf; - } - - public final WebSocketFactory getWebSocketFactory() { - return wsfactory; + return engine; } @Override public InetSocketAddress getLocalSocketAddress( WebSocket conn ) { - if( channel != null ) - return (InetSocketAddress) channel.socket().getLocalSocketAddress(); + if( socket != null ) + return (InetSocketAddress) socket.getLocalSocketAddress(); return null; } @Override public InetSocketAddress getRemoteSocketAddress( WebSocket conn ) { - if( channel != null ) - return (InetSocketAddress) channel.socket().getLocalSocketAddress(); + if( socket != null ) + return (InetSocketAddress) socket.getLocalSocketAddress(); return null; } @@ -418,123 +336,107 @@ public void onMessage( ByteBuffer bytes ) { public void onFragment( Framedata frame ) { } - public class DefaultClientProxyChannel extends AbstractClientProxyChannel { - public DefaultClientProxyChannel( ByteChannel towrap ) { - super( towrap ); - } - @Override - public String buildHandShake() { - StringBuilder b = new StringBuilder(); - String host = uri.getHost(); - b.append( "CONNECT " ); - b.append( host ); - b.append( ":" ); - b.append( getPort() ); - b.append( " HTTP/1.1\n" ); - b.append( "Host: " ); - b.append( host ); - b.append( "\n" ); - return b.toString(); - } - } - - public interface WebSocketClientFactory extends WebSocketFactory { - public ByteChannel wrapChannel( SocketChannel channel, SelectionKey key, String host, int port ) throws IOException; - } - private class WebsocketWriteThread implements Runnable { @Override public void run() { Thread.currentThread().setName( "WebsocketWriteThread" ); try { while ( !Thread.interrupted() ) { - SocketChannelIOHelper.writeBlocking( conn, wrappedchannel ); + ByteBuffer buffer = engine.outQueue.take(); + ostream.write( buffer.array(), 0, buffer.limit() ); } } catch ( IOException e ) { - conn.eot(); + engine.eot(); } catch ( InterruptedException e ) { // this thread is regularly terminated via an interrupt } } } - public ByteChannel createProxyChannel( ByteChannel towrap ) { - if( proxyAddress != null ) { - return new DefaultClientProxyChannel( towrap ); - } - return towrap;// no proxy in use + public void setProxy( Proxy proxy ) { + if( proxy == null ) + throw new IllegalArgumentException(); + this.proxy = proxy; } - public void setProxy( InetSocketAddress proxyaddress ) { - proxyAddress = proxyaddress; + /** + * Accepts bound and unbound sockets.
+ * This method must be called before connect. + * If the given socket is not yet bound it will be bound to the uri specified in the constructor. + **/ + public void setSocket( Socket socket ) { + if( this.socket != null ) { + throw new IllegalStateException( "socket has already been set" ); + } + this.socket = socket; } @Override public void sendFragmentedFrame( Opcode op, ByteBuffer buffer, boolean fin ) { - conn.sendFragmentedFrame( op, buffer, fin ); + engine.sendFragmentedFrame( op, buffer, fin ); } @Override public boolean isOpen() { - return conn.isOpen(); + return engine.isOpen(); } @Override public boolean isFlushAndClose() { - return conn.isFlushAndClose(); + return engine.isFlushAndClose(); } @Override public boolean isClosed() { - return conn.isClosed(); + return engine.isClosed(); } @Override public boolean isClosing() { - return conn.isClosing(); + return engine.isClosing(); } @Override public boolean isConnecting() { - return conn.isConnecting(); + return engine.isConnecting(); } @Override public boolean hasBufferedData() { - return conn.hasBufferedData(); + return engine.hasBufferedData(); } @Override public void close( int code ) { - conn.close(); + engine.close(); } @Override public void close( int code, String message ) { - conn.close( code, message ); + engine.close( code, message ); } @Override public void closeConnection( int code, String message ) { - conn.closeConnection( code, message ); + engine.closeConnection( code, message ); } @Override public void send( ByteBuffer bytes ) throws IllegalArgumentException , NotYetConnectedException { - conn.send( bytes ); + engine.send( bytes ); } @Override public void sendFrame( Framedata framedata ) { - conn.sendFrame( framedata ); + engine.sendFrame( framedata ); } @Override public InetSocketAddress getLocalSocketAddress() { - return conn.getLocalSocketAddress(); + return engine.getLocalSocketAddress(); } @Override public InetSocketAddress getRemoteSocketAddress() { - return conn.getRemoteSocketAddress(); + return engine.getRemoteSocketAddress(); } } From e927c29eab226e6accd7da602f9105b29aff671f Mon Sep 17 00:00:00 2001 From: Davidiusdadi Date: Wed, 4 Sep 2013 20:39:30 +0200 Subject: [PATCH 2/9] prevented unnecessary attempts to decode of empty buffers --- .../server/WebSocketServer.java | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/java_websocket/server/WebSocketServer.java b/src/main/java/org/java_websocket/server/WebSocketServer.java index 6257d7d81..cd8bfc913 100644 --- a/src/main/java/org/java_websocket/server/WebSocketServer.java +++ b/src/main/java/org/java_websocket/server/WebSocketServer.java @@ -326,16 +326,18 @@ public void run() { conn = (WebSocketImpl) key.attachment(); ByteBuffer buf = takeBuffer(); try { - if( SocketChannelIOHelper.read( buf, conn, (ByteChannel) conn.channel ) ) { - assert ( buf.hasRemaining() ); - conn.inQueue.put( buf ); - queue( conn ); - i.remove(); - if( conn.channel instanceof WrappedByteChannel ) { - if( ( (WrappedByteChannel) conn.channel ).isNeedRead() ) { - iqueue.add( conn ); + if( SocketChannelIOHelper.read( buf, conn, conn.channel ) ) { + if( buf.hasRemaining() ) { + conn.inQueue.put( buf ); + queue( conn ); + i.remove(); + if( conn.channel instanceof WrappedByteChannel ) { + if( ( (WrappedByteChannel) conn.channel ).isNeedRead() ) { + iqueue.add( conn ); + } } - } + } else + pushBuffer( buf ); } else { pushBuffer( buf ); } @@ -346,7 +348,7 @@ public void run() { } if( key.isWritable() ) { conn = (WebSocketImpl) key.attachment(); - if( SocketChannelIOHelper.batch( conn, (ByteChannel) conn.channel ) ) { + if( SocketChannelIOHelper.batch( conn, conn.channel ) ) { if( key.isValid() ) key.interestOps( SelectionKey.OP_READ ); } @@ -359,9 +361,12 @@ public void run() { try { if( SocketChannelIOHelper.readMore( buf, conn, c ) ) iqueue.add( conn ); - assert ( buf.hasRemaining() ); - conn.inQueue.put( buf ); - queue( conn ); + if( buf.hasRemaining() ) { + conn.inQueue.put( buf ); + queue( conn ); + } else { + pushBuffer( buf ); + } } catch ( IOException e ) { pushBuffer( buf ); throw e; From 2b0bb8d0e85430a91ae2d6f438e83a9c995ddd9e Mon Sep 17 00:00:00 2001 From: Davidiusdadi Date: Wed, 4 Sep 2013 21:17:01 +0200 Subject: [PATCH 3/9] fixed deadlock in server( #195 ) thanks @DeHecht for isolating the problem --- .../server/WebSocketServer.java | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/java_websocket/server/WebSocketServer.java b/src/main/java/org/java_websocket/server/WebSocketServer.java index cd8bfc913..38adb067e 100644 --- a/src/main/java/org/java_websocket/server/WebSocketServer.java +++ b/src/main/java/org/java_websocket/server/WebSocketServer.java @@ -35,10 +35,12 @@ import org.java_websocket.WebSocketImpl; import org.java_websocket.WrappedByteChannel; import org.java_websocket.drafts.Draft; +import org.java_websocket.exceptions.InvalidDataException; import org.java_websocket.framing.CloseFrame; import org.java_websocket.framing.Framedata; import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.handshake.Handshakedata; +import org.java_websocket.handshake.ServerHandshakeBuilder; /** * WebSocketServer is an abstract class that only takes care of the @@ -200,15 +202,21 @@ public void start() { * @throws InterruptedException */ public void stop( int timeout ) throws IOException , InterruptedException { - if( !isclosed.compareAndSet( false, true ) ) { + if( !isclosed.compareAndSet( false, true ) ) { // this also makes sure that no further connections will be added to this.connections return; } + List socketsToClose = null; + + // copy the connections in a list (prevent callback deadlocks) synchronized ( connections ) { - for( WebSocket ws : connections ) { - ws.close( CloseFrame.GOING_AWAY ); - } + socketsToClose = new ArrayList( connections ); + } + + for( WebSocket ws : socketsToClose ) { + ws.close( CloseFrame.GOING_AWAY ); } + synchronized ( this ) { if( selectorthread != null ) { if( Thread.currentThread() != selectorthread ) { @@ -525,13 +533,25 @@ protected boolean removeConnection( WebSocket ws ) { } } + @Override + public ServerHandshakeBuilder onWebsocketHandshakeReceivedAsServer( WebSocket conn, Draft draft, ClientHandshake request ) throws InvalidDataException { + return super.onWebsocketHandshakeReceivedAsServer( conn, draft, request ); + } + /** @see #removeConnection(WebSocket) */ protected boolean addConnection( WebSocket ws ) { - synchronized ( connections ) { - return this.connections.add( ws ); + if( isclosed.get() ) { + synchronized ( connections ) { + boolean succ = this.connections.add( ws ); + assert ( succ ); + return succ; + } + } else { + // This case will happen when a new connection gets ready while the server is already stopping. + ws.close( CloseFrame.GOING_AWAY ); + return true;// for consistency sake we will make sure that both onOpen will be called } } - /** * @param conn * may be null if the error does not belong to a single connection From dbad4a276d6d8e1537b58f7c0256e24765db1b3e Mon Sep 17 00:00:00 2001 From: Davidiusdadi Date: Wed, 4 Sep 2013 23:12:26 +0200 Subject: [PATCH 4/9] tweaked WebsocketServer.stop to perform a graceful shutdown --- .../server/WebSocketServer.java | 41 ++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/java_websocket/server/WebSocketServer.java b/src/main/java/org/java_websocket/server/WebSocketServer.java index 38adb067e..a45f7e133 100644 --- a/src/main/java/org/java_websocket/server/WebSocketServer.java +++ b/src/main/java/org/java_websocket/server/WebSocketServer.java @@ -195,13 +195,13 @@ public void start() { * If this method is called before the server is started it will never start. * * @param timeout - * Specifies how many milliseconds shall pass between initiating the close handshakes with the connected clients and closing the servers socket channel. + * Specifies how many milliseconds the overall close handshaking may take altogether before the connections are closed without proper close handshaking.
* * @throws IOException * When {@link ServerSocketChannel}.close throws an IOException * @throws InterruptedException */ - public void stop( int timeout ) throws IOException , InterruptedException { + public void stop( int timeout ) throws InterruptedException { if( !isclosed.compareAndSet( false, true ) ) { // this also makes sure that no further connections will be added to this.connections return; } @@ -223,21 +223,14 @@ public void stop( int timeout ) throws IOException , InterruptedException { } if( selectorthread != Thread.currentThread() ) { - selectorthread.interrupt(); + if( socketsToClose.size() > 0 ) + selectorthread.join( timeout );// isclosed will tell the selectorthread to go down after the last connection was closed + selectorthread.interrupt();// in case the selectorthread did not terminate in time we send the interrupt selectorthread.join(); } } - if( decoders != null ) { - for( WebSocketWorker w : decoders ) { - w.interrupt(); - } - } - if( server != null ) { - server.close(); - } } } - public void stop() throws IOException , InterruptedException { stop( 0 ); } @@ -397,6 +390,19 @@ public void run() { } catch ( RuntimeException e ) { // should hopefully never occur handleFatal( null, e ); + } finally { + if( decoders != null ) { + for( WebSocketWorker w : decoders ) { + w.interrupt(); + } + } + if( server != null ) { + try { + server.close(); + } catch ( IOException e ) { + onError( null, e ); + } + } } } protected void allocateBuffers( WebSocket c ) throws InterruptedException { @@ -528,11 +534,16 @@ public final void onWebsocketClose( WebSocket conn, int code, String reason, boo * Depending on the type on the connection, modifications of that collection may have to be synchronized. **/ protected boolean removeConnection( WebSocket ws ) { + boolean removed; synchronized ( connections ) { - return this.connections.remove( ws ); + removed = this.connections.remove( ws ); + assert ( removed ); } + if( isclosed.get() && connections.size() == 0 ) { + selectorthread.interrupt(); + } + return removed; } - @Override public ServerHandshakeBuilder onWebsocketHandshakeReceivedAsServer( WebSocket conn, Draft draft, ClientHandshake request ) throws InvalidDataException { return super.onWebsocketHandshakeReceivedAsServer( conn, draft, request ); @@ -540,7 +551,7 @@ public ServerHandshakeBuilder onWebsocketHandshakeReceivedAsServer( WebSocket co /** @see #removeConnection(WebSocket) */ protected boolean addConnection( WebSocket ws ) { - if( isclosed.get() ) { + if( !isclosed.get() ) { synchronized ( connections ) { boolean succ = this.connections.add( ws ); assert ( succ ); From f1cf98491e378b6ecdfb92bb37e3cd43f57dbfa3 Mon Sep 17 00:00:00 2001 From: Davidiusdadi Date: Wed, 4 Sep 2013 23:17:10 +0200 Subject: [PATCH 5/9] -made socket output stream gets flushed after every frame --- src/main/java/org/java_websocket/client/WebSocketClient.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/java_websocket/client/WebSocketClient.java b/src/main/java/org/java_websocket/client/WebSocketClient.java index 635cb17f8..b21225f0a 100644 --- a/src/main/java/org/java_websocket/client/WebSocketClient.java +++ b/src/main/java/org/java_websocket/client/WebSocketClient.java @@ -344,6 +344,7 @@ public void run() { while ( !Thread.interrupted() ) { ByteBuffer buffer = engine.outQueue.take(); ostream.write( buffer.array(), 0, buffer.limit() ); + ostream.flush(); } } catch ( IOException e ) { engine.eot(); From de9955afa8e68be7f2f265bfea86178f8c19b1e8 Mon Sep 17 00:00:00 2001 From: Davidiusdadi Date: Wed, 4 Sep 2013 23:21:40 +0200 Subject: [PATCH 6/9] -made close frames require a close frame in response before closing the underlying socket --- .../org/java_websocket/SocketChannelIOHelper.java | 4 ++-- src/main/java/org/java_websocket/WebSocketImpl.java | 12 ++---------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/java_websocket/SocketChannelIOHelper.java b/src/main/java/org/java_websocket/SocketChannelIOHelper.java index 9752098c4..379aaf014 100644 --- a/src/main/java/org/java_websocket/SocketChannelIOHelper.java +++ b/src/main/java/org/java_websocket/SocketChannelIOHelper.java @@ -59,11 +59,11 @@ public static boolean batch( WebSocketImpl ws, ByteChannel sockchannel ) throws } while ( buffer != null ); } - if( ws.outQueue.isEmpty() && ws.isFlushAndClose() /*&& ( c == null || c.isNeedWrite() )*/) { + /*if( ws.outQueue.isEmpty() && ws.isFlushAndClose() ) {// synchronized ( ws ) { ws.closeConnection(); } - } + }*/ return c != null ? !( (WrappedByteChannel) sockchannel ).isNeedWrite() : true; } diff --git a/src/main/java/org/java_websocket/WebSocketImpl.java b/src/main/java/org/java_websocket/WebSocketImpl.java index 7bb7e8891..1780627a3 100644 --- a/src/main/java/org/java_websocket/WebSocketImpl.java +++ b/src/main/java/org/java_websocket/WebSocketImpl.java @@ -149,15 +149,11 @@ public WebSocketImpl( WebSocketListener listener , List drafts , Socket s public void decode( ByteBuffer socketBuffer ) { assert ( socketBuffer.hasRemaining() ); - if( flushandclosestate ) { - return; - } - if( DEBUG ) System.out.println( "process(" + socketBuffer.remaining() + "): {" + ( socketBuffer.remaining() > 1000 ? "too big to display" : new String( socketBuffer.array(), socketBuffer.position(), socketBuffer.remaining() ) ) + "}" ); - if( readystate == READYSTATE.OPEN ) { - decodeFrames( socketBuffer ); + if( readystate != READYSTATE.NOT_YET_CONNECTED ) { + decodeFrames( socketBuffer );; } else { if( decodeHandshake( socketBuffer ) ) { assert ( tmpHandshakeBytes.hasRemaining() != socketBuffer.hasRemaining() || !socketBuffer.hasRemaining() ); // the buffers will never have remaining bytes at the same time @@ -312,8 +308,6 @@ private boolean decodeHandshake( ByteBuffer socketBufferNew ) { } private void decodeFrames( ByteBuffer socketBuffer ) { - if( flushandclosestate ) - return; List frames; try { @@ -321,8 +315,6 @@ private void decodeFrames( ByteBuffer socketBuffer ) { for( Framedata f : frames ) { if( DEBUG ) System.out.println( "matched frame: " + f ); - if( flushandclosestate ) - return; Opcode curop = f.getOpcode(); boolean fin = f.isFin(); From 2c6f21d23afeb4fc7bd2effb666e11726b4193cf Mon Sep 17 00:00:00 2001 From: Davidiusdadi Date: Fri, 20 Sep 2013 01:14:18 +0200 Subject: [PATCH 7/9] added missing method getResourceDescriptor --- src/main/java/org/java_websocket/client/WebSocketClient.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/org/java_websocket/client/WebSocketClient.java b/src/main/java/org/java_websocket/client/WebSocketClient.java index 1aeb64688..7c5c90241 100644 --- a/src/main/java/org/java_websocket/client/WebSocketClient.java +++ b/src/main/java/org/java_websocket/client/WebSocketClient.java @@ -537,4 +537,9 @@ public InetSocketAddress getLocalSocketAddress() { public InetSocketAddress getRemoteSocketAddress() { return conn.getRemoteSocketAddress(); } + + @Override + public String getResourceDescriptor() { + return uri.getPath(); + } } From aef92290adbd53850337cfc2c651699764988038 Mon Sep 17 00:00:00 2001 From: Davidiusdadi Date: Fri, 20 Sep 2013 02:05:13 +0200 Subject: [PATCH 8/9] fixed npe cause by racing condition #205 --- .../org/java_websocket/WebSocketAdapter.java | 20 ++++++++++++++++--- .../org/java_websocket/WebSocketImpl.java | 8 ++++++-- .../org/java_websocket/WebSocketListener.java | 3 ++- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/java_websocket/WebSocketAdapter.java b/src/main/java/org/java_websocket/WebSocketAdapter.java index 31a48ab16..290e1049a 100644 --- a/src/main/java/org/java_websocket/WebSocketAdapter.java +++ b/src/main/java/org/java_websocket/WebSocketAdapter.java @@ -1,7 +1,10 @@ package org.java_websocket; +import java.net.InetSocketAddress; + import org.java_websocket.drafts.Draft; import org.java_websocket.exceptions.InvalidDataException; +import org.java_websocket.exceptions.InvalidHandshakeException; import org.java_websocket.framing.Framedata; import org.java_websocket.framing.Framedata.Opcode; import org.java_websocket.framing.FramedataImpl1; @@ -79,12 +82,23 @@ public void onWebsocketPong( WebSocket conn, Framedata f ) { * This is specifically implemented for gitime's WebSocket client for Flash: * http://github.com/gimite/web-socket-js * - * @return An XML String that comforms to Flash's security policy. You MUST + * @return An XML String that comforts to Flash's security policy. You MUST * not include the null char at the end, it is appended automatically. + * @throws InvalidDataException thrown when some data that is required to generate the flash-policy like the websocket local port could not be obtained e.g because the websocket is not connected. */ @Override - public String getFlashPolicy( WebSocket conn ) { - return "\0"; + public String getFlashPolicy( WebSocket conn ) throws InvalidDataException { + InetSocketAddress adr = conn.getLocalSocketAddress(); + if(null == adr){ + throw new InvalidHandshakeException( "socket not bound" ); + } + + StringBuffer sb = new StringBuffer( 90 ); + sb.append( "\0" ); + + return sb.toString(); } } diff --git a/src/main/java/org/java_websocket/WebSocketImpl.java b/src/main/java/org/java_websocket/WebSocketImpl.java index 44a8fc971..1d8f0f892 100644 --- a/src/main/java/org/java_websocket/WebSocketImpl.java +++ b/src/main/java/org/java_websocket/WebSocketImpl.java @@ -198,8 +198,12 @@ private boolean decodeHandshake( ByteBuffer socketBufferNew ) { if( draft == null ) { HandshakeState isflashedgecase = isFlashEdgeCase( socketBuffer ); if( isflashedgecase == HandshakeState.MATCHED ) { - write( ByteBuffer.wrap( Charsetfunctions.utf8Bytes( wsl.getFlashPolicy( this ) ) ) ); - close( CloseFrame.FLASHPOLICY, "" ); + try { + write( ByteBuffer.wrap( Charsetfunctions.utf8Bytes( wsl.getFlashPolicy( this ) ) ) ); + close( CloseFrame.FLASHPOLICY, "" ); + } catch ( InvalidDataException e ) { + close( CloseFrame.ABNORMAL_CLOSE, "remote peer closed connection before flashpolicy could be transmitted", true ); + } return false; } } diff --git a/src/main/java/org/java_websocket/WebSocketListener.java b/src/main/java/org/java_websocket/WebSocketListener.java index 4b35f245c..93478d940 100644 --- a/src/main/java/org/java_websocket/WebSocketListener.java +++ b/src/main/java/org/java_websocket/WebSocketListener.java @@ -139,8 +139,9 @@ public interface WebSocketListener { /** * Gets the XML string that should be returned if a client requests a Flash * security policy. + * @throws InvalidDataException thrown when some data that is required to generate the flash-policy like the websocket local port could not be obtained. */ - public String getFlashPolicy( WebSocket conn ); + public String getFlashPolicy( WebSocket conn ) throws InvalidDataException; /** This method is used to inform the selector thread that there is data queued to be written to the socket. */ public void onWriteDemand( WebSocket conn ); From 15f49ed574130bc1047abf009adb79f5aa50d939 Mon Sep 17 00:00:00 2001 From: Davidiusdadi Date: Mon, 23 Sep 2013 02:06:26 +0200 Subject: [PATCH 9/9] fixed websocket client socket not being closed in some cases and server waiting for clients to close connection first --- .../java_websocket/SocketChannelIOHelper.java | 16 ++++------------ .../java_websocket/client/WebSocketClient.java | 8 +++++++- .../java/org/java_websocket/drafts/Draft.java | 4 ++++ 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/java_websocket/SocketChannelIOHelper.java b/src/main/java/org/java_websocket/SocketChannelIOHelper.java index 379aaf014..e0da2bdc3 100644 --- a/src/main/java/org/java_websocket/SocketChannelIOHelper.java +++ b/src/main/java/org/java_websocket/SocketChannelIOHelper.java @@ -5,6 +5,8 @@ import java.nio.channels.ByteChannel; import java.nio.channels.spi.AbstractSelectableChannel; +import org.java_websocket.WebSocket.Role; + public class SocketChannelIOHelper { public static boolean read( final ByteBuffer buf, WebSocketImpl ws, ByteChannel channel ) throws IOException { @@ -59,21 +61,11 @@ public static boolean batch( WebSocketImpl ws, ByteChannel sockchannel ) throws } while ( buffer != null ); } - /*if( ws.outQueue.isEmpty() && ws.isFlushAndClose() ) {// + if( ws.outQueue.isEmpty() && ws.isFlushAndClose() && ws.getDraft().getRole() == Role.SERVER ) {// synchronized ( ws ) { ws.closeConnection(); } - }*/ + } return c != null ? !( (WrappedByteChannel) sockchannel ).isNeedWrite() : true; } - - public static void writeBlocking( WebSocketImpl ws, ByteChannel channel ) throws InterruptedException , IOException { - assert ( channel instanceof AbstractSelectableChannel == true ? ( (AbstractSelectableChannel) channel ).isBlocking() : true ); - assert ( channel instanceof WrappedByteChannel == true ? ( (WrappedByteChannel) channel ).isBlocking() : true ); - - ByteBuffer buf = ws.outQueue.take(); - while ( buf.hasRemaining() ) - channel.write( buf ); - } - } diff --git a/src/main/java/org/java_websocket/client/WebSocketClient.java b/src/main/java/org/java_websocket/client/WebSocketClient.java index e7e4f6c8d..13d7a9d2c 100644 --- a/src/main/java/org/java_websocket/client/WebSocketClient.java +++ b/src/main/java/org/java_websocket/client/WebSocketClient.java @@ -181,7 +181,7 @@ public void run() { int readBytes; try { - while ( ( readBytes = istream.read( rawbuffer ) ) != -1 ) { + while ( !isClosed() && ( readBytes = istream.read( rawbuffer ) ) != -1 ) { engine.decode( ByteBuffer.wrap( rawbuffer, 0, readBytes ) ); } engine.eot(); @@ -276,6 +276,12 @@ public final void onWebsocketClose( WebSocket conn, int code, String reason, boo closeLatch.countDown(); if( writeThread != null ) writeThread.interrupt(); + try { + if( socket != null ) + socket.close(); + } catch ( IOException e ) { + onWebsocketError( this, e ); + } onClose( code, reason, remote ); } diff --git a/src/main/java/org/java_websocket/drafts/Draft.java b/src/main/java/org/java_websocket/drafts/Draft.java index 1bad2ad7b..65b34de8f 100644 --- a/src/main/java/org/java_websocket/drafts/Draft.java +++ b/src/main/java/org/java_websocket/drafts/Draft.java @@ -220,5 +220,9 @@ public int checkAlloc( int bytecount ) throws LimitExedeedException , InvalidDat public void setParseMode( Role role ) { this.role = role; } + + public Role getRole() { + return role; + } }