Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ subprojects {

dependencies {
compile "io.projectreactor:reactor-core:3.1.1.RELEASE"
compile "io.netty:netty-buffer:4.1.15.Final"
compile "io.netty:netty-buffer:4.1.16.Final"
compile "org.reactivestreams:reactive-streams:1.0.1"
compile "org.slf4j:slf4j-api:1.7.25"
compile "com.google.code.findbugs:jsr305:3.0.2"
Expand Down
53 changes: 31 additions & 22 deletions rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.Recycler;
import io.rsocket.Payload;

import javax.annotation.Nullable;
Expand All @@ -29,17 +30,19 @@
import java.nio.charset.Charset;

public final class ByteBufPayload extends AbstractReferenceCounted implements Payload {
private final ByteBuf data;
private final ByteBuf metadata;
private static final Recycler<ByteBufPayload> RECYCLER =
new Recycler<ByteBufPayload>() {
protected ByteBufPayload newObject(Handle<ByteBufPayload> handle) {
return new ByteBufPayload(handle);
}
};

private ByteBufPayload(ByteBuf data) {
this.data = data.asReadOnly();
this.metadata = null;
}
private final Recycler.Handle<ByteBufPayload> handle;
private ByteBuf data;
private ByteBuf metadata;

private ByteBufPayload(ByteBuf data, @Nullable ByteBuf metadata) {
this.data = data.asReadOnly();
this.metadata = metadata == null ? null : metadata.asReadOnly();
private ByteBufPayload(final Recycler.Handle<ByteBufPayload> handle) {
this.handle = handle;
}

@Override
Expand All @@ -49,12 +52,12 @@ public boolean hasMetadata() {

@Override
public ByteBuf sliceMetadata() {
return metadata == null ? Unpooled.EMPTY_BUFFER : metadata.duplicate();
return metadata == null ? Unpooled.EMPTY_BUFFER : metadata;
}

@Override
public ByteBuf sliceData() {
return data.duplicate();
return data;
}

@Override
Expand Down Expand Up @@ -90,9 +93,12 @@ public ByteBufPayload touch(Object hint) {
@Override
protected void deallocate() {
data.release();
data = null;
if (metadata != null) {
metadata.release();
metadata = null;
}
handle.recycle(this);
}

/**
Expand All @@ -102,7 +108,7 @@ protected void deallocate() {
* @return a payload.
*/
public static Payload create(String data) {
return new ByteBufPayload(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, data));
return create(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, data), null);
}

/**
Expand All @@ -114,48 +120,51 @@ public static Payload create(String data) {
* @return a payload.
*/
public static Payload create(String data, @Nullable String metadata) {
return new ByteBufPayload(
return create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, data),
metadata == null ? null : ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, metadata)
);
}

public static Payload create(CharSequence data, Charset dataCharset) {
return new ByteBufPayload(ByteBufUtil.encodeString(ByteBufAllocator.DEFAULT, CharBuffer.wrap(data), dataCharset));
return create(ByteBufUtil.encodeString(ByteBufAllocator.DEFAULT, CharBuffer.wrap(data), dataCharset), null);
}

public static Payload create(CharSequence data, Charset dataCharset, @Nullable CharSequence metadata, Charset metadataCharset) {
return new ByteBufPayload(
return create(
ByteBufUtil.encodeString(ByteBufAllocator.DEFAULT, CharBuffer.wrap(data), dataCharset),
metadata == null ? null : ByteBufUtil.encodeString(ByteBufAllocator.DEFAULT, CharBuffer.wrap(metadata), metadataCharset)
);
}

public static Payload create(byte[] data) {
return new ByteBufPayload(Unpooled.wrappedBuffer(data));
return create(Unpooled.wrappedBuffer(data), null);
}

public static Payload create(byte[] data, @Nullable byte[] metadata) {
return new ByteBufPayload(Unpooled.wrappedBuffer(data), metadata == null ? null : Unpooled.wrappedBuffer(metadata));
return create(Unpooled.wrappedBuffer(data), metadata == null ? null : Unpooled.wrappedBuffer(metadata));
}

public static Payload create(ByteBuffer data) {
return new ByteBufPayload(Unpooled.wrappedBuffer(data));
return create(Unpooled.wrappedBuffer(data), null);
}

public static Payload create(ByteBuffer data, @Nullable ByteBuffer metadata) {
return new ByteBufPayload(Unpooled.wrappedBuffer(data), metadata == null ? null : Unpooled.wrappedBuffer(metadata));
return create(Unpooled.wrappedBuffer(data), metadata == null ? null : Unpooled.wrappedBuffer(metadata));
}

public static Payload create(ByteBuf data) {
return new ByteBufPayload(data);
return create(data, null);
}

public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) {
return new ByteBufPayload(data, metadata);
ByteBufPayload payload = RECYCLER.get();
payload.data = data;
payload.metadata = metadata;
return payload;
}

public static Payload create(Payload payload) {
return new ByteBufPayload(payload.sliceData(), payload.hasMetadata() ? payload.sliceMetadata() : null);
return create(payload.sliceData(), payload.hasMetadata() ? payload.sliceMetadata() : null);
}
}
29 changes: 12 additions & 17 deletions rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,9 @@ public final class DefaultPayload implements Payload {
private final ByteBuffer data;
private final ByteBuffer metadata;

private DefaultPayload(ByteBuffer data) {
this.data = data.asReadOnlyBuffer();
this.metadata = null;
}

private DefaultPayload(ByteBuffer data, @Nullable ByteBuffer metadata) {
this.data = data.asReadOnlyBuffer();
this.metadata = metadata == null ? null : metadata.asReadOnlyBuffer();
this.data = data;
this.metadata = metadata;
}

@Override
Expand All @@ -62,12 +57,12 @@ public ByteBuf sliceData() {

@Override
public ByteBuffer getMetadata() {
return metadata == null ? DefaultPayload.EMPTY_BUFFER : metadata.duplicate();
return metadata == null ? DefaultPayload.EMPTY_BUFFER : metadata;
}

@Override
public ByteBuffer getData() {
return data.duplicate();
return data;
}

@Override
Expand Down Expand Up @@ -112,7 +107,7 @@ public boolean release(int decrement) {
* @return a payload.
*/
public static Payload create(CharSequence data) {
return new DefaultPayload(StandardCharsets.UTF_8.encode(CharBuffer.wrap(data)));
return create(StandardCharsets.UTF_8.encode(CharBuffer.wrap(data)), null);
}

/**
Expand All @@ -124,40 +119,40 @@ public static Payload create(CharSequence data) {
* @return a payload.
*/
public static Payload create(CharSequence data, @Nullable CharSequence metadata) {
return new DefaultPayload(
return create(
StandardCharsets.UTF_8.encode(CharBuffer.wrap(data)),
metadata == null ? null : StandardCharsets.UTF_8.encode(CharBuffer.wrap(metadata))
);
}

public static Payload create(CharSequence data, Charset dataCharset) {
return new DefaultPayload(dataCharset.encode(CharBuffer.wrap(data)));
return create(dataCharset.encode(CharBuffer.wrap(data)), null);
}

public static Payload create(CharSequence data, Charset dataCharset, @Nullable CharSequence metadata, Charset metadataCharset) {
return new DefaultPayload(
return create(
dataCharset.encode(CharBuffer.wrap(data)),
metadata == null ? null : metadataCharset.encode(CharBuffer.wrap(metadata))
);
}

public static Payload create(byte[] data) {
return new DefaultPayload(ByteBuffer.wrap(data));
return create(ByteBuffer.wrap(data), null);
}

public static Payload create(byte[] data, @Nullable byte[] metadata) {
return new DefaultPayload(ByteBuffer.wrap(data), metadata == null ? null : ByteBuffer.wrap(metadata));
return create(ByteBuffer.wrap(data), metadata == null ? null : ByteBuffer.wrap(metadata));
}

public static Payload create(ByteBuffer data) {
return new DefaultPayload(data);
return create(data, null);
}

public static Payload create(ByteBuffer data, @Nullable ByteBuffer metadata) {
return new DefaultPayload(data, metadata);
}

public static Payload create(Payload payload) {
return new DefaultPayload(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null);
return create(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

package io.rsocket.util;

import static io.rsocket.util.DefaultPayload.create;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;

Expand All @@ -32,14 +31,6 @@ public void testReuse() {
assertDataAndMetadata(p, DATA_VAL, METADATA_VAL);
}

@Test
public void testReuseWithExternalMark() {
Payload p = DefaultPayload.create(DATA_VAL, METADATA_VAL);
assertDataAndMetadata(p, DATA_VAL, METADATA_VAL);
p.getData().position(2).mark();
assertDataAndMetadata(p, DATA_VAL, METADATA_VAL);
}

public void assertDataAndMetadata(Payload p, String dataVal, @Nullable String metadataVal) {
assertThat("Unexpected data.", p.getDataUtf8(), equalTo(dataVal));
if (metadataVal == null) {
Expand Down
2 changes: 1 addition & 1 deletion rsocket-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ dependencies {
compile "org.mockito:mockito-core:2.10.0"
compile "org.hamcrest:hamcrest-library:1.3"
compile "org.hdrhistogram:HdrHistogram:2.1.9"
compile "io.projectreactor:reactor-test:3.1.0.RELEASE"
compile "io.projectreactor:reactor-test:3.1.1.RELEASE"
}