Skip to content

Commit de9101d

Browse files
committed
Implement ChunkedStreamSinkConduit fast-path for multi-buffer writes
1 parent fe06413 commit de9101d

File tree

1 file changed

+105
-9
lines changed

1 file changed

+105
-9
lines changed

core/src/main/java/io/undertow/conduits/ChunkedStreamSinkConduit.java

Lines changed: 105 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.undertow.util.HeaderValues;
3636
import io.undertow.util.Headers;
3737
import io.undertow.util.ImmediatePooledByteBuffer;
38+
import org.xnio.Buffers;
3839
import org.xnio.IoUtils;
3940
import io.undertow.connector.ByteBufferPool;
4041
import io.undertow.connector.PooledByteBuffer;
@@ -128,6 +129,108 @@ public int write(final ByteBuffer src) throws IOException {
128129
return doWrite(src);
129130
}
130131

132+
long doWrite(final ByteBuffer[] srcs, int offset, int length) throws IOException {
133+
if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
134+
throw new ClosedChannelException();
135+
}
136+
long totalRemaining = Buffers.remaining(srcs, offset, length);
137+
if(totalRemaining == 0) {
138+
return 0;
139+
}
140+
if (totalRemaining > Integer.MAX_VALUE) {
141+
// Fall back to one buffer at a time if the total srcs remaining exceeds integer max-value
142+
// This should be very rare.
143+
for (int i = offset; i < offset + length; i++) {
144+
ByteBuffer buf = srcs[i];
145+
if (buf.hasRemaining()) {
146+
return write(buf);
147+
}
148+
}
149+
return 0;
150+
}
151+
int remaining = (int) totalRemaining;
152+
this.state |= FLAG_FIRST_DATA_WRITTEN;
153+
int oldLimit = srcs[length - 1].limit();
154+
boolean dataRemaining = false; //set to true if there is data in src that still needs to be written out
155+
if (chunkleft == 0 && !chunkingSepBuffer.hasRemaining()) {
156+
chunkingBuffer.clear();
157+
putIntAsHexString(chunkingBuffer, remaining);
158+
chunkingBuffer.put(CRLF);
159+
chunkingBuffer.flip();
160+
chunkingSepBuffer.clear();
161+
chunkingSepBuffer.put(CRLF);
162+
chunkingSepBuffer.flip();
163+
state |= FLAG_WRITTEN_FIRST_CHUNK;
164+
chunkleft = remaining;
165+
} else {
166+
int maxRemaining = chunkleft;
167+
for (int i = 0; i < length; i++) {
168+
ByteBuffer buf = srcs[offset + i];
169+
int bufRemaining = buf.remaining();
170+
if (bufRemaining >= maxRemaining) {
171+
length = i + 1;
172+
oldLimit = buf.limit();
173+
dataRemaining = true;
174+
buf.limit(buf.position() + maxRemaining);
175+
break;
176+
}
177+
maxRemaining -= bufRemaining;
178+
}
179+
}
180+
try {
181+
int chunkingSize = chunkingBuffer.remaining();
182+
int chunkingSepSize = chunkingSepBuffer.remaining();
183+
if (chunkingSize > 0 || chunkingSepSize > 0 || lastChunkBuffer != null) {
184+
int originalRemaining = (int) Buffers.remaining(srcs, offset, length);
185+
long result;
186+
if (lastChunkBuffer == null || dataRemaining) {
187+
// chunkingBuffer
188+
// srcs (taking into account offset+length)
189+
// chunkingSepBuffer
190+
final ByteBuffer[] buf = new ByteBuffer[2 + length];
191+
buf[0] = chunkingBuffer;
192+
System.arraycopy(srcs, offset , buf, 1, length);
193+
buf[length + 1] = chunkingSepBuffer;
194+
result = next.write(buf, 0, buf.length);
195+
} else {
196+
// chunkingBuffer
197+
// srcs (taking into account offset+length)
198+
// lastChunkBuffer
199+
final ByteBuffer[] buf = new ByteBuffer[2 + length];
200+
buf[0] = chunkingBuffer;
201+
System.arraycopy(srcs, offset , buf, 1, length);
202+
buf[length + 1] = lastChunkBuffer.getBuffer();
203+
if (anyAreSet(state, CONF_FLAG_PASS_CLOSE)) {
204+
result = next.writeFinal(buf, 0, buf.length);
205+
} else {
206+
result = next.write(buf, 0, buf.length);
207+
}
208+
// TODO(ckozak): Buffers.hasRemaining fast-path?
209+
if (Buffers.remaining(srcs, offset, length) == 0) {
210+
state |= FLAG_WRITES_SHUTDOWN;
211+
}
212+
if (!lastChunkBuffer.getBuffer().hasRemaining()) {
213+
state |= FLAG_NEXT_SHUTDOWN;
214+
lastChunkBuffer.close();
215+
}
216+
}
217+
int srcWritten = originalRemaining - (int) Buffers.remaining(srcs, offset, length);
218+
chunkleft -= srcWritten;
219+
if (result < chunkingSize) {
220+
return 0;
221+
} else {
222+
return srcWritten;
223+
}
224+
} else {
225+
long result = next.write(srcs, offset, length);
226+
chunkleft -= result;
227+
return result;
228+
229+
}
230+
} finally {
231+
srcs[length - 1].limit(oldLimit);
232+
}
233+
}
131234

132235
int doWrite(final ByteBuffer src) throws IOException {
133236
if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
@@ -195,7 +298,6 @@ int doWrite(final ByteBuffer src) throws IOException {
195298
} finally {
196299
src.limit(oldLimit);
197300
}
198-
199301
}
200302

201303
@Override
@@ -214,13 +316,7 @@ public void truncateWrites() throws IOException {
214316

215317
@Override
216318
public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
217-
for (int i = 0; i < length; i++) {
218-
ByteBuffer srcBuffer = srcs[offset + i];
219-
if (srcBuffer.hasRemaining()) {
220-
return write(srcBuffer);
221-
}
222-
}
223-
return 0;
319+
return doWrite(srcs, offset, length);
224320
}
225321

226322
@Override
@@ -364,7 +460,7 @@ private void createLastChunk(final boolean writeFinal) throws UnsupportedEncodin
364460
lastChunkBuffer.put(CRLF);
365461
}
366462
//horrible hack
367-
//there is a situation where we can get a buffer leak here if the connection is terminated abnormaly
463+
//there is a situation where we can get a buffer leak here if the connection is terminated abnormally
368464
//this should be fixed once this channel has its lifecycle tied to the connection, same as fixed length
369465
lastChunkBuffer.flip();
370466
ByteBuffer data = ByteBuffer.allocate(lastChunkBuffer.remaining());

0 commit comments

Comments
 (0)