diff --git a/LICENSE b/LICENSE index 261eeb9e9f8b..2351cfd2f30b 100644 --- a/LICENSE +++ b/LICENSE @@ -199,3 +199,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + +======================================================================= +Apache ApiSix Subcomponents: + +The Apache ApiSix project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + + ewma.lua file from kubernetes/ingress-nginx: https://github.com/kubernetes/ingress-nginx Apache 2.0 + diff --git a/apisix/balancer.lua b/apisix/balancer.lua index 36f4f32d4b28..cfe53b955709 100644 --- a/apisix/balancer.lua +++ b/apisix/balancer.lua @@ -30,9 +30,9 @@ local module_name = "balancer" local pickers = { roundrobin = require("apisix.balancer.roundrobin"), chash = require("apisix.balancer.chash"), + ewma = require("apisix.balancer.ewma") } - local lrucache_server_picker = core.lrucache.new({ ttl = 300, count = 256 }) @@ -245,7 +245,7 @@ local function pick_server(route, ctx) core.log.error("failed to parse server addr: ", server, " err: ", err) return core.response.exit(502) end - + ctx.server_picker = server_picker return res end diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua new file mode 100644 index 000000000000..ba429d924951 --- /dev/null +++ b/apisix/balancer/ewma.lua @@ -0,0 +1,191 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + + +local core = require("apisix.core") +local ngx = ngx +local ngx_shared = ngx.shared +local ngx_now = ngx.now +local math = math +local pairs = pairs +local next = next +local tonumber = tonumber + +local _M = {} +local DECAY_TIME = 10 -- this value is in seconds + +local shm_ewma = ngx_shared.balancer_ewma +local shm_last_touched_at= ngx_shared.balancer_ewma_last_touched_at + +local lrucache_addr = core.lrucache.new({ + ttl = 300, count = 1024 +}) +local lrucache_trans_format = core.lrucache.new({ + ttl = 300, count = 256 +}) + + +local function decay_ewma(ewma, last_touched_at, rtt, now) + local td = now - last_touched_at + td = math.max(td, 0) + local weight = math.exp(-td / DECAY_TIME) + + ewma = ewma * weight + rtt * (1.0 - weight) + return ewma +end + + +local function store_stats(upstream, ewma, now) + local success, err, forcible = shm_last_touched_at:set(upstream, now) + if not success then + core.log.error("balancer_ewma_last_touched_at:set failed ", err) + end + if forcible then + core.log.warn("balancer_ewma_last_touched_at:set valid items forcibly overwritten") + end + + success, err, forcible = shm_ewma:set(upstream, ewma) + if not success then + core.log.error("balancer_ewma:set failed ", err) + end + if forcible then + core.log.warn("balancer_ewma:set valid items forcibly overwritten") + end +end + + +local function get_or_update_ewma(upstream, rtt, update) + local ewma = shm_ewma:get(upstream) or 0 + local now = ngx_now() + local last_touched_at = shm_last_touched_at:get(upstream) or 0 + ewma = decay_ewma(ewma, last_touched_at, rtt, now) + + if not update then + return ewma + end + + store_stats(upstream, ewma, now) + + return ewma +end + + +local function score(upstream) + -- Original implementation used names + -- Endpoints don't have names, so passing in host:Port as key instead + local upstream_name = upstream.host .. ":" .. upstream.port + return get_or_update_ewma(upstream_name, 0, false) +end + + +local function pick_and_score(peers) + local lowest_score_index = 1 + local lowest_score = score(peers[lowest_score_index]) + for i = 2, #peers do + local new_score = score(peers[i]) + if new_score < lowest_score then + lowest_score_index, lowest_score = i, new_score + end + end + + return peers[lowest_score_index], lowest_score +end + + +local function parse_addr(addr) + local host, port, err = core.utils.parse_addr(addr) + return {host = host, port = port}, err +end + + +local function _trans_format(up_nodes) + -- trans + --{"1.2.3.4:80":100,"5.6.7.8:8080":100} + -- into + -- [{"host":"1.2.3.4","port":"80"},{"host":"5.6.7.8","port":"8080"}] + local peers = {} + local res, err + + for addr, _ in pairs(up_nodes) do + res, err = lrucache_addr(addr, nil, parse_addr, addr) + if not err then + core.table.insert(peers, res) + else + core.log.error('parse_addr error: ', addr, err) + end + end + + return next(peers) and peers or nil +end + + +local function _ewma_find(ctx, up_nodes) + local peers + local endpoint + + if not up_nodes + or core.table.nkeys(up_nodes) == 0 then + return nil, 'up_nodes empty' + end + + peers = lrucache_trans_format(ctx.upstream_key, ctx.upstream_version, + _trans_format, up_nodes) + if not peers then + return nil, 'up_nodes trans error' + end + + if #peers > 1 then + endpoint = pick_and_score(peers) + else + endpoint = peers[1] + end + + return endpoint.host .. ":" .. endpoint.port +end + + +local function _ewma_after_balance(ctx) + local response_time = tonumber(ctx.var.upstream_response_time) or 0 + local connect_time = tonumber(ctx.var.upstream_connect_time) or 0 + local rtt = connect_time + response_time + local upstream = ctx.var.upstream_addr + + if not upstream then + return nil, "no upstream addr found" + end + + return get_or_update_ewma(upstream, rtt, true) +end + + +function _M.new(up_nodes, upstream) + if not shm_ewma + or not shm_last_touched_at then + return nil, "dictionary not find" + end + + return { + upstream = upstream, + get = function (ctx) + return _ewma_find(ctx, up_nodes) + end, + after_balance = _ewma_after_balance + } +end + + +return _M diff --git a/apisix/init.lua b/apisix/init.lua index bbb22a709dd5..0bc98420e802 100644 --- a/apisix/init.lua +++ b/apisix/init.lua @@ -638,6 +638,10 @@ function _M.http_log_phase() local api_ctx = common_phase("log") healcheck_passive(api_ctx) + if api_ctx.server_picker and api_ctx.server_picker.after_balance then + api_ctx.server_picker.after_balance(api_ctx) + end + if api_ctx.uri_parse_param then core.tablepool.release("uri_parse_param", api_ctx.uri_parse_param) end diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua index 9e4604f29152..9294f042bdb3 100644 --- a/apisix/schema_def.lua +++ b/apisix/schema_def.lua @@ -311,7 +311,7 @@ local upstream_schema = { type = { description = "algorithms of load balancing", type = "string", - enum = {"chash", "roundrobin"} + enum = {"chash", "roundrobin", "ewma"} }, checks = health_checker, hash_on = { diff --git a/bin/apisix b/bin/apisix index 609d47047276..a9df90150c0c 100755 --- a/bin/apisix +++ b/bin/apisix @@ -181,6 +181,9 @@ http { lua_shared_dict worker-events 10m; lua_shared_dict lrucache-lock 10m; lua_shared_dict skywalking-tracing-buffer 100m; + lua_shared_dict balancer_ewma 10m; + lua_shared_dict balancer_ewma_locks 10m; + lua_shared_dict balancer_ewma_last_touched_at 10m; # for openid-connect plugin lua_shared_dict discovery 1m; # cache for discovery metadata documents diff --git a/doc/admin-api.md b/doc/admin-api.md index 9a3def916ded..3b8ac892e6c5 100644 --- a/doc/admin-api.md +++ b/doc/admin-api.md @@ -493,7 +493,7 @@ In addition to the basic complex equalization algorithm selection, APISIX's Upst |Name |Optional|Description| |------- |-----|------| -|type |required|`roundrobin` supports the weight of the load, `chash` consistency hash, pick one of them.| +|type |required|`roundrobin` supports the weight of the load, `chash` consistency hash,`ewma` minimum latency ,pick one of them.see https://en.wikipedia.org/wiki/EWMA_chart for details| |nodes |required if `k8s_deployment_info` not configured|Hash table, the key of the internal element is the upstream machine address list, the format is `Address + Port`, where the address part can be IP or domain name, such as `192.168.1.100:80`, `foo.com:80`, etc. Value is the weight of the node. In particular, when the weight value is `0`, it has a special meaning, which usually means that the upstream node is invalid and never wants to be selected.| |k8s_deployment_info|required if `nodes` not configured|fields: `namespace`、`deploy_name`、`service_name`、`port`、`backend_type`, `port` is number, `backend_type` is `pod` or `service`, others is string. | |hash_on |optional|This option is only valid if the `type` is `chash`. Supported types `vars`(Nginx variables), `header`(custom header), `cookie`, `consumer`, the default value is `vars`.| diff --git a/t/APISIX.pm b/t/APISIX.pm index d0e7e3bface9..7f73650fc75b 100644 --- a/t/APISIX.pm +++ b/t/APISIX.pm @@ -209,6 +209,9 @@ _EOC_ lua_shared_dict worker-events 10m; lua_shared_dict lrucache-lock 10m; lua_shared_dict skywalking-tracing-buffer 100m; + lua_shared_dict balancer_ewma 1m; + lua_shared_dict balancer_ewma_locks 1m; + lua_shared_dict balancer_ewma_last_touched_at 1m; resolver $dns_addrs_str; resolver_timeout 5; diff --git a/t/lib/server.lua b/t/lib/server.lua index 864a8e2778d5..b9c8db41cdcd 100644 --- a/t/lib/server.lua +++ b/t/lib/server.lua @@ -68,6 +68,16 @@ function _M.sleep1() ngx.say("ok") end +function _M.ewma() + if ngx.var.server_port == "1981" + or ngx.var.server_port == "1982" then + ngx.sleep(0.2) + else + ngx.sleep(0.1) + end + ngx.print(ngx.var.server_port) +end + function _M.uri() -- ngx.sleep(1) ngx.say("uri: ", ngx.var.uri) diff --git a/t/node/ewma.t b/t/node/ewma.t new file mode 100644 index 000000000000..666ffec8f66d --- /dev/null +++ b/t/node/ewma.t @@ -0,0 +1,217 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +#no_long_string(); +no_root_location(); +log_level('info'); +run_tests; + +__DATA__ + +=== TEST 1: add upstream +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "upstream": { + "nodes": { + "127.0.0.1:1980": 100, + "127.0.0.1:1981": 100 + }, + "type": "ewma" + }, + "uri": "/ewma" + }]], + [[{ + "node": { + "value": { + "upstream": { + "nodes": { + "127.0.0.1:1980": 100, + "127.0.0.1:1981": 100 + }, + "type": "ewma" + }, + "uri": "/ewma" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 2: about latency +--- timeout: 5 +--- config + location /t { + content_by_lua_block { + --node: "127.0.0.1:1980": latency is 0.001 + --node: "127.0.0.1:1981": latency is 0.005 + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port + .. "/ewma" + + local ports_count = {} + for i = 1, 12 do + local httpc = http.new() + httpc:set_timeout(1000) + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + if not res then + ngx.say(err) + return + end + + ports_count[res.body] = (ports_count[res.body] or 0) + 1 + end + + local ports_arr = {} + for port, count in pairs(ports_count) do + table.insert(ports_arr, {port = port, count = count}) + end + + local function cmd(a, b) + return a.port > b.port + end + table.sort(ports_arr, cmd) + + ngx.say(require("cjson").encode(ports_arr)) + ngx.exit(200) + } + } +--- request +GET /t +--- response_body +[{"count":1,"port":"1981"},{"count":11,"port":"1980"}] +--- error_code: 200 +--- no_error_log +[error] + + +=== TEST 3: about frequency +--- timeout: 30 +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port + .. "/ewma" + + --node: "127.0.0.1:1980": latency is 0.001 + --node: "127.0.0.1:1981": latency is 0.005 + local ports_count = {} + for i = 1, 2 do + local httpc = http.new() + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + if not res then + ngx.say(err) + return + end + end + + --remove the 1981 node, + --add the 1982 node + --keep two nodes for triggering ewma logic in server_picker function of balancer phase + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "upstream": { + "nodes": { + "127.0.0.1:1980": 100, + "127.0.0.1:1982": 100 + }, + "type": "ewma" + }, + "uri": "/ewma" + }]] + ) + + if code ~= 200 then + ngx.say("update route failed") + return + end + + ngx.sleep(20) + --keep the node 1980 hot + for i = 1, 12 do + local httpc = http.new() + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + if not res then + ngx.say(err) + return + end + end + + --recover the 1981 node + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "upstream": { + "nodes": { + "127.0.0.1:1980": 100, + "127.0.0.1:1981": 100 + }, + "type": "ewma" + }, + "uri": "/ewma" + }]] + ) + + if code ~= 200 then + ngx.say("update route failed") + return + end + + --should select the 1981 node,because it is idle + local httpc = http.new() + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + if not res then + ngx.say(err) + return + end + ngx.say(require("cjson").encode({port = res.body, count = 1})) + ngx.exit(200) + } + } +--- request +GET /t +--- response_body +{"count":1,"port":"1981"} +--- error_code: 200 +--- no_error_log +[error] +