Skip to content

Commit d476291

Browse files
mostroverkhovrobertroeser
authored andcommitted
Example of resumable file transfer (#631)
* resumable file transfer example Signed-off-by: Maksym Ostroverkhov <[email protected]> * formatter Signed-off-by: Maksym Ostroverkhov <[email protected]> * remove code Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent b89aa74 commit d476291

File tree

5 files changed

+316
-1
lines changed

5 files changed

+316
-1
lines changed

rsocket-core/src/main/java/io/rsocket/resume/ClientResume.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.netty.buffer.ByteBuf;
2020
import java.time.Duration;
2121

22-
class ClientResume {
22+
public class ClientResume {
2323
private final Duration sessionDuration;
2424
private final ByteBuf resumeToken;
2525

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package io.rsocket.examples.transport.tcp.resume;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.Unpooled;
5+
import io.rsocket.Payload;
6+
import java.io.*;
7+
import org.reactivestreams.Subscriber;
8+
import org.reactivestreams.Subscription;
9+
import reactor.core.publisher.Flux;
10+
import reactor.core.publisher.SynchronousSink;
11+
12+
class Files {
13+
14+
public static Flux<ByteBuf> fileSource(String fileName, int chunkSizeBytes) {
15+
return Flux.generate(
16+
() -> new FileState(fileName, chunkSizeBytes), FileState::consumeNext, FileState::dispose);
17+
}
18+
19+
public static Subscriber<Payload> fileSink(String fileName, int windowSize) {
20+
return new Subscriber<Payload>() {
21+
Subscription s;
22+
int requests = windowSize;
23+
OutputStream outputStream;
24+
int receivedBytes;
25+
int receivedCount;
26+
27+
@Override
28+
public void onSubscribe(Subscription s) {
29+
this.s = s;
30+
this.s.request(requests);
31+
}
32+
33+
@Override
34+
public void onNext(Payload payload) {
35+
ByteBuf data = payload.data();
36+
receivedBytes += data.readableBytes();
37+
receivedCount += 1;
38+
System.out.println(
39+
"Received file chunk: " + receivedCount + ". Total size: " + receivedBytes);
40+
if (outputStream == null) {
41+
outputStream = open(fileName);
42+
}
43+
write(outputStream, data);
44+
payload.release();
45+
46+
requests--;
47+
if (requests == windowSize / 2) {
48+
requests += windowSize;
49+
s.request(windowSize);
50+
}
51+
}
52+
53+
private void write(OutputStream outputStream, ByteBuf byteBuf) {
54+
try {
55+
byteBuf.readBytes(outputStream, byteBuf.readableBytes());
56+
} catch (IOException e) {
57+
throw new RuntimeException(e);
58+
}
59+
}
60+
61+
@Override
62+
public void onError(Throwable t) {
63+
close(outputStream);
64+
}
65+
66+
@Override
67+
public void onComplete() {
68+
close(outputStream);
69+
}
70+
71+
private OutputStream open(String filename) {
72+
try {
73+
/*do not buffer for demo purposes*/
74+
return new FileOutputStream(filename);
75+
} catch (FileNotFoundException e) {
76+
throw new RuntimeException(e);
77+
}
78+
}
79+
80+
private void close(OutputStream stream) {
81+
if (stream != null) {
82+
try {
83+
stream.close();
84+
} catch (IOException e) {
85+
}
86+
}
87+
}
88+
};
89+
}
90+
91+
private static class FileState {
92+
private final String fileName;
93+
private final int chunkSizeBytes;
94+
private BufferedInputStream inputStream;
95+
private byte[] chunkBytes;
96+
97+
public FileState(String fileName, int chunkSizeBytes) {
98+
this.fileName = fileName;
99+
this.chunkSizeBytes = chunkSizeBytes;
100+
}
101+
102+
public FileState consumeNext(SynchronousSink<ByteBuf> sink) {
103+
if (inputStream == null) {
104+
InputStream in = getClass().getClassLoader().getResourceAsStream(fileName);
105+
if (in == null) {
106+
sink.error(new FileNotFoundException(fileName));
107+
return this;
108+
}
109+
this.inputStream = new BufferedInputStream(in);
110+
this.chunkBytes = new byte[chunkSizeBytes];
111+
}
112+
try {
113+
int consumedBytes = inputStream.read(chunkBytes);
114+
if (consumedBytes == -1) {
115+
sink.complete();
116+
} else {
117+
sink.next(Unpooled.copiedBuffer(chunkBytes, 0, consumedBytes));
118+
}
119+
} catch (IOException e) {
120+
sink.error(e);
121+
}
122+
return this;
123+
}
124+
125+
public void dispose() {
126+
if (inputStream != null) {
127+
try {
128+
inputStream.close();
129+
} catch (IOException e) {
130+
}
131+
}
132+
}
133+
}
134+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package io.rsocket.examples.transport.tcp.resume;
2+
3+
import io.rsocket.AbstractRSocket;
4+
import io.rsocket.Payload;
5+
import io.rsocket.RSocket;
6+
import io.rsocket.RSocketFactory;
7+
import io.rsocket.resume.ClientResume;
8+
import io.rsocket.resume.PeriodicResumeStrategy;
9+
import io.rsocket.resume.ResumeStrategy;
10+
import io.rsocket.transport.netty.client.TcpClientTransport;
11+
import io.rsocket.transport.netty.server.CloseableChannel;
12+
import io.rsocket.transport.netty.server.TcpServerTransport;
13+
import io.rsocket.util.DefaultPayload;
14+
import java.time.Duration;
15+
import org.reactivestreams.Publisher;
16+
import reactor.core.publisher.Flux;
17+
import reactor.core.publisher.Mono;
18+
19+
public class ResumeFileTransfer {
20+
21+
public static void main(String[] args) {
22+
RequestCodec requestCodec = new RequestCodec();
23+
24+
CloseableChannel server =
25+
RSocketFactory.receive()
26+
.resume()
27+
.resumeSessionDuration(Duration.ofMinutes(5))
28+
.acceptor((setup, rSocket) -> Mono.just(new FileServer(requestCodec)))
29+
.transport(TcpServerTransport.create("localhost", 8000))
30+
.start()
31+
.block();
32+
33+
RSocket client =
34+
RSocketFactory.connect()
35+
.resume()
36+
.resumeStrategy(
37+
() -> new VerboseResumeStrategy(new PeriodicResumeStrategy(Duration.ofSeconds(1))))
38+
.resumeSessionDuration(Duration.ofMinutes(5))
39+
.transport(TcpClientTransport.create("localhost", 8001))
40+
.start()
41+
.block();
42+
43+
client
44+
.requestStream(requestCodec.encode(new Request(16, "lorem.txt")))
45+
.doFinally(s -> server.dispose())
46+
.subscribe(Files.fileSink("rsocket-examples/out/lorem_output.txt", 256));
47+
48+
server.onClose().block();
49+
}
50+
51+
private static class FileServer extends AbstractRSocket {
52+
private final RequestCodec requestCodec;
53+
54+
public FileServer(RequestCodec requestCodec) {
55+
this.requestCodec = requestCodec;
56+
}
57+
58+
@Override
59+
public Flux<Payload> requestStream(Payload payload) {
60+
Request request = requestCodec.decode(payload);
61+
payload.release();
62+
String fileName = request.getFileName();
63+
int chunkSize = request.getChunkSize();
64+
65+
Flux<Long> ticks = Flux.interval(Duration.ofMillis(500)).onBackpressureDrop();
66+
67+
return Files.fileSource(fileName, chunkSize)
68+
.map(DefaultPayload::create)
69+
.zipWith(ticks, (p, tick) -> p);
70+
}
71+
}
72+
73+
private static class VerboseResumeStrategy implements ResumeStrategy {
74+
private final ResumeStrategy resumeStrategy;
75+
76+
public VerboseResumeStrategy(ResumeStrategy resumeStrategy) {
77+
this.resumeStrategy = resumeStrategy;
78+
}
79+
80+
@Override
81+
public Publisher<?> apply(ClientResume clientResume, Throwable throwable) {
82+
return Flux.from(resumeStrategy.apply(clientResume, throwable))
83+
.doOnNext(v -> System.out.println("Disconnected. Trying to resume connection..."));
84+
}
85+
}
86+
87+
private static class RequestCodec {
88+
89+
public Payload encode(Request request) {
90+
String encoded = request.getChunkSize() + ":" + request.getFileName();
91+
return DefaultPayload.create(encoded);
92+
}
93+
94+
public Request decode(Payload payload) {
95+
String encoded = payload.getDataUtf8();
96+
String[] chunkSizeAndFileName = encoded.split(":");
97+
int chunkSize = Integer.parseInt(chunkSizeAndFileName[0]);
98+
String fileName = chunkSizeAndFileName[1];
99+
return new Request(chunkSize, fileName);
100+
}
101+
}
102+
103+
private static class Request {
104+
private final int chunkSize;
105+
private final String fileName;
106+
107+
public Request(int chunkSize, String fileName) {
108+
this.chunkSize = chunkSize;
109+
this.fileName = fileName;
110+
}
111+
112+
public int getChunkSize() {
113+
return chunkSize;
114+
}
115+
116+
public String getFileName() {
117+
return fileName;
118+
}
119+
}
120+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
1. Start socat. It is used for emulation of transport disconnects
2+
3+
`socat -d TCP-LISTEN:8001,fork,reuseaddr TCP:localhost:8000`
4+
5+
2. start `ResumeFileTransfer.main`
6+
7+
3. terminate/start socat periodically for session resumption
8+
9+
`ResumeFileTransfer` output is as follows
10+
11+
```
12+
Received file chunk: 7. Total size: 112
13+
Received file chunk: 8. Total size: 128
14+
Received file chunk: 9. Total size: 144
15+
Received file chunk: 10. Total size: 160
16+
Disconnected. Trying to resume connection...
17+
Disconnected. Trying to resume connection...
18+
Disconnected. Trying to resume connection...
19+
Disconnected. Trying to resume connection...
20+
Disconnected. Trying to resume connection...
21+
Received file chunk: 11. Total size: 176
22+
Received file chunk: 12. Total size: 192
23+
Received file chunk: 13. Total size: 208
24+
Received file chunk: 14. Total size: 224
25+
Received file chunk: 15. Total size: 240
26+
Received file chunk: 16. Total size: 256
27+
```
28+
29+
It transfers file from `resources/lorem.txt` to `build/out/lorem_output.txt` in chunks of 16 bytes every 500 millis
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
Alteration literature to or an sympathize mr imprudence. Of is ferrars subject as enjoyed or tedious cottage.
2+
Procuring as in resembled by in agreeable. Next long no gave mr eyes. Admiration advantages no he celebrated so pianoforte unreserved.
3+
Not its herself forming charmed amiable. Him why feebly expect future now.
4+
5+
Situation admitting promotion at or to perceived be. Mr acuteness we as estimable enjoyment up.
6+
An held late as felt know. Learn do allow solid to grave. Middleton suspicion age her attention.
7+
Chiefly several bed its wishing. Is so moments on chamber pressed to. Doubtful yet way properly answered humanity its desirous.
8+
Minuter believe service arrived civilly add all. Acuteness allowance an at eagerness favourite in extensive exquisite ye.
9+
10+
Unpleasant nor diminution excellence apartments imprudence the met new. Draw part them he an to he roof only.
11+
Music leave say doors him. Tore bred form if sigh case as do. Staying he no looking if do opinion.
12+
Sentiments way understood end partiality and his.
13+
14+
Ladyship it daughter securing procured or am moreover mr. Put sir she exercise vicinity cheerful wondered.
15+
Continual say suspicion provision you neglected sir curiosity unwilling. Simplicity end themselves increasing led day sympathize yet.
16+
General windows effects not are drawing man garrets. Common indeed garden you his ladies out yet. Preference imprudence contrasted to remarkably in on.
17+
Taken now you him trees tears any. Her object giving end sister except oppose.
18+
19+
No comfort do written conduct at prevent manners on. Celebrated contrasted discretion him sympathize her collecting occasional.
20+
Do answered bachelor occasion in of offended no concerns. Supply worthy warmth branch of no ye. Voice tried known to as my to.
21+
Though wished merits or be. Alone visit use these smart rooms ham. No waiting in on enjoyed placing it inquiry.
22+
23+
So insisted received is occasion advanced honoured. Among ready to which up. Attacks smiling and may out assured moments man nothing outward.
24+
Thrown any behind afford either the set depend one temper. Instrument melancholy in acceptance collecting frequently be if.
25+
Zealously now pronounce existence add you instantly say offending. Merry their far had widen was. Concerns no in expenses raillery formerly.
26+
27+
As am hastily invited settled at limited civilly fortune me. Really spring in extent an by. Judge but built gay party world.
28+
Of so am he remember although required. Bachelor unpacked be advanced at. Confined in declared marianne is vicinity.
29+
30+
In alteration insipidity impression by travelling reasonable up motionless. Of regard warmth by unable sudden garden ladies.
31+
No kept hung am size spot no. Likewise led and dissuade rejoiced welcomed husbands boy. Do listening on he suspected resembled.
32+
Water would still if to. Position boy required law moderate was may.

0 commit comments

Comments
 (0)