Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
target/
.project
.classpath
.cache
.target/
*~
.#*
.*.swp
.DS_Store
.codefellow
.ensime*
.eprj
.history
.idea
.idea_modules
.gradle
.settings
bin
build
out
*.iml
*.ipr
*.iws
test-output
test-results
test-tmp
*.class
1 change: 1 addition & 0 deletions api/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/bin
3 changes: 3 additions & 0 deletions api/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# reactive-streams-io API

This module provides the API for clients and servers for all transports supported by [reactive-streams-io]
1 change: 1 addition & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
description = "reactive-streams-io"
26 changes: 26 additions & 0 deletions api/src/main/java/org/reactivestreamsio/tcp/TcpClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.reactivestreamsio.tcp;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.net.SocketAddress;

/**
* A TCP client.
*
* @param <IN> Type of object written on a TCP connection created by this client.
* @param <OUT> Type of object read from a TCP connection created by this client.
*/
public interface TcpClient<IN, OUT> {

/**
* A request for connections, upon calling {@link Publisher#subscribe(Subscriber)} on the returned {@link Publisher}.
* Each subscription can request as many connections as required by using {@link Subscription#request(long)}
*
* @param remoteAddress The remote address to connect.
*
* @return A {@link Publisher} of {@link TcpConnection}
*/
Publisher<TcpConnection<OUT, IN>> connect(SocketAddress remoteAddress);
}
172 changes: 172 additions & 0 deletions api/src/main/java/org/reactivestreamsio/tcp/TcpConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package org.reactivestreamsio.tcp;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
* An abstraction for a TCP connection.
*
* <h2>Reading data</h2>
* This connection is a {@link Publisher} and hence one can call {@link Publisher#subscribe(Subscriber)} to start
* reading data from the connection.
*
* <h3>Multiple read subscriptions</h3>
* This connection does <em>not</em> allow multiple subscription to the content and any subscriptions post the first
* subscriber, will receive an error.
*
* <h2>Ignoring data</h2>
*
* Since, this connection does not read any data from the underneath socket, unless someone subscribes, a convenience
* method for ignoring all data on the socket is provided as {@link #ignoreContent()}
*
* <h2>Writing data</h2>
*
* This connection provides two ways of writing data:
* <ul>
<li><i>Single Message:</i> One can write one item at a time, using {@link #write(Object)}</li>
<li><i>Message stream:</i> One can provide a {@link Publisher} of data, using {@link #write(Publisher)}. This
connection will subscribe to such a {@link Publisher} and write all items emitted by that publisher on the connection.
</li>
</ul>
*
* Writing any items using these methods, do <em>not</em> actually write the data on the underneath socket. An explicit
* {@link #flush()} call is required to write the data on the underneath socket.
* One can also use the shorthand {@link #writeAndFlush(Object)} or {@link #writeAndFlush(Publisher)} to write and flush
* the item(s)
*
* <h2>Flush</h2>
*
* In order to optimize the number of system calls to write data on the underneath socket, this connection breaks the
* writing process into two parts, viz., write and flush. A flush call writes all previous writes on this connection on
* the underneath socket.
*
* <h2>Write errors</h2>
*
* Every write call returns a {@link Publisher} that represents the acknowledgment of the write action. If the write
* failed, then the subscriber's {@link Subscriber#onError(Throwable)} method will be called and if it succeeds, then
* subscriber's {@link Subscriber#onComplete()} method will be called
*
* <h2>Flush errors</h2>
*
* Since, a {@link #flush()} is a batch operation encompassing all previous unflushed writes, it will represent the
* result of all those writes. It will fail if any one of the writes fail and it will not be possible to correlate which
* write caused the flush to fail. In order to get granular result per write, one has to subscribe to the
* {@link Publisher} of that particular write.
*
* <h2>Close</h2>
*
* Since, this connection only allows a single subscription, cancelling that subscription
* (via {@link Subscription#cancel()}) closes the connection.
* For cases which ignores content (via {@link #ignoreContent()}), an explicit {@link #close()} is required to be
* invoked for closing this connection.
*
* @param <R> Type of objects read from this connection.
* @param <W> Type of objects written to this connection.
*/
public interface TcpConnection<R, W> extends Publisher<R> {

/**
* Subscribes to the content stream of this connection.
*
* <h2>Multiple subscriptions</h2>
* This connection only allows a single subscription to the content stream. Any subsequent subscription will result
* in invocation of {@link Subscriber#onError(Throwable)} on that subscriber.
*
* @param subscriber Subscriber for the content.
*/
@Override
void subscribe(Subscriber<? super R> subscriber);

/**
* Ignores all content of this connection. This is equivalent to calling {@link #subscribe(Subscriber)} with a
* subscriber that discards all received items.
*/
void ignoreContent();

/**
* On subscription of the returned {@link Publisher}, writes the passed {@code msg} on this connection.
* Every subscription will write the same {@code msg} on the connection.
*
* <b>This does not flush the write.</b> Call {@link #flush()} to flush this write.
* @param msg Message to write.
*
* @return A {@link Publisher} representing the result of this write. One has to subscribe to this {@link Publisher}
* to trigger the write. Each subscription will trigger a write of the same message. The subscriber to this
* {@link Publisher} will not complete unless {@link #flush()} is called on this connection.
*/
Publisher<Void> write(W msg);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that to write a value to the network I have to do three things:

  • Call TcpConnection.write(value)
  • Subscribe to the Publisher it returns and handle errors
  • Call TcpConnection.flush()

Why is the Publisher necessary, and not just write + flush? And why use a Publisher and not a simple callback? Does it actually publish something, or just an onComplete signal? When would I call write(msg) and not immediately subscribe to the returned Publisher?

This is probably following a pattern from elsewhere I'm not familiar with...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is to enable composition. If it was just a callback then RS libraries would immediately have to convert the callback into an RS to compose, such as with flatMap in order to wait on a write or handle errors. It would be a Publisher<Void> since it is just the terminal onError/onComplete states that matter for a write.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand now: the returned Publisher describes the asynchronous signal that says the write+flush have completed. In other words, it's acting as a Future.

Is it standard practice in Java RS-based interfaces to use a 0-item or 1-item Publisher as a Future? Or just in standard interfaces which need to be usable by any RS implementation? It just looks overly complex, and possibly more expensive than a Future.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other words, it's acting as a Future.

Yes but not quiet like a Future! Futures are always hot and can only be used once. Publisher here and Observable in RxJava are cold (in most cases) i.e. one needs to subscribe to them to start execution. Also, one can subscribe to them multiple times. In this case, multiple subscriptions would trigger multiple write+flush.

Is it standard practice in Java RS-based interfaces to use a 0-item or 1-item Publisher as a Future? Or just in standard interfaces which need to be usable by any RS implementation? It just looks overly complex, and possibly more expensive than a Future.

I generally prefer using a single async abstraction and hence use Observable everywhere. In case of interfacing with netty, I bridged a Subscriber and Future from netty.
It takes a bit of getting used to treating a scalar response as a stream of 1 but once you wrap your head around it, it has a benefit of consistency. FWIW, RxJava has this issue ReactiveX/RxJava#1594 to introduce Future/Task in RxJava.


/**
* On subscription of the returned {@link Publisher}, subscribes to the passed {@code msgPublisher} and writes each
* emitted item on this connection.
*
* <b>This does not flush the write.</b> Call {@link #flush()} to flush this write.
*
* @param msgPublisher A {@link Publisher} of messages which are to be written to this connection.
*
* @return A {@link Publisher} representing the result of this write. One has to subscribe to this {@link Publisher}
* to trigger the write. Each subscription will trigger a subscription to the {@code msgPublisher}. The subscriber
* to this {@link Publisher} will not complete unless {@link #flush()} is called on this connection.
*/
Publisher<Void> write(Publisher<W> msgPublisher);

/**
* On subscription of the returned {@link Publisher}, flushes all previously unflushed writes on the underneath
* socket.
*
* @return A {@link Publisher} representing the result of this flush. One has to subscribe to this {@link Publisher}
* to trigger the flush.
*/
Publisher<Void> flush();

/**
* A shorthand for {@link #write(Object)} and {@link #flush()}. Every subscription to the returned {@link Publisher}
* will first write the passed {@code message} and then flush.
*
* <h2>Pending write failures and this result</h2>
*
* The returned {@link Publisher} only provides the result of this particular write and not any other writes which
* were not flushed before this write.
*
* @param msg Message to write and flush.
*
* @return A {@link Publisher} representing the result of this write. One has to subscribe to this {@link Publisher}
* to trigger the write and flush. Each subscription will trigger a write of the same message.
*/
Publisher<Void> writeAndFlush(W msg);

/**
* A shorthand for {@link #write(Publisher)} and {@link #flush()}. Every subscription to the returned
* {@link Publisher} will first subscribe to the passed {@code msgPublisher}, write all emitted items by that
* publisher and then flush.
*
* <h2>Pending write failures and this result</h2>
*
* The returned {@link Publisher} only provides the result of this particular write and not any other writes which
* were not flushed before this write.
*
* <h2>Flush</h2>
*
* Flush will only be called after receiving an {@link Subscriber#onComplete()} from {@code msgPublisher}
*
* @param msgPublisher A {@link Publisher} of messages which are to be written and flushed to this connection.
*
* @return A {@link Publisher} representing the result of this write. One has to subscribe to this {@link Publisher}
* to trigger the write and flush. Each subscription will trigger a write of the same message.
*/
Publisher<Void> writeAndFlush(Publisher<W> msgPublisher);

/**
* Closes this connection on subscription of the returned {@link Publisher}.
*
* <h2>Multiple subscriptions</h2>
*
* Multiple subscriptions are allowed to the returned {@link Publisher} but only the first subscription closes the
* underneath socket. All other subscriptions receive the same result, which is the result of the close of the
* underneath socket.
*
* @return {@link Publisher}, first subscription to which closes the connection.
*/
Publisher<Void> close();
}
77 changes: 77 additions & 0 deletions api/src/main/java/org/reactivestreamsio/tcp/TcpServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.reactivestreamsio.tcp;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import java.lang.IllegalStateException;

/**
* A TCP server.
*
* @param <IN> Type of object read from a TCP connection accepted by this server.
* @param <OUT> Type of object written to a TCP connection accepted by this server.
*/
public interface TcpServer<IN, OUT> {

/**
* Starts this server and sends all client connections accepted by this server to the passed
* {@code connectionHandler}.
*
* @param connectionHandler Connection handler that receives all connections accepted by this server.
*
* @return This server.
*
* @throws IllegalStateException If this server is already started.
*/
TcpServer<IN, OUT> start(ConnectionHandler<IN, OUT> connectionHandler);

/**
* Same as calling {@code start(TcpServer.ConnectionHandler).awaitShutdown()}.
*
* @param connectionHandler Connection handler that receives all connections accepted by this server.
*/
void startAndAwait(ConnectionHandler<IN, OUT> connectionHandler);

/**
* Shutdown this server.
*
* @throws IllegalStateException If this server is already shutdown.
*/
void shutdown();

/**
* Blocks the calling thread till this server is shutdown. Useful when this server controls the lifecycle of the
* process creating this server.
*/
void awaitShutdown();

/**
* Returns the port on which this server is listening.
*
* <h2>Ephemeral ports</h2>
* If this server is created to run on an ephemeral port, then this method would return 0, unless, the server is
* started via {@link #start(TcpServer.ConnectionHandler)}
*
* @return The port on which this server is listening for client connections.
*/
int getServerPort();

/**
* A receiver for all accepted client {@link TcpConnection} by the associated server.
*
* @param <IN> Type of object read from a TCP connection accepted by this handler.
* @param <OUT> Type of object written to a TCP connection accepted by this handler.
*/
interface ConnectionHandler<IN, OUT> {

/**
* A callback for handling a newly accepted client {@link TcpConnection} by the associated server.
*
* @param newConnection Newly accepted client connection.
*
* @return {@link Publisher}, a subscription to which should start the processing of the {@code newConnection}.
* On calling {@link Subscription#cancel()} on the subscription should cancel the processing of this connection.
*/
Publisher<Void> handle(TcpConnection<IN, OUT> newConnection);
}
}
Loading