diff --git a/src/main/example/ProxyClientExample.java b/src/main/example/ProxyClientExample.java
index 133756fd0..ddff0ea0d 100644
--- a/src/main/example/ProxyClientExample.java
+++ b/src/main/example/ProxyClientExample.java
@@ -1,26 +1,12 @@
import java.net.InetSocketAddress;
+import java.net.Proxy;
import java.net.URI;
import java.net.URISyntaxException;
-import java.nio.channels.ByteChannel;
-
-public class ProxyClientExample extends ExampleClient {
-
- public ProxyClientExample( URI serverURI , InetSocketAddress proxy ) {
- super( serverURI );
- setProxy( proxy );
- }
-
- @Override
- public ByteChannel createProxyChannel( ByteChannel towrap ) {
- /*
- * You can create custom proxy handshake here.
- * For more infos see: WebSocketClient.DefaultClientProxyChannel and http://tools.ietf.org/html/rfc6455#section-4.1
- */
- return super.createProxyChannel( towrap );
- }
+public class ProxyClientExample {
public static void main( String[] args ) throws URISyntaxException {
- ProxyClientExample c = new ProxyClientExample( new URI( "ws://echo.websocket.org" ), new InetSocketAddress( "proxyaddress", 80 ) );// don't forget to change "proxyaddress"
+ ExampleClient c = new ExampleClient( new URI( "ws://echo.websocket.org" ) );
+ c.setProxy( new Proxy( Proxy.Type.HTTP, new InetSocketAddress( "proxyaddress", 80 ) ) );
c.connect();
}
}
diff --git a/src/main/example/SSLClientExample.java b/src/main/example/SSLClientExample.java
index 2231e5475..e740c9c54 100644
--- a/src/main/example/SSLClientExample.java
+++ b/src/main/example/SSLClientExample.java
@@ -7,10 +7,10 @@
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import org.java_websocket.WebSocketImpl;
-import org.java_websocket.client.DefaultSSLWebSocketClientFactory;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
@@ -79,7 +79,9 @@ public static void main( String[] args ) throws Exception {
sslContext.init( kmf.getKeyManagers(), tmf.getTrustManagers(), null );
// sslContext.init( null, null, null ); // will use java's default key and trust store which is sufficient unless you deal with self-signed certificates
- chatclient.setWebSocketFactory( new DefaultSSLWebSocketClientFactory( sslContext ) );
+ SSLSocketFactory factory = sslContext.getSocketFactory();// (SSLSocketFactory) SSLSocketFactory.getDefault();
+
+ chatclient.setSocket( factory.createSocket() );
chatclient.connectBlocking();
diff --git a/src/main/java/org/java_websocket/SocketChannelIOHelper.java b/src/main/java/org/java_websocket/SocketChannelIOHelper.java
index 9752098c4..e0da2bdc3 100644
--- a/src/main/java/org/java_websocket/SocketChannelIOHelper.java
+++ b/src/main/java/org/java_websocket/SocketChannelIOHelper.java
@@ -5,6 +5,8 @@
import java.nio.channels.ByteChannel;
import java.nio.channels.spi.AbstractSelectableChannel;
+import org.java_websocket.WebSocket.Role;
+
public class SocketChannelIOHelper {
public static boolean read( final ByteBuffer buf, WebSocketImpl ws, ByteChannel channel ) throws IOException {
@@ -59,21 +61,11 @@ public static boolean batch( WebSocketImpl ws, ByteChannel sockchannel ) throws
} while ( buffer != null );
}
- if( ws.outQueue.isEmpty() && ws.isFlushAndClose() /*&& ( c == null || c.isNeedWrite() )*/) {
+ if( ws.outQueue.isEmpty() && ws.isFlushAndClose() && ws.getDraft().getRole() == Role.SERVER ) {//
synchronized ( ws ) {
ws.closeConnection();
}
}
return c != null ? !( (WrappedByteChannel) sockchannel ).isNeedWrite() : true;
}
-
- public static void writeBlocking( WebSocketImpl ws, ByteChannel channel ) throws InterruptedException , IOException {
- assert ( channel instanceof AbstractSelectableChannel == true ? ( (AbstractSelectableChannel) channel ).isBlocking() : true );
- assert ( channel instanceof WrappedByteChannel == true ? ( (WrappedByteChannel) channel ).isBlocking() : true );
-
- ByteBuffer buf = ws.outQueue.take();
- while ( buf.hasRemaining() )
- channel.write( buf );
- }
-
}
diff --git a/src/main/java/org/java_websocket/WebSocketAdapter.java b/src/main/java/org/java_websocket/WebSocketAdapter.java
index 31a48ab16..290e1049a 100644
--- a/src/main/java/org/java_websocket/WebSocketAdapter.java
+++ b/src/main/java/org/java_websocket/WebSocketAdapter.java
@@ -1,7 +1,10 @@
package org.java_websocket;
+import java.net.InetSocketAddress;
+
import org.java_websocket.drafts.Draft;
import org.java_websocket.exceptions.InvalidDataException;
+import org.java_websocket.exceptions.InvalidHandshakeException;
import org.java_websocket.framing.Framedata;
import org.java_websocket.framing.Framedata.Opcode;
import org.java_websocket.framing.FramedataImpl1;
@@ -79,12 +82,23 @@ public void onWebsocketPong( WebSocket conn, Framedata f ) {
* This is specifically implemented for gitime's WebSocket client for Flash:
* http://github.com/gimite/web-socket-js
*
- * @return An XML String that comforms to Flash's security policy. You MUST
+ * @return An XML String that comforts to Flash's security policy. You MUST
* not include the null char at the end, it is appended automatically.
+ * @throws InvalidDataException thrown when some data that is required to generate the flash-policy like the websocket local port could not be obtained e.g because the websocket is not connected.
*/
@Override
- public String getFlashPolicy( WebSocket conn ) {
- return "\0";
+ public String getFlashPolicy( WebSocket conn ) throws InvalidDataException {
+ InetSocketAddress adr = conn.getLocalSocketAddress();
+ if(null == adr){
+ throw new InvalidHandshakeException( "socket not bound" );
+ }
+
+ StringBuffer sb = new StringBuffer( 90 );
+ sb.append( "\0" );
+
+ return sb.toString();
}
}
diff --git a/src/main/java/org/java_websocket/WebSocketImpl.java b/src/main/java/org/java_websocket/WebSocketImpl.java
index 44a8fc971..669bee146 100644
--- a/src/main/java/org/java_websocket/WebSocketImpl.java
+++ b/src/main/java/org/java_websocket/WebSocketImpl.java
@@ -151,15 +151,11 @@ public WebSocketImpl( WebSocketListener listener , List drafts , Socket s
public void decode( ByteBuffer socketBuffer ) {
assert ( socketBuffer.hasRemaining() );
- if( flushandclosestate ) {
- return;
- }
-
if( DEBUG )
System.out.println( "process(" + socketBuffer.remaining() + "): {" + ( socketBuffer.remaining() > 1000 ? "too big to display" : new String( socketBuffer.array(), socketBuffer.position(), socketBuffer.remaining() ) ) + "}" );
- if( readystate == READYSTATE.OPEN ) {
- decodeFrames( socketBuffer );
+ if( readystate != READYSTATE.NOT_YET_CONNECTED ) {
+ decodeFrames( socketBuffer );;
} else {
if( decodeHandshake( socketBuffer ) ) {
assert ( tmpHandshakeBytes.hasRemaining() != socketBuffer.hasRemaining() || !socketBuffer.hasRemaining() ); // the buffers will never have remaining bytes at the same time
@@ -198,8 +194,12 @@ private boolean decodeHandshake( ByteBuffer socketBufferNew ) {
if( draft == null ) {
HandshakeState isflashedgecase = isFlashEdgeCase( socketBuffer );
if( isflashedgecase == HandshakeState.MATCHED ) {
- write( ByteBuffer.wrap( Charsetfunctions.utf8Bytes( wsl.getFlashPolicy( this ) ) ) );
- close( CloseFrame.FLASHPOLICY, "" );
+ try {
+ write( ByteBuffer.wrap( Charsetfunctions.utf8Bytes( wsl.getFlashPolicy( this ) ) ) );
+ close( CloseFrame.FLASHPOLICY, "" );
+ } catch ( InvalidDataException e ) {
+ close( CloseFrame.ABNORMAL_CLOSE, "remote peer closed connection before flashpolicy could be transmitted", true );
+ }
return false;
}
}
@@ -315,8 +315,6 @@ private boolean decodeHandshake( ByteBuffer socketBufferNew ) {
}
private void decodeFrames( ByteBuffer socketBuffer ) {
- if( flushandclosestate )
- return;
List frames;
try {
@@ -324,8 +322,6 @@ private void decodeFrames( ByteBuffer socketBuffer ) {
for( Framedata f : frames ) {
if( DEBUG )
System.out.println( "matched frame: " + f );
- if( flushandclosestate )
- return;
Opcode curop = f.getOpcode();
boolean fin = f.isFin();
diff --git a/src/main/java/org/java_websocket/WebSocketListener.java b/src/main/java/org/java_websocket/WebSocketListener.java
index 4b35f245c..93478d940 100644
--- a/src/main/java/org/java_websocket/WebSocketListener.java
+++ b/src/main/java/org/java_websocket/WebSocketListener.java
@@ -139,8 +139,9 @@ public interface WebSocketListener {
/**
* Gets the XML string that should be returned if a client requests a Flash
* security policy.
+ * @throws InvalidDataException thrown when some data that is required to generate the flash-policy like the websocket local port could not be obtained.
*/
- public String getFlashPolicy( WebSocket conn );
+ public String getFlashPolicy( WebSocket conn ) throws InvalidDataException;
/** This method is used to inform the selector thread that there is data queued to be written to the socket. */
public void onWriteDemand( WebSocket conn );
diff --git a/src/main/java/org/java_websocket/client/DefaultSSLWebSocketClientFactory.java b/src/main/java/org/java_websocket/client/DefaultSSLWebSocketClientFactory.java
deleted file mode 100644
index 0646cea1a..000000000
--- a/src/main/java/org/java_websocket/client/DefaultSSLWebSocketClientFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package org.java_websocket.client;
-import java.io.IOException;
-import java.net.Socket;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-import org.java_websocket.SSLSocketChannel2;
-import org.java_websocket.WebSocketAdapter;
-import org.java_websocket.WebSocketImpl;
-import org.java_websocket.client.WebSocketClient.WebSocketClientFactory;
-import org.java_websocket.drafts.Draft;
-
-
-public class DefaultSSLWebSocketClientFactory implements WebSocketClientFactory {
- protected SSLContext sslcontext;
- protected ExecutorService exec;
-
- public DefaultSSLWebSocketClientFactory( SSLContext sslContext ) {
- this( sslContext, Executors.newSingleThreadScheduledExecutor() );
- }
-
- public DefaultSSLWebSocketClientFactory( SSLContext sslContext , ExecutorService exec ) {
- if( sslContext == null || exec == null )
- throw new IllegalArgumentException();
- this.sslcontext = sslContext;
- this.exec = exec;
- }
-
- @Override
- public ByteChannel wrapChannel( SocketChannel channel, SelectionKey key, String host, int port ) throws IOException {
- SSLEngine e = sslcontext.createSSLEngine( host, port );
- e.setUseClientMode( true );
- return new SSLSocketChannel2( channel, e, exec, key );
- }
-
- @Override
- public WebSocketImpl createWebSocket( WebSocketAdapter a, Draft d, Socket c ) {
- return new WebSocketImpl( a, d, c );
- }
-
- @Override
- public WebSocketImpl createWebSocket( WebSocketAdapter a, List d, Socket s ) {
- return new WebSocketImpl( a, d, s );
- }
-}
\ No newline at end of file
diff --git a/src/main/java/org/java_websocket/client/DefaultWebSocketClientFactory.java b/src/main/java/org/java_websocket/client/DefaultWebSocketClientFactory.java
deleted file mode 100644
index 005f8f038..000000000
--- a/src/main/java/org/java_websocket/client/DefaultWebSocketClientFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.java_websocket.client;
-
-import java.net.Socket;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.List;
-
-import org.java_websocket.WebSocket;
-import org.java_websocket.WebSocketAdapter;
-import org.java_websocket.WebSocketImpl;
-import org.java_websocket.drafts.Draft;
-
-public class DefaultWebSocketClientFactory implements WebSocketClient.WebSocketClientFactory {
- /**
- *
- */
- private final WebSocketClient webSocketClient;
- /**
- * @param webSocketClient
- */
- public DefaultWebSocketClientFactory( WebSocketClient webSocketClient ) {
- this.webSocketClient = webSocketClient;
- }
- @Override
- public WebSocket createWebSocket( WebSocketAdapter a, Draft d, Socket s ) {
- return new WebSocketImpl( this.webSocketClient, d );
- }
- @Override
- public WebSocket createWebSocket( WebSocketAdapter a, List d, Socket s ) {
- return new WebSocketImpl( this.webSocketClient, d );
- }
- @Override
- public ByteChannel wrapChannel( SocketChannel channel, SelectionKey c, String host, int port ) {
- if( c == null )
- return channel;
- return channel;
- }
-}
\ No newline at end of file
diff --git a/src/main/java/org/java_websocket/client/WebSocketClient.java b/src/main/java/org/java_websocket/client/WebSocketClient.java
index 1aeb64688..13d7a9d2c 100644
--- a/src/main/java/org/java_websocket/client/WebSocketClient.java
+++ b/src/main/java/org/java_websocket/client/WebSocketClient.java
@@ -1,27 +1,22 @@
package org.java_websocket.client;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.Socket;
import java.net.URI;
import java.nio.ByteBuffer;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.NotYetConnectedException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.SelectorProvider;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import org.java_websocket.SocketChannelIOHelper;
import org.java_websocket.WebSocket;
import org.java_websocket.WebSocketAdapter;
-import org.java_websocket.WebSocketFactory;
import org.java_websocket.WebSocketImpl;
-import org.java_websocket.WrappedByteChannel;
import org.java_websocket.drafts.Draft;
-import org.java_websocket.drafts.Draft_10;
+import org.java_websocket.drafts.Draft_17;
import org.java_websocket.exceptions.InvalidHandshakeException;
import org.java_websocket.framing.CloseFrame;
import org.java_websocket.framing.Framedata;
@@ -31,14 +26,8 @@
import org.java_websocket.handshake.ServerHandshake;
/**
- * The WebSocketClient is an abstract class that expects a valid
- * "ws://" URI to connect to. When connected, an instance recieves important
- * events related to the life of the connection. A subclass must implement
- * onOpen, onClose, and onMessage to be
- * useful. An instance can send messages to it's connected server via the
- * send method.
- *
- * @author Nathan Rajlich
+ * A subclass must implement at least onOpen, onClose, and onMessage to be
+ * useful. At runtime the user is expected to establish a connection via {@link #connect()}, then receive events like {@link #onMessage(String)} via the overloaded methods and to {@link #send(String)} data to the server.
*/
public abstract class WebSocketClient extends WebSocketAdapter implements Runnable, WebSocket {
@@ -47,17 +36,17 @@ public abstract class WebSocketClient extends WebSocketAdapter implements Runnab
*/
protected URI uri = null;
- private WebSocketImpl conn = null;
- /**
- * The SocketChannel instance this channel uses.
- */
- private SocketChannel channel = null;
+ private WebSocketImpl engine = null;
+
+ private Socket socket = null;
+
+ private InputStream istream;
- private ByteChannel wrappedchannel = null;
+ private OutputStream ostream;
- private Thread writethread;
+ private Proxy proxy = Proxy.NO_PROXY;
- private Thread readthread;
+ private Thread writeThread;
private Draft draft;
@@ -67,92 +56,77 @@ public abstract class WebSocketClient extends WebSocketAdapter implements Runnab
private CountDownLatch closeLatch = new CountDownLatch( 1 );
- private int timeout = 0;
-
- private WebSocketClientFactory wsfactory = new DefaultWebSocketClientFactory( this );
-
- private InetSocketAddress proxyAddress = null;
+ private int connectTimeout = 0;
+ /** This open a websocket connection as specified by rfc6455 */
public WebSocketClient( URI serverURI ) {
- this( serverURI, new Draft_10() );
+ this( serverURI, new Draft_17() );
}
/**
* Constructs a WebSocketClient instance and sets it to the connect to the
- * specified URI. The channel does not attampt to connect automatically. You
- * must call connect first to initiate the socket connection.
+ * specified URI. The channel does not attampt to connect automatically. The connection
+ * will be established once you call connect.
*/
public WebSocketClient( URI serverUri , Draft draft ) {
this( serverUri, draft, null, 0 );
}
- public WebSocketClient( URI serverUri , Draft draft , Map headers , int connecttimeout ) {
+ public WebSocketClient( URI serverUri , Draft protocolDraft , Map httpHeaders , int connectTimeout ) {
if( serverUri == null ) {
throw new IllegalArgumentException();
- }
- if( draft == null ) {
+ } else if( protocolDraft == null ) {
throw new IllegalArgumentException( "null as draft is permitted for `WebSocketServer` only!" );
}
this.uri = serverUri;
- this.draft = draft;
- this.headers = headers;
- this.timeout = connecttimeout;
-
- try {
- channel = SelectorProvider.provider().openSocketChannel();
- channel.configureBlocking( true );
- } catch ( IOException e ) {
- channel = null;
- onWebsocketError( null, e );
- }
- if( channel == null ) {
- conn = (WebSocketImpl) wsfactory.createWebSocket( this, draft, null );
- conn.close( CloseFrame.NEVER_CONNECTED, "Failed to create or configure SocketChannel." );
- } else {
- conn = (WebSocketImpl) wsfactory.createWebSocket( this, draft, channel.socket() );
- }
-
+ this.draft = protocolDraft;
+ this.headers = httpHeaders;
+ this.connectTimeout = connectTimeout;
+ this.engine = new WebSocketImpl( this, protocolDraft );
}
/**
- * Gets the URI that this WebSocketClient is connected to.
- *
- * @return The URI for this WebSocketClient.
+ * Returns the URI that this WebSocketClient is connected to.
*/
public URI getURI() {
return uri;
}
- /** Returns the protocol version this channel uses. */
+ /**
+ * Returns the protocol version this channel uses.
+ * For more infos see https://github.com/TooTallNate/Java-WebSocket/wiki/Drafts
+ */
public Draft getDraft() {
return draft;
}
/**
- * Starts a background thread that attempts and maintains a WebSocket
- * connection to the URI specified in the constructor or via setURI.
- * setURI.
+ * Initiates the websocket connection. This method does not block.
*/
public void connect() {
- if( writethread != null )
+ if( writeThread != null )
throw new IllegalStateException( "WebSocketClient objects are not reuseable" );
- writethread = new Thread( this );
- writethread.start();
+ writeThread = new Thread( this );
+ writeThread.start();
}
/**
- * Same as connect but blocks until the websocket connected or failed to do so.
+ * Same as connect
but blocks until the websocket connected or failed to do so.
* Returns whether it succeeded or not.
**/
public boolean connectBlocking() throws InterruptedException {
connect();
connectLatch.await();
- return conn.isOpen();
+ return engine.isOpen();
}
+ /**
+ * Initiates the websocket close handshake. This method does not block
+ * In oder to make sure the connection is closed use closeBlocking
+ */
public void close() {
- if( writethread != null ) {
- conn.close( CloseFrame.NORMAL );
+ if( writeThread != null ) {
+ engine.close( CloseFrame.NORMAL );
}
}
@@ -162,102 +136,64 @@ public void closeBlocking() throws InterruptedException {
}
/**
- * Sends text to the connected WebSocket server.
+ * Sends text to the connected websocket server.
*
* @param text
- * The String to send to the WebSocket server.
+ * The string which will be transmitted.
*/
public void send( String text ) throws NotYetConnectedException {
- conn.send( text );
- }
-
- public void sendFragment( Framedata f ) {
- conn.sendFrame( f );
+ engine.send( text );
}
/**
- * Sends data to the connected WebSocket server.
+ * Sends binary data to the connected webSocket server.
*
* @param data
- * The Byte-Array of data to send to the WebSocket server.
+ * The byte-Array of data to send to the WebSocket server.
*/
public void send( byte[] data ) throws NotYetConnectedException {
- conn.send( data );
+ engine.send( data );
}
- // Runnable IMPLEMENTATION /////////////////////////////////////////////////
public void run() {
- if( writethread == null )
- writethread = Thread.currentThread();
- interruptableRun();
-
- assert ( !channel.isOpen() );
-
- }
-
- private final void interruptableRun() {
- if( channel == null ) {
- return;// channel will be initialized in the constructor and only be null if no socket channel could be created or if blocking mode could be established
- }
-
try {
- String host;
- int port;
-
- if( proxyAddress != null ) {
- host = proxyAddress.getHostName();
- port = proxyAddress.getPort();
- } else {
- host = uri.getHost();
- port = getPort();
+ if( socket == null ) {
+ socket = new Socket( proxy );
+ } else if( socket.isClosed() ) {
+ throw new IOException();
}
- channel.connect( new InetSocketAddress( host, port ) );
- conn.channel = wrappedchannel = createProxyChannel( wsfactory.wrapChannel( channel, null, host, port ) );
+ if( !socket.isBound() )
+ socket.connect( new InetSocketAddress( uri.getHost(), getPort() ), connectTimeout );
+ istream = socket.getInputStream();
+ ostream = socket.getOutputStream();
- timeout = 0; // since connect is over
sendHandshake();
- readthread = new Thread( new WebsocketWriteThread() );
- readthread.start();
- } catch ( ClosedByInterruptException e ) {
- onWebsocketError( null, e );
- return;
- } catch ( /*IOException | SecurityException | UnresolvedAddressException*/Exception e ) {//
- onWebsocketError( conn, e );
- conn.closeConnection( CloseFrame.NEVER_CONNECTED, e.getMessage() );
+ } catch ( /*IOException | SecurityException | UnresolvedAddressException | InvalidHandshakeException | ClosedByInterruptException | SocketTimeoutException */Exception e ) {
+ onWebsocketError( engine, e );
+ engine.closeConnection( CloseFrame.NEVER_CONNECTED, e.getMessage() );
return;
}
- ByteBuffer buff = ByteBuffer.allocate( WebSocketImpl.RCVBUF );
- try/*IO*/{
- while ( channel.isOpen() ) {
- if( SocketChannelIOHelper.read( buff, this.conn, wrappedchannel ) ) {
- conn.decode( buff );
- } else {
- conn.eot();
- }
+ writeThread = new Thread( new WebsocketWriteThread() );
+ writeThread.start();
- if( wrappedchannel instanceof WrappedByteChannel ) {
- WrappedByteChannel w = (WrappedByteChannel) wrappedchannel;
- if( w.isNeedRead() ) {
- while ( SocketChannelIOHelper.readMore( buff, conn, w ) ) {
- conn.decode( buff );
- }
- conn.decode( buff );
- }
- }
- }
+ byte[] rawbuffer = new byte[ WebSocketImpl.RCVBUF ];
+ int readBytes;
- } catch ( CancelledKeyException e ) {
- conn.eot();
+ try {
+ while ( !isClosed() && ( readBytes = istream.read( rawbuffer ) ) != -1 ) {
+ engine.decode( ByteBuffer.wrap( rawbuffer, 0, readBytes ) );
+ }
+ engine.eot();
} catch ( IOException e ) {
- conn.eot();
+ engine.eot();
} catch ( RuntimeException e ) {
// this catch case covers internal errors only and indicates a bug in this websocket implementation
onError( e );
- conn.closeConnection( CloseFrame.ABNORMAL_CLOSE, e.getMessage() );
+ engine.closeConnection( CloseFrame.ABNORMAL_CLOSE, e.getMessage() );
}
+ assert ( socket.isClosed() );
}
-
private int getPort() {
int port = uri.getPort();
if( port == -1 ) {
@@ -294,22 +230,18 @@ private void sendHandshake() throws InvalidHandshakeException {
handshake.put( kv.getKey(), kv.getValue() );
}
}
- conn.startHandshake( handshake );
+ engine.startHandshake( handshake );
}
/**
* This represents the state of the connection.
- * You can use this method instead of
*/
public READYSTATE getReadyState() {
- return conn.getReadyState();
+ return engine.getReadyState();
}
/**
* Calls subclass' implementation of onMessage.
- *
- * @param conn
- * @param message
*/
@Override
public final void onWebsocketMessage( WebSocket conn, String message ) {
@@ -328,8 +260,6 @@ public void onWebsocketMessageFragment( WebSocket conn, Framedata frame ) {
/**
* Calls subclass' implementation of onOpen.
- *
- * @param conn
*/
@Override
public final void onWebsocketOpen( WebSocket conn, Handshakedata handshake ) {
@@ -339,22 +269,24 @@ public final void onWebsocketOpen( WebSocket conn, Handshakedata handshake ) {
/**
* Calls subclass' implementation of onClose.
- *
- * @param conn
*/
@Override
public final void onWebsocketClose( WebSocket conn, int code, String reason, boolean remote ) {
connectLatch.countDown();
closeLatch.countDown();
- if( readthread != null )
- readthread.interrupt();
+ if( writeThread != null )
+ writeThread.interrupt();
+ try {
+ if( socket != null )
+ socket.close();
+ } catch ( IOException e ) {
+ onWebsocketError( this, e );
+ }
onClose( code, reason, remote );
}
/**
* Calls subclass' implementation of onIOError.
- *
- * @param conn
*/
@Override
public final void onWebsocketError( WebSocket conn, Exception ex ) {
@@ -383,28 +315,20 @@ public void onClosing( int code, String reason, boolean remote ) {
}
public WebSocket getConnection() {
- return conn;
- }
-
- public final void setWebSocketFactory( WebSocketClientFactory wsf ) {
- this.wsfactory = wsf;
- }
-
- public final WebSocketFactory getWebSocketFactory() {
- return wsfactory;
+ return engine;
}
@Override
public InetSocketAddress getLocalSocketAddress( WebSocket conn ) {
- if( channel != null )
- return (InetSocketAddress) channel.socket().getLocalSocketAddress();
+ if( socket != null )
+ return (InetSocketAddress) socket.getLocalSocketAddress();
return null;
}
@Override
public InetSocketAddress getRemoteSocketAddress( WebSocket conn ) {
- if( channel != null )
- return (InetSocketAddress) channel.socket().getLocalSocketAddress();
+ if( socket != null )
+ return (InetSocketAddress) socket.getLocalSocketAddress();
return null;
}
@@ -418,123 +342,113 @@ public void onMessage( ByteBuffer bytes ) {
public void onFragment( Framedata frame ) {
}
- public class DefaultClientProxyChannel extends AbstractClientProxyChannel {
- public DefaultClientProxyChannel( ByteChannel towrap ) {
- super( towrap );
- }
- @Override
- public String buildHandShake() {
- StringBuilder b = new StringBuilder();
- String host = uri.getHost();
- b.append( "CONNECT " );
- b.append( host );
- b.append( ":" );
- b.append( getPort() );
- b.append( " HTTP/1.1\n" );
- b.append( "Host: " );
- b.append( host );
- b.append( "\n" );
- return b.toString();
- }
- }
-
- public interface WebSocketClientFactory extends WebSocketFactory {
- public ByteChannel wrapChannel( SocketChannel channel, SelectionKey key, String host, int port ) throws IOException;
- }
-
private class WebsocketWriteThread implements Runnable {
@Override
public void run() {
Thread.currentThread().setName( "WebsocketWriteThread" );
try {
while ( !Thread.interrupted() ) {
- SocketChannelIOHelper.writeBlocking( conn, wrappedchannel );
+ ByteBuffer buffer = engine.outQueue.take();
+ ostream.write( buffer.array(), 0, buffer.limit() );
+ ostream.flush();
}
} catch ( IOException e ) {
- conn.eot();
+ engine.eot();
} catch ( InterruptedException e ) {
// this thread is regularly terminated via an interrupt
}
}
}
- public ByteChannel createProxyChannel( ByteChannel towrap ) {
- if( proxyAddress != null ) {
- return new DefaultClientProxyChannel( towrap );
- }
- return towrap;// no proxy in use
+ public void setProxy( Proxy proxy ) {
+ if( proxy == null )
+ throw new IllegalArgumentException();
+ this.proxy = proxy;
}
- public void setProxy( InetSocketAddress proxyaddress ) {
- proxyAddress = proxyaddress;
+ /**
+ * Accepts bound and unbound sockets.
+ * This method must be called before connect
.
+ * If the given socket is not yet bound it will be bound to the uri specified in the constructor.
+ **/
+ public void setSocket( Socket socket ) {
+ if( this.socket != null ) {
+ throw new IllegalStateException( "socket has already been set" );
+ }
+ this.socket = socket;
}
@Override
public void sendFragmentedFrame( Opcode op, ByteBuffer buffer, boolean fin ) {
- conn.sendFragmentedFrame( op, buffer, fin );
+ engine.sendFragmentedFrame( op, buffer, fin );
}
@Override
public boolean isOpen() {
- return conn.isOpen();
+ return engine.isOpen();
}
@Override
public boolean isFlushAndClose() {
- return conn.isFlushAndClose();
+ return engine.isFlushAndClose();
}
@Override
public boolean isClosed() {
- return conn.isClosed();
+ return engine.isClosed();
}
@Override
public boolean isClosing() {
- return conn.isClosing();
+ return engine.isClosing();
}
@Override
public boolean isConnecting() {
- return conn.isConnecting();
+ return engine.isConnecting();
}
@Override
public boolean hasBufferedData() {
- return conn.hasBufferedData();
+ return engine.hasBufferedData();
}
@Override
public void close( int code ) {
- conn.close();
+ engine.close();
}
@Override
public void close( int code, String message ) {
- conn.close( code, message );
+ engine.close( code, message );
}
@Override
public void closeConnection( int code, String message ) {
- conn.closeConnection( code, message );
+ engine.closeConnection( code, message );
}
@Override
public void send( ByteBuffer bytes ) throws IllegalArgumentException , NotYetConnectedException {
- conn.send( bytes );
+ engine.send( bytes );
}
@Override
public void sendFrame( Framedata framedata ) {
- conn.sendFrame( framedata );
+ engine.sendFrame( framedata );
}
@Override
public InetSocketAddress getLocalSocketAddress() {
- return conn.getLocalSocketAddress();
+ return engine.getLocalSocketAddress();
}
@Override
public InetSocketAddress getRemoteSocketAddress() {
- return conn.getRemoteSocketAddress();
+ return engine.getRemoteSocketAddress();
+ }
+
+ @Override
+ public String getResourceDescriptor() {
+ return uri.getPath();
}
}
diff --git a/src/main/java/org/java_websocket/drafts/Draft.java b/src/main/java/org/java_websocket/drafts/Draft.java
index 1bad2ad7b..65b34de8f 100644
--- a/src/main/java/org/java_websocket/drafts/Draft.java
+++ b/src/main/java/org/java_websocket/drafts/Draft.java
@@ -220,5 +220,9 @@ public int checkAlloc( int bytecount ) throws LimitExedeedException , InvalidDat
public void setParseMode( Role role ) {
this.role = role;
}
+
+ public Role getRole() {
+ return role;
+ }
}
diff --git a/src/main/java/org/java_websocket/server/WebSocketServer.java b/src/main/java/org/java_websocket/server/WebSocketServer.java
index 6257d7d81..a45f7e133 100644
--- a/src/main/java/org/java_websocket/server/WebSocketServer.java
+++ b/src/main/java/org/java_websocket/server/WebSocketServer.java
@@ -35,10 +35,12 @@
import org.java_websocket.WebSocketImpl;
import org.java_websocket.WrappedByteChannel;
import org.java_websocket.drafts.Draft;
+import org.java_websocket.exceptions.InvalidDataException;
import org.java_websocket.framing.CloseFrame;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.handshake.Handshakedata;
+import org.java_websocket.handshake.ServerHandshakeBuilder;
/**
* WebSocketServer is an abstract class that only takes care of the
@@ -193,43 +195,42 @@ public void start() {
* If this method is called before the server is started it will never start.
*
* @param timeout
- * Specifies how many milliseconds shall pass between initiating the close handshakes with the connected clients and closing the servers socket channel.
+ * Specifies how many milliseconds the overall close handshaking may take altogether before the connections are closed without proper close handshaking.
*
* @throws IOException
* When {@link ServerSocketChannel}.close throws an IOException
* @throws InterruptedException
*/
- public void stop( int timeout ) throws IOException , InterruptedException {
- if( !isclosed.compareAndSet( false, true ) ) {
+ public void stop( int timeout ) throws InterruptedException {
+ if( !isclosed.compareAndSet( false, true ) ) { // this also makes sure that no further connections will be added to this.connections
return;
}
+ List socketsToClose = null;
+
+ // copy the connections in a list (prevent callback deadlocks)
synchronized ( connections ) {
- for( WebSocket ws : connections ) {
- ws.close( CloseFrame.GOING_AWAY );
- }
+ socketsToClose = new ArrayList( connections );
+ }
+
+ for( WebSocket ws : socketsToClose ) {
+ ws.close( CloseFrame.GOING_AWAY );
}
+
synchronized ( this ) {
if( selectorthread != null ) {
if( Thread.currentThread() != selectorthread ) {
}
if( selectorthread != Thread.currentThread() ) {
- selectorthread.interrupt();
+ if( socketsToClose.size() > 0 )
+ selectorthread.join( timeout );// isclosed will tell the selectorthread to go down after the last connection was closed
+ selectorthread.interrupt();// in case the selectorthread did not terminate in time we send the interrupt
selectorthread.join();
}
}
- if( decoders != null ) {
- for( WebSocketWorker w : decoders ) {
- w.interrupt();
- }
- }
- if( server != null ) {
- server.close();
- }
}
}
-
public void stop() throws IOException , InterruptedException {
stop( 0 );
}
@@ -326,16 +327,18 @@ public void run() {
conn = (WebSocketImpl) key.attachment();
ByteBuffer buf = takeBuffer();
try {
- if( SocketChannelIOHelper.read( buf, conn, (ByteChannel) conn.channel ) ) {
- assert ( buf.hasRemaining() );
- conn.inQueue.put( buf );
- queue( conn );
- i.remove();
- if( conn.channel instanceof WrappedByteChannel ) {
- if( ( (WrappedByteChannel) conn.channel ).isNeedRead() ) {
- iqueue.add( conn );
+ if( SocketChannelIOHelper.read( buf, conn, conn.channel ) ) {
+ if( buf.hasRemaining() ) {
+ conn.inQueue.put( buf );
+ queue( conn );
+ i.remove();
+ if( conn.channel instanceof WrappedByteChannel ) {
+ if( ( (WrappedByteChannel) conn.channel ).isNeedRead() ) {
+ iqueue.add( conn );
+ }
}
- }
+ } else
+ pushBuffer( buf );
} else {
pushBuffer( buf );
}
@@ -346,7 +349,7 @@ public void run() {
}
if( key.isWritable() ) {
conn = (WebSocketImpl) key.attachment();
- if( SocketChannelIOHelper.batch( conn, (ByteChannel) conn.channel ) ) {
+ if( SocketChannelIOHelper.batch( conn, conn.channel ) ) {
if( key.isValid() )
key.interestOps( SelectionKey.OP_READ );
}
@@ -359,9 +362,12 @@ public void run() {
try {
if( SocketChannelIOHelper.readMore( buf, conn, c ) )
iqueue.add( conn );
- assert ( buf.hasRemaining() );
- conn.inQueue.put( buf );
- queue( conn );
+ if( buf.hasRemaining() ) {
+ conn.inQueue.put( buf );
+ queue( conn );
+ } else {
+ pushBuffer( buf );
+ }
} catch ( IOException e ) {
pushBuffer( buf );
throw e;
@@ -384,6 +390,19 @@ public void run() {
} catch ( RuntimeException e ) {
// should hopefully never occur
handleFatal( null, e );
+ } finally {
+ if( decoders != null ) {
+ for( WebSocketWorker w : decoders ) {
+ w.interrupt();
+ }
+ }
+ if( server != null ) {
+ try {
+ server.close();
+ } catch ( IOException e ) {
+ onError( null, e );
+ }
+ }
}
}
protected void allocateBuffers( WebSocket c ) throws InterruptedException {
@@ -515,18 +534,35 @@ public final void onWebsocketClose( WebSocket conn, int code, String reason, boo
* Depending on the type on the connection, modifications of that collection may have to be synchronized.
**/
protected boolean removeConnection( WebSocket ws ) {
+ boolean removed;
synchronized ( connections ) {
- return this.connections.remove( ws );
+ removed = this.connections.remove( ws );
+ assert ( removed );
}
+ if( isclosed.get() && connections.size() == 0 ) {
+ selectorthread.interrupt();
+ }
+ return removed;
+ }
+ @Override
+ public ServerHandshakeBuilder onWebsocketHandshakeReceivedAsServer( WebSocket conn, Draft draft, ClientHandshake request ) throws InvalidDataException {
+ return super.onWebsocketHandshakeReceivedAsServer( conn, draft, request );
}
/** @see #removeConnection(WebSocket) */
protected boolean addConnection( WebSocket ws ) {
- synchronized ( connections ) {
- return this.connections.add( ws );
+ if( !isclosed.get() ) {
+ synchronized ( connections ) {
+ boolean succ = this.connections.add( ws );
+ assert ( succ );
+ return succ;
+ }
+ } else {
+ // This case will happen when a new connection gets ready while the server is already stopping.
+ ws.close( CloseFrame.GOING_AWAY );
+ return true;// for consistency sake we will make sure that both onOpen will be called
}
}
-
/**
* @param conn
* may be null if the error does not belong to a single connection