Skip to content

Commit ad90694

Browse files
author
siyuan
committed
add make_ngx_resp_writer
* add make_ngx_resp_writer * add test of make_ngx_resp_writer * make_ngx_writer add an optional parameter
1 parent 191c880 commit ad90694

File tree

2 files changed

+148
-19
lines changed

2 files changed

+148
-19
lines changed

lib/pipe/writer.lua

Lines changed: 94 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
local httpclient = require("acid.httpclient")
22
local tableutil = require("acid.tableutil")
33
local strutil = require("acid.strutil")
4+
local rpc_logging = require("acid.rpc_logging")
5+
local acid_setutil = require("acid.setutil")
46

57
local _M = { _VERSION = '1.0' }
68

@@ -9,6 +11,84 @@ local to_str = strutil.to_str
911
local BLOCK_SIZE = 1024 * 1024
1012
local SOCKET_TIMEOUTS = {5 * 1000, 100 * 1000, 100 * 1000}
1113

14+
local INF = math.huge
15+
16+
17+
local function write_data_to_ngx(pobj, ident, opts)
18+
-- range = {start, end}
19+
-- range is a closed interval.
20+
local range = opts.range
21+
local pipe_log = opts.pipe_log
22+
23+
local recv_left, recv_right = 0, 0
24+
local from, to
25+
if range ~= nil then
26+
from = range['start'] + 1
27+
28+
if range['end'] ~= nil then
29+
to = range['end'] + 1
30+
else
31+
to = INF
32+
end
33+
34+
if from > to then
35+
return nil, 'InvalidRange', string.format(
36+
'start: %d is greater than end: %d', from, to)
37+
end
38+
end
39+
40+
while true do
41+
local data, err, err_msg
42+
if pipe_log ~= nil then
43+
rpc_logging.reset_start(pipe_log)
44+
45+
data, err, err_msg = pobj:read_pipe(ident)
46+
47+
rpc_logging.set_err(pipe_log, err)
48+
rpc_logging.incr_stat(pipe_log, 'downstream', 'sendbody', #(data or ''))
49+
else
50+
data, err, err_msg = pobj:read_pipe(ident)
51+
end
52+
53+
if err ~= nil then
54+
return nil, err, err_msg
55+
end
56+
57+
if data == '' then
58+
break
59+
end
60+
61+
recv_left = recv_right + 1
62+
recv_right = recv_right + #data
63+
64+
if from ~= nil then
65+
local intersection, err, err_msg =
66+
acid_setutil.intersect(from, to, recv_left, recv_right)
67+
if err ~= nil then
68+
return nil, err, err_msg
69+
end
70+
71+
local f, t = intersection.from, intersection.to
72+
if f == nil then
73+
data = ''
74+
else
75+
if t - f + 1 ~= #data then
76+
data = data:sub(f - recv_left + 1, t - recv_left + 1)
77+
end
78+
end
79+
end
80+
81+
ngx.print(data)
82+
local _, err = ngx.flush(true)
83+
if err then
84+
return nil, 'ClientAborted', err
85+
end
86+
end
87+
88+
return recv_right
89+
end
90+
91+
1292
function _M.connect_http(ips, port, verb, uri, opts)
1393
opts = opts or {}
1494

@@ -172,30 +252,25 @@ function _M.make_http_writer(ips, port, verb, uri, opts)
172252
end
173253
end
174254

175-
function _M.make_ngx_writer()
176-
return function(pobj, ident)
177-
local bytes = 0
255+
function _M.make_ngx_writer(opts)
256+
opts = opts or {}
178257

179-
while true do
180-
local data, err_code, err_msg = pobj:read_pipe(ident)
181-
if err_code ~= nil then
182-
return nil, err_code, err_msg
183-
end
258+
return function(pobj, ident)
259+
return write_data_to_ngx(pobj, ident, opts)
260+
end
261+
end
184262

185-
if data == '' then
186-
break
187-
end
188263

189-
bytes = bytes + #data
264+
function _M.make_ngx_resp_writer(status, headers, opts)
265+
opts = opts or {}
190266

191-
ngx.print(data)
192-
local _, err = ngx.flush(true)
193-
if err then
194-
return nil, 'ClientAborted', err
195-
end
196-
end
267+
ngx.status = status
268+
for k, v in pairs(headers) do
269+
ngx.header[k] = v
270+
end
197271

198-
return bytes
272+
return function(pobj, ident)
273+
return write_data_to_ngx(pobj, ident, opts)
199274
end
200275
end
201276

t/test_ngx_resp_writer.t

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# vim:set ft=lua ts=4 sw=4 et ft=perl:
2+
3+
use Test::Nginx::Socket "no_plan";
4+
5+
no_long_string();
6+
7+
run_tests();
8+
9+
__DATA__
10+
11+
=== TEST 1: test make_ngx_resp_writer
12+
--- http_config
13+
lua_check_client_abort on;
14+
15+
lua_package_path "./lib/?.lua;;";
16+
lua_package_cpath "./lib/?.so;;";
17+
--- config
18+
location /t {
19+
content_by_lua '
20+
local pipe_pipe = require("pipe.pipe")
21+
22+
local function is_running()
23+
return true
24+
end
25+
26+
local read_datas = {"1", "23", "4", "5", "678", "9", "0"}
27+
local readers = {pipe_pipe.reader.make_memery_reader(read_datas)}
28+
29+
local status = 200
30+
local headers = {["Content-Length"]=3}
31+
local opts = {range ={}}
32+
opts.range["start"] = 2
33+
opts.range["end"] = 4
34+
35+
local writers = {pipe_pipe.writer.make_ngx_resp_writer(status, headers, opts)}
36+
37+
local cpipe, err, err_msg = pipe_pipe:new(readers, writers)
38+
if err ~= nil then
39+
ngx.log(ngx.ERR, string.format("err: %s, err_msg: %s", err, err_msg))
40+
return
41+
end
42+
43+
local _, err, err_msg = cpipe:pipe(is_running)
44+
if err ~= nil then
45+
ngx.log(ngx.ERR, string.format("err: %s, err_msg: %s", err, err_msg))
46+
return
47+
end
48+
';
49+
}
50+
--- request
51+
GET /t
52+
--- response_body: 345
53+
--- no_error_log
54+
[error]

0 commit comments

Comments
 (0)