diff --git a/lib/pipe/filter.lua b/lib/pipe/filter.lua index c700cce..edcd6c8 100644 --- a/lib/pipe/filter.lua +++ b/lib/pipe/filter.lua @@ -49,5 +49,59 @@ function _M.make_read_timeout_filter(r_idx) end end +function _M.make_read_max_size_filter(max_size, r_idx) + local size = 0 + + return function(rbufs, n_rd, wbufs, n_wrt, pipe_rst) + size = size + #(rbufs[r_idx] or '') + if size > max_size then + return nil, 'EntityTooLarge', + string.format('read size %s large than %s', size, max_size) + end + end +end + +function _M.make_kill_low_write_speed_filter(pobj, assert_func, quorum) + local all_stat = pobj:get_stat() + + return function(rbufs, n_rd, wbufs, n_wrt, pipe_rst) + local ok_stat, n_ok = {}, 0 + for idx, wrt_rst in pairs(pipe_rst.write_result) do + if wrt_rst.err == nil then + local ident = pobj.wrt_cos[idx].ident + local id_stat = all_stat[ident] or {} + + if id_stat.write_time ~= nil and id_stat.write_size ~= nil then + ok_stat[ident] = { + write_size = id_stat.write_size, + write_time = id_stat.write_time, + } + n_ok = n_ok + 1 + end + end + end + + if n_ok <= quorum then + return nil, nil, nil + end + + for ident, st in pairs(ok_stat) do + local cur_speed = st.write_size/(math.max(st.write_time * 1000, 1)/1000) + + if assert_func(ok_stat, ident, st, cur_speed) then + local err = { + err_code = "WriteSlow", + err_msg = to_str(ident, " coroutine write slow, speed:", + strutil.placeholder(cur_speed/1024, '-', '%.3f'), "kb/s"), + } + + pobj.wrt_cos[ident].err = err + ngx.log(ngx.ERR, to_str("slow coroutine:", pobj.wrt_cos[ident], ", error:", err)) + + break + end + end + end +end return _M diff --git a/lib/pipe/httplib.lua b/lib/pipe/httplib.lua new file mode 100644 index 0000000..c4656e6 --- /dev/null +++ b/lib/pipe/httplib.lua @@ -0,0 +1,181 @@ +local httpclient = require("acid.httpclient") +local tableutil = require("acid.tableutil") +local strutil = require("acid.strutil") + +local resty_sha1 = require( "resty.sha1" ) +local resty_md5 = require( "resty.md5" ) +local resty_string = require( "resty.string" ) + +local _M = { _VERSION = '1.0' } + +local to_str = strutil.to_str + +local BLOCK_SIZE = 1024 * 1024 +local SOCKET_TIMEOUTS = {5 * 1000, 100 * 1000, 100 * 1000} + +function _M.connect_http(ips, port, verb, uri, opts) + opts = opts or {} + + local try_times = math.max(opts.try_times or 1, 1) + + local http, _, err_code, err_msg + + for _, ip in ipairs(ips) do + local headers = tableutil.dup(opts.headers or {}, true) + headers.Host = headers.Host or ip + + local req = { + ip = ip, + port = port, + uri = uri, + verb = verb, + headers = headers, + } + + if opts.signature_cb ~= nil then + req = opts.signature_cb(req) + end + + http = httpclient:new(ip, port, opts.timeouts or SOCKET_TIMEOUTS, opts.http_opts) + + local h_opts = {method=req.verb, headers=req.headers} + for i=1, try_times, 1 do + _, err_code, err_msg = http:send_request(req.uri, h_opts) + if err_code == nil then + return http + end + end + end + + if err_code ~= nil then + err_msg = to_str(err_code, ':', err_msg) + err_code = 'ConnectError' + end + + return nil, err_code, err_msg +end + +function _M.get_http_response(http, opts) + opts = opts or {} + + local _, err_code, err_msg = http:finish_request() + if err_code ~= nil then + return nil, err_code, err_msg + end + + if opts.success_status ~= nil and opts.success_status ~= http.status then + return nil, 'InvalidHttpStatus', to_str('response http status:', http.status) + end + + local resp = { + status = http.status, + headers = http.ori_headers, + } + + if opts.read_body == false then + return resp + end + + local body = {} + + while true do + local data, err_code, err_msg = http:read_body(BLOCK_SIZE) + if err_code ~= nil then + return resp, err_code, err_msg + end + + if data == '' then + break + end + + table.insert(body, data) + end + resp.body = table.concat(body) + + return resp +end + +function _M.loop_http_read(pobj, ident, http, block_size, opts) + opts = opts or {} + + local rst = { + size = 0, + md5 = nil, + sha1 = nil, + } + + local md5_alg + if opts.calc_md5 == true then + md5_alg = resty_md5:new() + end + + local sha1_alg + if opts.calc_sha1 == true then + sha1_alg = resty_sha1:new() + end + + while true do + local data, err_code, err_msg = + http:read_body(block_size or BLOCK_SIZE) + if err_code ~= nil then + return nil, err_code, err_msg + end + + local _, err_code, err_msg = pobj:write_pipe(ident, data) + if err_code ~= nil then + return nil, err_code, err_msg + end + + if opts.calc_md5 == true then + md5_alg:update(data) + end + + if opts.calc_sha1 == true then + sha1_alg:update(data) + end + + rst.size = rst.size + #data + + if data == '' then + if opts.calc_md5 == true then + rst.md5 = resty_string.to_hex(md5_alg:final()) + end + + if opts.calc_sha1 == true then + rst.sha1 = resty_string.to_hex(sha1_alg:final()) + end + + break + end + end + + return rst +end + +function _M.loop_http_write(pobj, ident, http) + local bytes = 0 + + while true do + local data, err_code, err_msg = pobj:read_pipe(ident) + if err_code ~= nil then + return nil, err_code, err_msg + end + + if data == '' then + break + end + + local now = ngx.now() + local _, err_code, err_msg = http:send_body(data) + + ngx.update_time() + pobj:incr_stat(ident, "write_size", #data) + pobj:incr_stat(ident, "write_time", ngx.now()-now) + + bytes = bytes + #data + end + + return bytes +end + +return _M diff --git a/lib/pipe/pipe.lua b/lib/pipe/pipe.lua index 77b56a4..ad62acd 100644 --- a/lib/pipe/pipe.lua +++ b/lib/pipe/pipe.lua @@ -13,8 +13,8 @@ _M.writer = pipe_writer _M.filter = pipe_filter local to_str = strutil.to_str -local READ_TIMEOUT = 300 --seconds -local WRITE_TIMEOUT = 300 --seconds +local READ_TIMEOUT = 600 * 1000 --ms +local WRITE_TIMEOUT = 600 * 1000 --ms local function wrap_co_func(co, ...) local ok, rst, err_code, err_msg = pcall(co.func, ...) @@ -30,7 +30,7 @@ local function wrap_co_func(co, ...) err_code, err_msg = 'CoroutineError', rst end co.err = {err_code = err_code, err_msg = err_msg} - ngx.log(ngx.ERR, to_str(co.rd_or_wrt, " coroutine exit with error:", co.err)) + ngx.log(ngx.ERR, to_str(co.rd_or_wrt, ' ', co.ident, " coroutine exit with error:", co.err)) end co.is_dead = true @@ -175,12 +175,15 @@ local function async_wait_co_sema(self, cos, sema, quorum, timeout, err_code) while ngx.now() <= dead_time do local n_ok = 0 + local n_active = 0 for _, co in ipairs(cos) do if co.is_dead then if co.err == nil then n_ok = n_ok + 1 end + else + n_active = n_active + 1 end end @@ -188,6 +191,10 @@ local function async_wait_co_sema(self, cos, sema, quorum, timeout, err_code) return end + if n_active + n_ok < quorum then + break + end + ngx.sleep(0.001) end @@ -246,13 +253,34 @@ function _M.new(_, rds, wrts, filters, rd_timeout, wrt_timeout) wrt_filters = filters.wrt_filters or {pipe_filter.make_write_quorum_filter(#wrts)}, - rd_timeout = rd_timeout or READ_TIMEOUT, - wrt_timeout = wrt_timeout or WRITE_TIMEOUT, + rd_timeout = (rd_timeout or READ_TIMEOUT)/1000, + wrt_timeout = (wrt_timeout or WRITE_TIMEOUT)/1000, + stat = {}, } return setmetatable(obj, mt) end +function _M.set_stat(self, ident, key, val) + self.stat[ident] = self.stat[ident] or {} + self.stat[ident][key] = val + + return val +end + +function _M.incr_stat(self, ident, key, val) + self.stat[ident] = self.stat[ident] or {} + + local prev = self.stat[ident][key] or 0 + self.stat[ident][key] = prev + val + + return self.stat[ident][key] +end + +function _M.get_stat(self) + return self.stat +end + function _M.write_pipe(pobj, ident, buf) local rd_co = pobj.rd_cos[ident] @@ -360,4 +388,20 @@ function _M.pipe(self, is_running, quorum_return) return get_pipe_result(self) end +function _M.add_read_filter(self, flt) + if flt == nil then + return + end + + table.insert(self.rd_filters, flt) +end + +function _M.add_write_filter(self, flt) + if flt == nil then + return + end + + table.insert(self.wrt_filters, flt) +end + return _M diff --git a/lib/pipe/reader.lua b/lib/pipe/reader.lua index cfdad1e..e69916b 100644 --- a/lib/pipe/reader.lua +++ b/lib/pipe/reader.lua @@ -1,13 +1,9 @@ local tableutil = require("acid.tableutil") -local strutil = require("acid.strutil") -local httpclient = require("acid.httpclient") - -local to_str = strutil.to_str +local httplib = require("pipe.httplib") local _M = { _VERSION = '1.0' } local BLOCK_SIZE = 1024 * 1024 -local SOCKET_TIMEOUTS = {5 * 1000, 100 * 1000, 100 * 1000} local err_socket = { [ "default" ] = "InvalidRequest", @@ -21,72 +17,40 @@ local function socket_err_code( err, default ) return err_socket[err] or default end -function _M.make_http_reader(ips, port, verb, uri, opts) - opts = opts or {} - - local ret = { - size = 0, - time = 0, - } - - return function(pobj, ident) - local http, _, err_code, err_msg - - for _, ip in ipairs(ips) do - local headers = tableutil.dup(opts.headers or {}, true) - headers.Host = headers.Host or ip - - local req = { - ip = ip, - port = port, - uri = uri, - verb = verb, - headers = headers, - } - - if opts.signature_cb ~= nil then - req = opts.signature_cb(req) - end +function _M.connect_http(ips, port, verb, uri, opts) + return httplib.connect_http(ips, port, verb, uri, opts) +end - http = httpclient:new(ip, port, opts.timeouts or SOCKET_TIMEOUTS) +function _M.get_http_response(http, opts) + return httplib.get_http_response(http, opts) +end - local h_opts = {method=req.verb, headers=req.headers} - for i=1, 3, 1 do - _, err_code, err_msg = http:request(req.uri, h_opts) - if err_code == nil then - break - end - end +function _M.loop_http_read(pobj, ident, http, block_size, opts) + return httplib.loop_http_read(pobj, ident, http, block_size, opts) +end - if err_code ~= nil then - return nil, err_code, err_msg - end - end +function _M.make_connected_http_reader(http, block_size, opts) + return function(pobj, ident) + return _M.loop_http_read(pobj, ident, http, block_size, opts) + end +end - if opts.success_status ~= nil and opts.success_status ~= http.status then - return nil, 'InvalidHttpStatus', to_str('response http status:', http.status) +function _M.make_http_reader(ips, port, verb, uri, opts) + return function(pobj, ident) + local http , err_code, err_msg = _M.connect_http(ips, port, verb, uri, opts) + if err_code ~= nil then + return nil, err_code, err_msg end - while true do - local t0 = ngx.now() - local buf, err_code, err_msg = - http:read_body(opts.block_size or BLOCK_SIZE) - ret.time = ret.time + (ngx.now() - t0) - if err_code ~= nil then - return nil, err_code, err_msg - end - - local rst, err_code, err_msg = pobj:write_pipe(ident, buf) - if err_code ~= nil then - return nil, err_code, err_msg - end - - ret.size = ret.size + #buf + opts = tableutil.dup(opts, true) + opts.read_body = false - if buf == '' then - break - end + local _, err_code, err_msg = _M.get_http_response(http, opts) + if err_code ~= nil then + return nil, err_code, err_msg end + + return _M.loop_http_read(pobj, ident, http, opts.block_size, opts) end end diff --git a/lib/pipe/writer.lua b/lib/pipe/writer.lua index 84d474a..78fc518 100644 --- a/lib/pipe/writer.lua +++ b/lib/pipe/writer.lua @@ -1,54 +1,62 @@ -local httpclient = require("acid.httpclient") -local tableutil = require("acid.tableutil") +local httplib = require("pipe.httplib") local strutil = require("acid.strutil") +local tableutil = require("acid.tableutil") local rpc_logging = require("acid.rpc_logging") local acid_setutil = require("acid.setutil") +local s3_client = require('resty.aws_s3.client') +local hashlib = require("acid.hashlib") +local resty_string = require("resty.string") +local aws_chunk_writer = require("resty.aws_chunk.writer") local _M = { _VERSION = '1.0' } local to_str = strutil.to_str - -local BLOCK_SIZE = 1024 * 1024 -local SOCKET_TIMEOUTS = {5 * 1000, 100 * 1000, 100 * 1000} - local INF = math.huge - local function write_data_to_ngx(pobj, ident, opts) - -- range = {start, end} - -- range is a closed interval. + opts = opts or {} + + -- range = {from, to} is rfc2612 Range header, + -- a closed interval, starts with index 0 local range = opts.range - local pipe_log = opts.pipe_log + + local log = rpc_logging.new_entry('write_client') + rpc_logging.add_log(log) + + local ret = { + size = 0, + } local recv_left, recv_right = 0, 0 local from, to if range ~= nil then - from = range['start'] + 1 + from = range['from'] + 1 - if range['end'] ~= nil then - to = range['end'] + 1 + if range['to'] ~= nil then + to = range['to'] + 1 else to = INF end if from > to then return nil, 'InvalidRange', string.format( - 'start: %d is greater than end: %d', from, to) + 'from: %d is greater than to: %d', from, to) end end + local alg_sha1 = nil + + if opts.body_sha1 ~= nil then + alg_sha1 = hashlib:sha1() + end + while true do - local data, err, err_msg - if pipe_log ~= nil then - rpc_logging.reset_start(pipe_log) + rpc_logging.reset_start(log) - data, err, err_msg = pobj:read_pipe(ident) + local data, err, err_msg = pobj:read_pipe(ident) - rpc_logging.set_err(pipe_log, err) - rpc_logging.incr_stat(pipe_log, 'downstream', 'sendbody', #(data or '')) - else - data, err, err_msg = pobj:read_pipe(ident) - end + rpc_logging.set_err(log, err) + rpc_logging.incr_stat(log, 'upstream', 'recvbody', #(data or '')) if err ~= nil then return nil, err, err_msg @@ -78,116 +86,46 @@ local function write_data_to_ngx(pobj, ident, opts) end end + ret.size = ret.size + #data + + if alg_sha1 ~= nil then + alg_sha1:update(data) + end + + if ret.size == opts.total_size then + if alg_sha1 ~= nil then + local calc_sha1 = resty_string.to_hex(alg_sha1:final()) + if calc_sha1 ~= opts.body_sha1 then + return nil, "Sha1Notmatched", to_str("expect:", opts.body_sha1, ", actual:", calc_sha1) + end + end + end + + rpc_logging.reset_start(log) + ngx.print(data) local _, err = ngx.flush(true) + + rpc_logging.set_err(log, err) + rpc_logging.incr_stat(log, 'downstream', 'sendbody', #(data or '')) if err then return nil, 'ClientAborted', err end end - return recv_right + return ret end - function _M.connect_http(ips, port, verb, uri, opts) - opts = opts or {} - - local try_times = math.max(opts.try_times or 1, 1) - - local http, _, err_code, err_msg - - for _, ip in ipairs(ips) do - local headers = tableutil.dup(opts.headers or {}, true) - headers.Host = headers.Host or ip - - local req = { - ip = ip, - port = port, - uri = uri, - verb = verb, - headers = headers, - } - - if opts.signature_cb ~= nil then - req = opts.signature_cb(req) - end - - http = httpclient:new(ip, port, opts.timeouts or SOCKET_TIMEOUTS) - - local h_opts = {method=req.verb, headers=req.headers} - for i=1, try_times, 1 do - _, err_code, err_msg = http:send_request(req.uri, h_opts) - if err_code == nil then - return http - end - end - end - - if err_code ~= nil then - err_msg = to_str(err_code, ':', err_msg) - err_code = 'ConnectError' - end - - return nil, err_code, err_msg + return httplib.connect_http(ips, port, verb, uri, opts) end function _M.loop_http_write(pobj, ident, http) - local bytes = 0 - - while true do - local data, err_code, err_msg = pobj:read_pipe(ident) - if err_code ~= nil then - return nil, err_code, err_msg - end - - if data == '' then - break - end - - local _, err_code, err_msg = http:send_body(data) - if err_code ~= nil then - return nil, err_code, err_msg - end - - bytes = bytes + #data - end - - return bytes + return httplib.loop_http_write(pobj, ident, http) end function _M.get_http_response(http, opts) - opts = opts or {} - - local _, err_code, err_msg = http:finish_request() - if err_code ~= nil then - return nil, err_code, err_msg - end - - if opts.success_status ~= nil and opts.success_status ~= http.status then - return nil, 'InvalidHttpStatus', to_str('response http status:', http.status) - end - - local resp = { - status = http.status, - headers = http.headers, - } - local body = {} - - while true do - local data, err_code, err_msg = http:read_body(BLOCK_SIZE) - if err_code ~= nil then - return resp, err_code, err_msg - end - - if data == '' then - break - end - - table.insert(body, data) - end - resp.body = table.concat(body) - - return resp + return httplib.get_http_response(http, opts) end @@ -253,8 +191,6 @@ function _M.make_http_writer(ips, port, verb, uri, opts) end function _M.make_ngx_writer(opts) - opts = opts or {} - return function(pobj, ident) return write_data_to_ngx(pobj, ident, opts) end @@ -262,8 +198,6 @@ end function _M.make_ngx_resp_writer(status, headers, opts) - opts = opts or {} - ngx.status = status for k, v in pairs(headers) do ngx.header[k] = v @@ -309,4 +243,104 @@ function _M.make_buffer_writer(buffer, do_concat) end end +function _M.make_quorum_http_writers(dests, writer_opts, quorum) + local conn_threads = {} + + for _, dest in ipairs(dests) do + local th = ngx.thread.spawn(_M.connect_http, + dest.ips, dest.port, dest.method, dest.uri, writer_opts) + table.insert(conn_threads, th) + end + + local writers = {} + local n_ok = 0 + for _, th in ipairs(conn_threads) do + local wrt = {} + + local ok, http, err_code, err_msg = ngx.thread.wait(th) + if ok and err_code == nil then + n_ok = n_ok + 1 + wrt.http = http + wrt.writer = _M.make_connected_http_writer(http, writer_opts) + else + wrt.err = { + err_code = err_code or 'CoroutineError', + err_msg = err_msg or 'coroutine error, when connect', + } + end + table.insert(writers, wrt) + end + + if n_ok >= quorum then + return writers + end + + for _, wrt in ipairs(writers) do + if wrt.http ~= nil then + wrt.http:close() + end + wrt.http = nil + end + + return nil, 'NotEnoughConnect', to_str('quorum:', quorum, ", actual:", n_ok) +end + +function _M.make_put_s3_writer(access_key, secret_key, endpoint, params, opts) + local s3_cli, err_code, err_msg = + s3_client.new(access_key, secret_key, endpoint, opts) + if err_code ~= nil then + return nil, err_code, err_msg + end + + local request, err_code, err_msg = + s3_cli:get_signed_request(params, 'put_object', opts) + if err_code ~= nil then + return nil, err_code, err_msg + end + + return function(pobj, ident) + local chunk_writer + if opts.aws_chunk == true then + chunk_writer = + aws_chunk_writer:new(request.signer, request.auth_ctx) + end + + local _, err_code, err_msg = s3_cli:send_request( + request.verb, request.uri, request.headers,request.body) + if err_code ~= nil then + return nil, err_code, err_msg + end + + while true do + local data, err_code, err_msg = pobj:read_pipe(ident) + if err_code ~= nil then + return nil, err_code, err_msg + end + + local send_data = data + if opts.aws_chunk == true then + send_data = chunk_writer:make_chunk(send_data) + end + + local _, err_code, err_msg = s3_cli:send_body(send_data) + if err_code ~= nil then + return nil, err_code, err_msg + end + + if data == '' then + break + end + end + + return s3_cli:finish_request() + end +end + +function _M.make_aws_put_s3_writer(access_key, secret_key, endpoint, params, opts) + opts = tableutil.dup(opts or {}, true) + opts.aws_chunk = true + + return _M.make_put_s3_writer(access_key, secret_key, endpoint, params, opts) +end + return _M diff --git a/t/test.lua b/t/test.lua index 20c32bc..839506f 100644 --- a/t/test.lua +++ b/t/test.lua @@ -153,7 +153,7 @@ function _M.test_pipe_empty_reader() local check_filter = make_check_err_filter('r', 1, 'ReadTimeout', 'TestSuccess') local cpipe, err_code, err_msg = pipe_pipe:new({empty_reader}, - {memery_writer}, {rd_filters = {check_filter}}, 2) + {memery_writer}, {rd_filters = {check_filter}}, 2000) if err_code ~= nil then return nil, err_code, err_msg end @@ -331,13 +331,13 @@ end function _M.test_pipe_read_timeout() local read_datas = {'xxx', 'yyy', 'zzz'} - local rd_timeout = 2 - local wrt_timeout = 3 + local rd_timeout = 2000 + local wrt_timeout = 3000 local timeout_reader = function(pobj, ident) for i, buf in ipairs(read_datas) do if i > 1 then - ngx.sleep(rd_timeout) + ngx.sleep(rd_timeout/1000) end local _, err_code, err_msg = pobj:write_pipe(ident, buf) @@ -375,15 +375,15 @@ end function _M.test_pipe_write_timeout() local read_datas = {'xxx', 'yyy', 'zzz'} - local rd_timeout = 3 - local wrt_timeout = 2 + local rd_timeout = 3000 + local wrt_timeout = 2000 local timeout_writer = function(pobj, ident) local n_write = 0 while true do n_write = n_write + 1 if n_write > 1 then - ngx.sleep(wrt_timeout + 2) + ngx.sleep(wrt_timeout/1000 + 2) end local data, err_code, err_msg = pobj:read_pipe(ident) if err_code ~= nil then @@ -426,6 +426,57 @@ function _M.test_pipe_write_timeout() end end +function _M.test_pipe_async_wait() + local read_datas = 'xxx' + + local rd_timeout = 3000 + local wrt_timeout = 2000 + + local writer = function(pobj, ident) + while true do + local data, err_code, err_msg = pobj:read_pipe(ident) + if err_code ~= nil then + return nil, err_code, err_msg + end + + if data == '' then + break + end + end + + ngx.sleep(0.3) + + if ident == 1 then + ngx.sleep(wrt_timeout/1000 + 2) + return nil, 'WriterError', 'writer 1 error' + elseif ident == 2 then + return nil, 'WriterError', 'writer 2 error' + end + end + + + local cpipe, err_code, err_msg = pipe_pipe:new( + {pipe_pipe.reader.make_memery_reader(read_datas)}, + {writer, writer, writer}, + {}, + rd_timeout, wrt_timeout) + + if err_code ~= nil then + return nil, err_code, err_msg + end + + local t0 = ngx.now() + + cpipe:pipe(is_running, 3) + + ngx.update_time() + local itv = ngx.now() - t0 + + if itv > 1 then + return nil, 'TestAsyncWait', 'test async error' + end +end + function _M.test() local test_prefix = 'test_pipe_' diff --git a/t/test_ngx_resp_writer.t b/t/test_ngx_resp_writer.t index 93ef2c0..451d4d9 100644 --- a/t/test_ngx_resp_writer.t +++ b/t/test_ngx_resp_writer.t @@ -29,8 +29,9 @@ __DATA__ local status = 200 local headers = {["Content-Length"]=3} local opts = {range ={}} - opts.range["start"] = 2 - opts.range["end"] = 4 + opts.range["from"] = 2 + opts.range["to"] = 4 + opts.body_sha1 = "35139ef894b28b73bea022755166a23933c7d9cb" local writers = {pipe_pipe.writer.make_ngx_resp_writer(status, headers, opts)}