Skip to content

Commit 5703773

Browse files
committed
Bugfix: fixed the following bugs
- fix MQThreadPool memory leak when queue is full - fix the issue where the timeout parameter of HTTP asynchronous streaming ReadFullResponse interface did not take effect - fix the issue that stream length is 0 in HttpServiceProxy streaming request - correct type of timeout from int to uint32_t in http stream to avoid overflow during conversion - make tvar sampler remain valid before and after SamplerCollectorStop/Start
1 parent 0c7b592 commit 5703773

File tree

9 files changed

+117
-76
lines changed

9 files changed

+117
-76
lines changed

trpc/stream/http/async/stream.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ Future<> HttpAsyncStream::PushSendMessage(HttpStreamFramePtr&& msg) {
5656
return AsyncWrite(std::move(out));
5757
}
5858

59-
Future<http::HttpHeader> HttpAsyncStream::AsyncReadHeader(int timeout) {
59+
Future<http::HttpHeader> HttpAsyncStream::AsyncReadHeader(uint32_t timeout) {
6060
pending_header_.val = Promise<http::HttpHeader>();
6161

6262
auto ft = pending_header_.val.value().GetFuture();
@@ -80,7 +80,7 @@ Future<http::HttpHeader> HttpAsyncStream::AsyncReadHeader(int timeout) {
8080
return ft;
8181
}
8282

83-
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadChunk(int timeout) {
83+
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadChunk(uint32_t timeout) {
8484
// it can read only in chunked mode
8585
if (read_mode_ != DataMode::kChunked) {
8686
Status status{TRPC_STREAM_UNKNOWN_ERR, 0, "Can't read no chunk data"};
@@ -90,15 +90,15 @@ Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadChunk(int timeout) {
9090
return AsyncReadInner(ReadOperation::kReadChunk, 0, timeout);
9191
}
9292

93-
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadAtMost(uint64_t len, int timeout) {
93+
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadAtMost(uint64_t len, uint32_t timeout) {
9494
return AsyncReadInner(ReadOperation::kReadAtMost, len, timeout);
9595
}
9696

97-
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadExactly(uint64_t len, int timeout) {
97+
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadExactly(uint64_t len, uint32_t timeout) {
9898
return AsyncReadInner(ReadOperation::kReadExactly, len, timeout);
9999
}
100100

101-
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadInner(ReadOperation op, uint64_t len, int timeout) {
101+
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadInner(ReadOperation op, uint64_t len, uint32_t timeout) {
102102
if (read_mode_ == DataMode::kNoData) {
103103
// can not read data when content-length equal to 0
104104
return MakeReadyFuture<NoncontiguousBuffer>(NoncontiguousBuffer{});

trpc/stream/http/async/stream.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ class HttpAsyncStream : public HttpCommonStream {
3636

3737
/// @brief Reads the header asynchronously.
3838
/// @param timeout time to wait for the header to be ready
39-
Future<http::HttpHeader> AsyncReadHeader(int timeout = std::numeric_limits<int>::max());
39+
Future<http::HttpHeader> AsyncReadHeader(uint32_t timeout = std::numeric_limits<uint32_t>::max());
4040

4141
/// @brief Reads a chunk in chunked mode asynchronously, note that reading in non-chunked mode will fail
4242
/// @param timeout time to wait for the header to be ready
43-
Future<NoncontiguousBuffer> AsyncReadChunk(int timeout = std::numeric_limits<int>::max());
43+
Future<NoncontiguousBuffer> AsyncReadChunk(uint32_t timeout = std::numeric_limits<int>::max());
4444

4545
/// @brief Reads at most len data asynchronously.
4646
/// @param len max size to read
@@ -50,15 +50,15 @@ class HttpAsyncStream : public HttpCommonStream {
5050
/// An empty buffer means that the end has been read
5151
/// Usage scenario 1: Limits the maximum length of each read When the memory is limited.
5252
/// Usage scenario 2: Gets part of data in time and send it downstream on route server.
53-
Future<NoncontiguousBuffer> AsyncReadAtMost(uint64_t len, int timeout = std::numeric_limits<int>::max());
53+
Future<NoncontiguousBuffer> AsyncReadAtMost(uint64_t len, uint32_t timeout = std::numeric_limits<uint32_t>::max());
5454

5555
/// @brief Reads data with a fixed length. If eof is read, it will return as much data as there is in the network
5656
/// @param len size to read
5757
/// @param timeout time to wait for the header to be ready
5858
/// @note If the read buffer size is less than the required length, it means that eof has been read.
5959
/// Usage scenario 1: The requested data is compressed by a fixed size, and needs to be read and decompressed by
6060
/// a fixed size.
61-
Future<NoncontiguousBuffer> AsyncReadExactly(uint64_t len, int timeout = std::numeric_limits<int>::max());
61+
Future<NoncontiguousBuffer> AsyncReadExactly(uint64_t len, uint32_t timeout = std::numeric_limits<uint32_t>::max());
6262

6363
protected:
6464
template <class T>
@@ -98,7 +98,7 @@ class HttpAsyncStream : public HttpCommonStream {
9898

9999
/// @brief Creates a scheduled waiting task
100100
template <class T>
101-
void CreatePendingTimer(PendingVal<T>* pending, int timeout);
101+
void CreatePendingTimer(PendingVal<T>* pending, uint32_t timeout);
102102

103103
/// @brief Checks the pending state
104104
template <class T>
@@ -110,7 +110,7 @@ class HttpAsyncStream : public HttpCommonStream {
110110

111111
void NotifyPendingDataQueue();
112112

113-
Future<NoncontiguousBuffer> AsyncReadInner(ReadOperation op, uint64_t len, int timeout);
113+
Future<NoncontiguousBuffer> AsyncReadInner(ReadOperation op, uint64_t len, uint32_t timeout);
114114

115115
protected:
116116
/// @brief Used to store asynchronous data request
@@ -147,7 +147,7 @@ void HttpAsyncStream::PendingDone(PendingVal<T>* pending) {
147147
}
148148

149149
template <class T>
150-
void HttpAsyncStream::CreatePendingTimer(PendingVal<T>* pending, int timeout) {
150+
void HttpAsyncStream::CreatePendingTimer(PendingVal<T>* pending, uint32_t timeout) {
151151
TRPC_CHECK_EQ(pending->timer_id, iotimer::InvalidID);
152152
pending->timer_id = iotimer::Create(timeout, 0, [this, pending]() {
153153
if (!pending->val) {

trpc/stream/http/async/stream_reader_writer.cc

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ HttpAsyncStreamReader::HttpAsyncStreamReader(HttpAsyncStreamPtr stream) : stream
2222
TRPC_ASSERT(stream_ && "HttpAsyncStreamPtr can't be nullptr");
2323
}
2424

25-
Future<http::HttpHeader> HttpAsyncStreamReader::ReadHeader(int timeout) { return stream_->AsyncReadHeader(timeout); }
25+
Future<http::HttpHeader> HttpAsyncStreamReader::ReadHeader(uint32_t timeout) { return stream_->AsyncReadHeader(timeout); }
2626

27-
Future<NoncontiguousBuffer> HttpAsyncStreamReader::ReadChunk(int timeout) { return stream_->AsyncReadChunk(timeout); }
27+
Future<NoncontiguousBuffer> HttpAsyncStreamReader::ReadChunk(uint32_t timeout) { return stream_->AsyncReadChunk(timeout); }
2828

29-
Future<NoncontiguousBuffer> HttpAsyncStreamReader::ReadAtMost(uint64_t len, int timeout) {
29+
Future<NoncontiguousBuffer> HttpAsyncStreamReader::ReadAtMost(uint64_t len, uint32_t timeout) {
3030
return stream_->AsyncReadAtMost(len, timeout);
3131
}
3232

33-
Future<NoncontiguousBuffer> HttpAsyncStreamReader::ReadExactly(uint64_t len, int timeout) {
33+
Future<NoncontiguousBuffer> HttpAsyncStreamReader::ReadExactly(uint64_t len, uint32_t timeout) {
3434
return stream_->AsyncReadExactly(len, timeout);
3535
}
3636

@@ -87,15 +87,15 @@ HttpAsyncStreamReaderWriter::HttpAsyncStreamReaderWriter(HttpAsyncStreamPtr stre
8787
TRPC_ASSERT(stream_ && "HttpAsyncStreamPtr can't be nullptr");
8888
}
8989

90-
Future<http::HttpHeader> HttpAsyncStreamReaderWriter::ReadHeader(int timeout) { return reader_->ReadHeader(timeout); }
90+
Future<http::HttpHeader> HttpAsyncStreamReaderWriter::ReadHeader(uint32_t timeout) { return reader_->ReadHeader(timeout); }
9191

92-
Future<NoncontiguousBuffer> HttpAsyncStreamReaderWriter::ReadChunk(int timeout) { return reader_->ReadChunk(timeout); }
92+
Future<NoncontiguousBuffer> HttpAsyncStreamReaderWriter::ReadChunk(uint32_t timeout) { return reader_->ReadChunk(timeout); }
9393

94-
Future<NoncontiguousBuffer> HttpAsyncStreamReaderWriter::ReadAtMost(uint64_t len, int timeout) {
94+
Future<NoncontiguousBuffer> HttpAsyncStreamReaderWriter::ReadAtMost(uint64_t len, uint32_t timeout) {
9595
return reader_->ReadAtMost(len, timeout);
9696
}
9797

98-
Future<NoncontiguousBuffer> HttpAsyncStreamReaderWriter::ReadExactly(uint64_t len, int timeout) {
98+
Future<NoncontiguousBuffer> HttpAsyncStreamReaderWriter::ReadExactly(uint64_t len, uint32_t timeout) {
9999
return reader_->ReadExactly(len, timeout);
100100
}
101101

@@ -177,34 +177,34 @@ http::HttpRequestPtr ConstructHttpRequest(HttpRequestLine&& start_line, trpc::ht
177177
}
178178

179179
template <typename T>
180-
Future<http::HttpRequestPtr> ReadFullRequestImpl(T rw, int timeout) {
180+
Future<http::HttpRequestPtr> ReadFullRequestImpl(T rw, uint32_t timeout) {
181181
HttpRequestLine start_line = rw->GetRequestLine(); // reads request line
182182
http::PathParameters param = rw->GetParameters();
183183
// Reads header first, then read the body.
184-
return rw->ReadHeader(timeout).Then(
185-
[rw, start_line = std::move(start_line), param = std::move(param)](trpc::http::HttpHeader&& header) mutable {
186-
// Read full request body if it has any.
187-
bool has_data = false;
188-
uint64_t read_bytes = std::numeric_limits<uint64_t>::max();
189-
if (header.Get(http::kHeaderTransferEncoding) == http::kTransferEncodingChunked) {
190-
has_data = true;
191-
}
192-
if (const std::string& len = header.Get(http::kHeaderContentLength); !len.empty()) {
193-
has_data = true;
194-
// The |len| here is valid(a failure will occur if it has bad length).
195-
read_bytes = trpc::TryParse<uint64_t>(len).value();
196-
}
197-
if (has_data) {
198-
return rw->ReadExactly(read_bytes)
199-
.Then([start_line = std::move(start_line), param = std::move(param),
200-
header = std::move(header)](NoncontiguousBuffer&& data) mutable {
201-
return MakeReadyFuture<trpc::http::HttpRequestPtr>(
202-
ConstructHttpRequest(std::move(start_line), std::move(param), std::move(header), std::move(data)));
203-
});
204-
}
205-
return MakeReadyFuture<trpc::http::HttpRequestPtr>(
206-
ConstructHttpRequest(std::move(start_line), std::move(param), std::move(header), NoncontiguousBuffer{}));
207-
});
184+
return rw->ReadHeader(timeout).Then([rw, timeout, start_line = std::move(start_line),
185+
param = std::move(param)](trpc::http::HttpHeader&& header) mutable {
186+
// Read full request body if it has any.
187+
bool has_data = false;
188+
uint64_t read_bytes = std::numeric_limits<uint64_t>::max();
189+
if (header.Get(http::kHeaderTransferEncoding) == http::kTransferEncodingChunked) {
190+
has_data = true;
191+
}
192+
if (const std::string& len = header.Get(http::kHeaderContentLength); !len.empty()) {
193+
has_data = true;
194+
// The |len| here is valid(a failure will occur if it has bad length).
195+
read_bytes = trpc::TryParse<uint64_t>(len).value();
196+
}
197+
if (has_data) {
198+
return rw->ReadExactly(read_bytes, timeout)
199+
.Then([start_line = std::move(start_line), param = std::move(param),
200+
header = std::move(header)](NoncontiguousBuffer&& data) mutable {
201+
return MakeReadyFuture<trpc::http::HttpRequestPtr>(
202+
ConstructHttpRequest(std::move(start_line), std::move(param), std::move(header), std::move(data)));
203+
});
204+
}
205+
return MakeReadyFuture<trpc::http::HttpRequestPtr>(
206+
ConstructHttpRequest(std::move(start_line), std::move(param), std::move(header), NoncontiguousBuffer{}));
207+
});
208208
}
209209

210210
} // namespace
@@ -217,11 +217,11 @@ Future<> WriteFullResponse(HttpServerAsyncStreamWriterPtr rw, http::HttpResponse
217217
return WriteFullResponseImpl(rw, std::move(rsp));
218218
}
219219

220-
Future<http::HttpRequestPtr> ReadFullRequest(HttpServerAsyncStreamReaderWriterPtr rw, int timeout) {
220+
Future<http::HttpRequestPtr> ReadFullRequest(HttpServerAsyncStreamReaderWriterPtr rw, uint32_t timeout) {
221221
return ReadFullRequestImpl(rw, timeout);
222222
}
223223

224-
Future<http::HttpRequestPtr> ReadFullRequest(HttpServerAsyncStreamReaderPtr rw, int timeout) {
224+
Future<http::HttpRequestPtr> ReadFullRequest(HttpServerAsyncStreamReaderPtr rw, uint32_t timeout) {
225225
return ReadFullRequestImpl(rw, timeout);
226226
}
227227

@@ -283,9 +283,10 @@ trpc::http::HttpResponsePtr ConstructHttpResponse(HttpStatusLine&& staus_line, h
283283
}
284284

285285
template <typename T>
286-
Future<trpc::http::HttpResponsePtr> ReadFullResponseImpl(T rw, int timeout) {
287-
return rw->ReadStatusLine().Then([rw](HttpStatusLine&& start_line) {
288-
return rw->ReadHeader().Then([rw, start_line = std::move(start_line)](trpc::http::HttpHeader&& header) mutable {
286+
Future<trpc::http::HttpResponsePtr> ReadFullResponseImpl(T rw, uint32_t timeout) {
287+
return rw->ReadStatusLine(timeout).Then([rw, timeout](HttpStatusLine&& start_line) {
288+
return rw->ReadHeader(timeout).Then([rw, timeout,
289+
start_line = std::move(start_line)](trpc::http::HttpHeader&& header) mutable {
289290
// Reads content if it has.
290291
bool has_data = false;
291292
uint64_t read_bytes = std::numeric_limits<uint64_t>::max();
@@ -298,7 +299,7 @@ Future<trpc::http::HttpResponsePtr> ReadFullResponseImpl(T rw, int timeout) {
298299
read_bytes = trpc::TryParse<uint64_t>(len).value();
299300
}
300301
if (has_data) {
301-
return rw->ReadExactly(read_bytes)
302+
return rw->ReadExactly(read_bytes, timeout)
302303
.Then([start_line = std::move(start_line), header = std::move(header)](NoncontiguousBuffer&& data) mutable {
303304
return MakeReadyFuture<trpc::http::HttpResponsePtr>(
304305
ConstructHttpResponse(std::move(start_line), std::move(header), std::move(data)));
@@ -311,10 +312,10 @@ Future<trpc::http::HttpResponsePtr> ReadFullResponseImpl(T rw, int timeout) {
311312
}
312313
} // namespace
313314

314-
Future<trpc::http::HttpResponsePtr> ReadFullResponse(HttpClientAsyncStreamReaderWriterPtr rw, int timeout) {
315+
Future<trpc::http::HttpResponsePtr> ReadFullResponse(HttpClientAsyncStreamReaderWriterPtr rw, uint32_t timeout) {
315316
return ReadFullResponseImpl(rw, timeout);
316317
}
317-
Future<trpc::http::HttpResponsePtr> ReadFullResponse(HttpClientAsyncStreamReaderPtr rw, int timeout) {
318+
Future<trpc::http::HttpResponsePtr> ReadFullResponse(HttpClientAsyncStreamReaderPtr rw, uint32_t timeout) {
318319
return ReadFullResponseImpl(rw, timeout);
319320
}
320321

trpc/stream/http/async/stream_reader_writer.h

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ class HttpAsyncStreamReader : public RefCounted<HttpAsyncStreamReader> {
3232

3333
/// @brief Reads Header
3434
/// @param timeout time to wait for the header to be ready
35-
Future<http::HttpHeader> ReadHeader(int timeout = std::numeric_limits<int>::max());
35+
Future<http::HttpHeader> ReadHeader(uint32_t timeout = std::numeric_limits<uint32_t>::max());
3636

3737
/// @brief Reads a chunk in chunked mode, note that reading in non-chunked mode will fail
3838
/// @param timeout time to wait for the header to be ready
39-
Future<NoncontiguousBuffer> ReadChunk(int timeout = std::numeric_limits<int>::max());
39+
Future<NoncontiguousBuffer> ReadChunk(uint32_t timeout = std::numeric_limits<uint32_t>::max());
4040

4141
/// @brief Reads at most len data.
4242
/// @param len max size to read
@@ -46,15 +46,15 @@ class HttpAsyncStreamReader : public RefCounted<HttpAsyncStreamReader> {
4646
/// An empty buffer means that the end has been read
4747
/// Usage scenario 1: Limits the maximum length of each read When the memory is limited.
4848
/// Usage scenario 2: Gets part of data in time and send it downstream on route server.
49-
Future<NoncontiguousBuffer> ReadAtMost(uint64_t len, int timeout = std::numeric_limits<int>::max());
49+
Future<NoncontiguousBuffer> ReadAtMost(uint64_t len, uint32_t timeout = std::numeric_limits<uint32_t>::max());
5050

5151
/// @brief Reads data with a fixed length. If eof is read, it will return as much data as there is in the network
5252
/// @param len size to read
5353
/// @param timeout time to wait for the header to be ready
5454
/// @note If the read buffer size is less than the required length, it means that eof has been read.
5555
/// Usage scenario 1: The requested data is compressed by a fixed size, and needs to be read and decompressed by
5656
/// a fixed size.
57-
Future<NoncontiguousBuffer> ReadExactly(uint64_t len, int timeout = std::numeric_limits<int>::max());
57+
Future<NoncontiguousBuffer> ReadExactly(uint64_t len, uint32_t timeout = std::numeric_limits<uint32_t>::max());
5858

5959
private:
6060
HttpAsyncStreamPtr stream_{nullptr};
@@ -87,7 +87,7 @@ class HttpClientAsyncStreamReader : public HttpAsyncStreamReader {
8787
}
8888

8989
/// @brief Reads the status line of response.
90-
Future<HttpStatusLine> ReadStatusLine(int timeout = std::numeric_limits<int>::max()) {
90+
Future<HttpStatusLine> ReadStatusLine(uint32_t timeout = std::numeric_limits<uint32_t>::max()) {
9191
return stream_->ReadStatusLine(timeout);
9292
}
9393

@@ -160,11 +160,11 @@ class HttpAsyncStreamReaderWriter : public RefCounted<HttpAsyncStreamReaderWrite
160160

161161
/// @brief Reads Header
162162
/// @param timeout time to wait for the header to be ready
163-
Future<http::HttpHeader> ReadHeader(int timeout = std::numeric_limits<int>::max());
163+
Future<http::HttpHeader> ReadHeader(uint32_t timeout = std::numeric_limits<uint32_t>::max());
164164

165165
/// @brief Reads a chunk in chunked mode, note that reading in non-chunked mode will fail
166166
/// @param timeout time to wait for the header to be ready
167-
Future<NoncontiguousBuffer> ReadChunk(int timeout = std::numeric_limits<int>::max());
167+
Future<NoncontiguousBuffer> ReadChunk(uint32_t timeout = std::numeric_limits<uint32_t>::max());
168168

169169
/// @brief Reads at most len data.
170170
/// @param len max size to read
@@ -174,15 +174,15 @@ class HttpAsyncStreamReaderWriter : public RefCounted<HttpAsyncStreamReaderWrite
174174
/// An empty buffer means that the end has been read
175175
/// Usage scenario 1: Limits the maximum length of each read When the memory is limited.
176176
/// Usage scenario 2: Gets part of data in time and send it downstream on route server.
177-
Future<NoncontiguousBuffer> ReadAtMost(uint64_t len, int timeout = std::numeric_limits<int>::max());
177+
Future<NoncontiguousBuffer> ReadAtMost(uint64_t len, uint32_t timeout = std::numeric_limits<uint32_t>::max());
178178

179179
/// @brief Reads data with a fixed length. If eof is read, it will return as much data as there is in the network
180180
/// @param len size to read
181181
/// @param timeout time to wait for the header to be ready
182182
/// @note If the read buffer size is less than the required length, it means that eof has been read.
183183
/// Usage scenario 1: The requested data is compressed by a fixed size, and needs to be read and decompressed by
184184
/// a fixed size.
185-
Future<NoncontiguousBuffer> ReadExactly(uint64_t len, int timeout = std::numeric_limits<int>::max());
185+
Future<NoncontiguousBuffer> ReadExactly(uint64_t len, uint32_t timeout = std::numeric_limits<uint32_t>::max());
186186

187187
/// @brief Writes header or trailer
188188
Future<> WriteHeader(http::HttpHeader&& header);
@@ -247,7 +247,7 @@ class HttpClientAsyncStreamReaderWriter : public HttpAsyncStreamReaderWriter {
247247
Future<> WriteRequestLine(HttpRequestLine&& req_line) { return writer_->WriteRequestLine(std::move(req_line)); }
248248

249249
/// @brief Reads status line of response.
250-
Future<HttpStatusLine> ReadStatusLine(int timeout = std::numeric_limits<int>::max()) {
250+
Future<HttpStatusLine> ReadStatusLine(uint32_t timeout = std::numeric_limits<uint32_t>::max()) {
251251
return reader_->ReadStatusLine(timeout);
252252
}
253253

@@ -279,14 +279,14 @@ Future<> WriteFullResponse(HttpServerAsyncStreamWriterPtr rw, http::HttpResponse
279279

280280
/// @brief Reads a complete request from the stream.
281281
Future<http::HttpRequestPtr> ReadFullRequest(HttpServerAsyncStreamReaderWriterPtr rw,
282-
int timeout = std::numeric_limits<int>::max());
282+
uint32_t timeout = std::numeric_limits<uint32_t>::max());
283283
Future<http::HttpRequestPtr> ReadFullRequest(HttpServerAsyncStreamReaderPtr rw,
284-
int timeout = std::numeric_limits<int>::max());
284+
uint32_t timeout = std::numeric_limits<uint32_t>::max());
285285

286286
/// @brief Reads a complete response from the stream.
287287
Future<trpc::http::HttpResponsePtr> ReadFullResponse(HttpClientAsyncStreamReaderWriterPtr rw,
288-
int timeout = std::numeric_limits<int>::max());
288+
uint32_t timeout = std::numeric_limits<uint32_t>::max());
289289
Future<trpc::http::HttpResponsePtr> ReadFullResponse(HttpClientAsyncStreamReaderPtr rw,
290-
int timeout = std::numeric_limits<int>::max());
290+
uint32_t timeout = std::numeric_limits<uint32_t>::max());
291291

292292
} // namespace trpc::stream

trpc/stream/http/http_client_stream_handler.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ StreamReaderWriterProviderPtr HttpClientStreamHandler::CreateStream(StreamOption
2525

2626
int HttpClientStreamHandler::SendMessage(const std::any& msg, NoncontiguousBuffer&& send_data) {
2727
IoMessage io_message;
28+
io_message.context_ext = std::any_cast<const ClientContextPtr>(msg)->GetCurrentContextExt();
2829
io_message.buffer = std::move(send_data);
2930
io_message.msg = msg;
3031
return options_.send(std::move(io_message));

0 commit comments

Comments
 (0)