NiteshKant

This PR is an attempt to sketch what a Java binding (Issue #5) for a TCP client/server could look like. It is more of a strawman for discussion on this topic than something that can be merged.

I have started with a very basic client and server skeleton, which is in no way close to what we at Netflix are pursuing with RxNetty. However, I hope to start introducing higher-level concepts such as application and network interceptors, complex flush semantics, and server pools (as opposed to the single target server in this proposal) if this approach is palatable.

The code consists of three classes:

TcpConnection

An abstraction over a network socket, providing RS abstractions for reading and writing data.

Write semantics

I have proposed write semantics similar to RxNetty's (which follow the semantics in Netty): a write is broken down into two actions, write and flush. The idea is to be able to batch writes and make a single network call when flushing data to the underlying socket.

Writes come in two flavors:

  • Write a single item.
  • Write a stream of items.

The reason for offering both is that writing a single item keeps the use case of "simple" clients and servers terse. Having to create a Publisher in such cases would be cumbersome.
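
To illustrate the write/flush split, here is a minimal in-memory sketch. It is not code from this PR: `BatchingWriter` is a hypothetical stand-in for the connection, it is synchronous, and it uses an `Iterable` where the proposal uses a `Publisher` for the stream flavor. It only shows that writes are queued and that one flush sends the whole batch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a connection; not part of the PR. */
final class BatchingWriter {
    // Items written but not yet flushed.
    private final List<String> pending = new ArrayList<>();
    // One entry per simulated network call, i.e. per effective flush.
    private final List<List<String>> networkCalls = new ArrayList<>();

    /** Queues a single item; nothing reaches the "socket" yet. */
    void write(String msg) {
        pending.add(msg);
    }

    /** Queues several items (stands in for the Publisher-based write). */
    void write(Iterable<String> msgs) {
        for (String m : msgs) {
            pending.add(m);
        }
    }

    /** Sends everything queued so far as one batch. */
    void flush() {
        if (pending.isEmpty()) {
            return; // nothing to send, so no network call
        }
        networkCalls.add(new ArrayList<>(pending));
        pending.clear();
    }

    int networkCallCount() {
        return networkCalls.size();
    }

    List<String> lastBatch() {
        return networkCalls.get(networkCalls.size() - 1);
    }
}
```

With this sketch, calling `write("a")`, then `write(List.of("b", "c"))`, then `flush()` results in a single simulated network call carrying all three items.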

Read semantics

TcpConnection extends Publisher and hence it can be subscribed to for reading content from the connection.

Multiple subscriptions

I have proposed that we disallow multiple subscriptions to the content. Allowing them would require a Subject that publishes data to multiple subscribers, because each subscriber cannot read data from the socket individually.

Auto-close

I have proposed that we close the connection as soon as the sole subscriber of the content cancels the subscription. For fire-and-forget cases where the user is not interested in the content, the close will be explicit.
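
The two rules above (a single subscriber, and close on cancel) could look roughly like the following sketch. It is an illustration only: it uses `java.util.concurrent.Flow` from the JDK rather than `org.reactivestreams`, the class names are hypothetical, and the actual socket read is omitted.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical connection content: one subscriber only, closes on cancel. */
final class SingleSubscriberContent implements Flow.Publisher<String> {
    private final AtomicBoolean subscribed = new AtomicBoolean();
    private final AtomicBoolean closed = new AtomicBoolean();

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        if (!subscribed.compareAndSet(false, true)) {
            // Second subscriber: hand over a no-op subscription, then reject.
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            subscriber.onError(new IllegalStateException("content already subscribed"));
            return;
        }
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) {
                // A real connection would read from the socket here.
            }
            @Override public void cancel() {
                closed.set(true); // auto-close when the sole subscriber cancels
            }
        });
    }

    boolean isClosed() {
        return closed.get();
    }
}

/** Minimal subscriber that records what it was given. */
final class RecordingSubscriber implements Flow.Subscriber<String> {
    Flow.Subscription subscription;
    Throwable error;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(String item) { }
    @Override public void onError(Throwable t) { error = t; }
    @Override public void onComplete() { }
}
```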

TcpClient

An abstraction for TCP-based clients.
This offers a single method for creating new connections. The method returns a Publisher of connections and can therefore be used to create as many connections as required via Subscription.request(n).
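
As a rough illustration of connections being created on demand via `request(n)`, the sketch below uses `java.util.concurrent.Flow` as a stand-in for the Reactive Streams interfaces. `FakeTcpClient`, `connect()`, and the string "connections" are hypothetical names for this example and are not the PR's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical client: each requested item is a newly "opened" connection. */
final class FakeTcpClient {
    private final AtomicInteger opened = new AtomicInteger();

    Flow.Publisher<String> connect() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private volatile boolean cancelled;

            @Override public void request(long n) {
                if (n <= 0) {
                    cancelled = true;
                    subscriber.onError(new IllegalArgumentException("n must be positive"));
                    return;
                }
                // Open exactly as many connections as were requested.
                for (long i = 0; i < n && !cancelled; i++) {
                    subscriber.onNext("connection-" + opened.incrementAndGet());
                }
            }

            @Override public void cancel() {
                cancelled = true;
            }
        });
    }

    int openedCount() {
        return opened.get();
    }
}

/** Collects the connections it receives. */
final class ConnectionCollector implements Flow.Subscriber<String> {
    final List<String> connections = new ArrayList<>();
    Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(String connection) { connections.add(connection); }
    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }
}
```

Subscribing alone opens nothing in this sketch; connections appear only as the subscriber signals demand.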

TcpServer

An abstraction for TCP-based servers.
This provides a ConnectionHandler abstraction that receives and processes all client connections accepted by the server.
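
The handler idea can be sketched as below. Only the name `ConnectionHandler` comes from the description above. Its method signature, `FakeConnection`, and `FakeServer` are assumptions made for this example, and the sketch is synchronous where the real API would be built on Publishers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical signature: invoked once per accepted connection. */
interface ConnectionHandler {
    void handle(FakeConnection connection);
}

/** In-memory stand-in for an accepted connection. */
final class FakeConnection {
    final String remote;
    final List<String> written = new ArrayList<>();

    FakeConnection(String remote) {
        this.remote = remote;
    }

    void write(String msg) {
        written.add(msg);
    }
}

/** Hypothetical server that routes every accepted connection to one handler. */
final class FakeServer {
    private final ConnectionHandler handler;

    FakeServer(ConnectionHandler handler) {
        this.handler = handler;
    }

    /** Simulates the accept loop handing one new connection to the handler. */
    FakeConnection accept(String remote) {
        FakeConnection connection = new FakeConnection(remote);
        handler.handle(connection);
        return connection;
    }
}
```

For example, `new FakeServer(c -> c.write("hello " + c.remote))` greets every client that the simulated accept loop hands over.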

- Created /api module to provide APIs for all clients and servers for reactive-streams-io
- Created an empty /examples module to provide examples of those clients and servers.

The provided APIs are solely for starting a discussion.
@NiteshKant NiteshKant mentioned this pull request Feb 27, 2015
* to trigger the write. Each subscription will trigger a write of the same message. The subscriber to this
* {@link Publisher} will not complete unless {@link #flush()} is called on this connection.
*/
Publisher<Void> write(W msg);

It seems that to write a value to the network I have to do three things:

  • Call TcpConnection.write(value)
  • Subscribe to the Publisher it returns and handle errors
  • Call TcpConnection.flush()

Why is the Publisher necessary, and not just write + flush? And why use a Publisher and not a simple callback? Does it actually publish something, or just an onComplete signal? When would I call write(msg) and not immediately subscribe to the returned Publisher?

This is probably following a pattern from elsewhere I'm not familiar with...

Contributor

It is to enable composition. If it were just a callback, RS libraries would immediately have to convert the callback into a Publisher in order to compose it, for example with flatMap to wait on a write or handle errors. It would be a Publisher<Void> since only the terminal onError/onComplete signals matter for a write.

I understand now: the returned Publisher describes the asynchronous signal that says the write+flush have completed. In other words, it's acting as a Future.

Is it standard practice in Java RS-based interfaces to use a 0-item or 1-item Publisher as a Future? Or just in standard interfaces which need to be usable by any RS implementation? It just looks overly complex, and possibly more expensive than a Future.

Author

> In other words, it's acting as a Future.

Yes, but not quite like a Future! Futures are always hot and can only be used once. The Publisher here, like Observable in RxJava, is cold (in most cases), i.e. one needs to subscribe to it to start execution. One can also subscribe multiple times. In this case, multiple subscriptions would trigger multiple write+flush operations.
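
The hot/cold difference can be shown with a small sketch. This is an illustration under assumptions: it uses `CompletableFuture` and `java.util.concurrent.Flow` from the JDK rather than Netty's Future or `org.reactivestreams`, and `ColdVsHot` with its counter is a hypothetical stand-in for a real write.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: counts how often the simulated write actually runs. */
final class ColdVsHot {
    final AtomicInteger writes = new AtomicInteger();

    /** Hot: the write runs once, right away, whether or not anyone observes it. */
    CompletableFuture<Void> writeAsFuture() {
        writes.incrementAndGet();
        return CompletableFuture.completedFuture(null);
    }

    /** Cold: nothing runs until subscribe; every subscription writes again. */
    Flow.Publisher<Void> writeAsPublisher() {
        return subscriber -> {
            AtomicBoolean cancelled = new AtomicBoolean();
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { cancelled.set(true); }
            });
            if (cancelled.get()) {
                return; // subscriber cancelled before the write started
            }
            writes.incrementAndGet(); // simulated write
            subscriber.onComplete();  // only the terminal signal matters
        };
    }
}

/** Subscriber that ignores every signal. */
final class NoopSubscriber implements Flow.Subscriber<Void> {
    @Override public void onSubscribe(Flow.Subscription s) { }
    @Override public void onNext(Void item) { }
    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }
}
```

Creating the Publisher performs no write, and subscribing twice performs two. Creating the future performs one write, and attaching any number of callbacks to it performs no more.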

> Is it standard practice in Java RS-based interfaces to use a 0-item or 1-item Publisher as a Future? Or just in standard interfaces which need to be usable by any RS implementation? It just looks overly complex, and possibly more expensive than a Future.

I generally prefer using a single async abstraction and hence use Observable everywhere. When interfacing with Netty, I bridged a Subscriber and Netty's Future.
It takes a while to get used to treating a scalar response as a stream of one, but once you wrap your head around it, it has the benefit of consistency. FWIW, RxJava has an issue, ReactiveX/RxJava#1594, to introduce Future/Task in RxJava.

@danarmak

These interfaces don't seem to allow for multiplexing several RS streams over a single TCP connection. I think that's a desirable feature to have. There are probably use cases where one wants to subscribe to many (hundreds or thousands) of streams, each of which publishes events rarely. It would be inefficient to open a new TCP socket for each of them.

Also, a subscribe request should be able to specify the publisher it wants to subscribe to. Otherwise the server can only run one publisher per port. Beyond that, we could also add publisher discovery (i.e. listing publishers and retrieving their metadata), but I think that's overkill and should be left for a future version or an extension.

@benjchristensen
Contributor

This work is probably better suited to a separate repo as it is going to distract from creating the RS.io network protocol itself. It's my fault for how I mixed the two in this repo. I've started to clarify here: #5 (comment)

@maniksurtani

Hmm, I agree with @danarmak that being able to multiplex over the socket is desirable. Even leaving discovery out for now, I think the ability to multiplex would need to be baked into the protocol - and this is the right level of abstraction for it.

@NiteshKant
Author

> I think the ability to multiplex would need to be baked into the protocol - and this is the right level of abstraction for it.

As @benjchristensen pointed out, there was a disconnect between the problem this PR intends to discuss and the impression that issue #5 gave.
This PR intends to create abstractions for a TCP client and server for any protocol layered on top of TCP, RS.io being one of them.

@benjchristensen
Contributor

I'm closing this out in favor of discussing these items in a separate repo: https://github.com/reactive-streams/reactive-streams-net-jvm
