apisix
apisix copied to clipboard
feat: As a user, I want nacos discovery to support health checks for nacos hosts, so that a healthy host can be picked for each operation
Description
refer: https://github.com/apache/apisix/issues/8753#issuecomment-1411349321
Do you mean health check on DP side?
Do you mean health check on DP side?
Yes, I found a TODO in init.lua: https://github.com/apache/apisix/blob/403e4c51d605f7ea88f2481bc505ba9c9942c347/apisix/discovery/nacos/init.lua#L150
However, if the `hosts` field contains domain names instead of IP addresses, a health check seems unnecessary.
I see
你可以用这个脚本修复:
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local require = require
local local_conf = require('apisix.core.config_local').local_conf()
local http = require('resty.http')
local core = require('apisix.core')
local ipairs = ipairs
local type = type
local math = math
local math_random = math.random
local ngx = ngx
local ngx_re = require('ngx.re')
local ngx_timer_at = ngx.timer.at
local ngx_timer_every = ngx.timer.every
local string = string
local string_sub = string.sub
local str_byte = string.byte
local str_find = core.string.find
local log = core.log
-- Default node weight; assigned from local_conf.discovery.nacos.weight in init_worker.
local default_weight
-- Current discovery result, indexed as
-- applications[namespace_id][group_name][service_name] -> list of nodes.
local applications
-- Path fragments appended to the nacos base URI when building requests.
local auth_path = 'auth/login'
local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName='
-- Fallbacks used when an upstream carries no discovery_args.namespace_id /
-- discovery_args.group_name.
local default_namespace_id = "public"
local default_group_name = "DEFAULT_GROUP"
-- Assigned from the nacos configuration in init_worker; not read anywhere
-- else in the visible code.
local access_key
local secret_key
-- resty.worker.events module and its event list, both set in init_worker.
local events
local events_list
-- Host health tracking and retry settings (hard-coded).
-- host_health_status maps a configured host string to a record with the
-- fields healthy, failure_count, last_check_time and, once a failure was
-- recorded, last_error / unhealthy_time.
local host_health_status = {}
local health_check_interval = 5000 -- 5 seconds, in ms (divided by 1000 when the timer is created)
local health_check_timeout = 2000 -- 2 seconds, in ms (passed to httpc:set_timeout)
local max_failures = 3 -- number of failures after which a host is marked unhealthy
local recovery_time = 30000 -- 30 seconds, in ms, before an unhealthy host is reset
-- Cached discovery data.
local cached_applications = nil -- deep copy of the last stored application data
local cache_valid = false -- whether the cache currently holds usable data
local last_successful_update = 0 -- ngx.now() value of the last cache update
local max_cache_age = 300 -- maximum cache age (5 minutes), compared against ngx.now() differences
local _M = {}
-- Worker-event handler: adopts the application data published through
-- resty.worker.events and mirrors it into the local cache.
--
-- @param data    application table carried by the event (may be nil)
-- @param event   event name
-- @param source  event source
-- @param pid     pid of the worker that posted the event
local function discovery_nacos_callback(data, event, source, pid)
    applications = data

    -- A successful update also refreshes the cache.
    if data then
        -- JSON round trip = deep copy, so the cache never aliases live data.
        cached_applications = core.json.decode(core.json.encode(data))
        cache_valid = true
        last_successful_update = ngx.now()
        log.notice("Updated cache data on successful update")
    end

    log.notice(
        "update local variable application, event is: ", event,
        "source: ", source,
        "server pid:", pid,
        ", application: ", core.json.encode(applications, true)
    )
end
-- Health check lookup.
-- Reports whether a configured nacos host may currently be selected.
--
-- A host without a record is treated as usable. A host that was marked
-- unhealthy becomes usable again once `recovery_time` milliseconds have
-- passed since `unhealthy_time`; its record is then reset.
--
-- @param host_url  host string exactly as it appears in the nacos config
-- @return true when the host may be used, false otherwise
local function check_host_health(host_url)
    local status = host_health_status[host_url]
    if not status then
        return true
    end

    -- The host is in its recovery period: check whether the period is over.
    if status.unhealthy_time then
        local now = ngx.now() * 1000 -- convert to milliseconds
        if now - status.unhealthy_time > recovery_time then
            -- Recovery period ended: reset the record.
            host_health_status[host_url] = {
                healthy = true,
                failure_count = 0,
                last_check_time = now
            }
            log.info("Host ", host_url, " recovery period ended, reset to healthy")
            -- Return right away: `status` still points at the old record
            -- (healthy = false) and must not be consulted after the reset.
            return true
        end
    end

    -- A host flagged as unhealthy is rejected.
    if not status.healthy then
        return false
    end
    return true
end
-- Mark a host as healthy.
-- Replaces the host's record with a fresh one: healthy, zero failures,
-- checked "now" (milliseconds).
--
-- @param host_url  host string used as key in host_health_status
local function mark_host_healthy(host_url)
    local record = {}
    record.healthy = true
    record.failure_count = 0
    record.last_check_time = ngx.now() * 1000

    host_health_status[host_url] = record
    log.info("Mark host as healthy: ", host_url)
end
-- Record one failed probe for a host.
-- The host is flagged unhealthy only once `max_failures` failures have
-- accumulated; until then it stays selectable.
--
-- @param host_url  host string used as key in host_health_status
-- @param reason    description of the failure, stored as `last_error`
local function mark_host_unhealthy(host_url, reason)
    local now = ngx.now() * 1000
    -- A host that has no record yet starts out healthy. Without the explicit
    -- `healthy = true` the field would be nil, and a nil `healthy` is read as
    -- "unhealthy" by check_host_health after a single failure.
    local status = host_health_status[host_url]
                   or { healthy = true, failure_count = 0 }

    status.failure_count = status.failure_count + 1
    status.last_check_time = now
    status.last_error = reason

    if status.failure_count >= max_failures then
        status.healthy = false
        -- Start (or restart) the recovery period.
        status.unhealthy_time = now
        log.error("Mark host as unhealthy: ", host_url, ", failures: ", status.failure_count, ", reason: ", reason)
    else
        log.warn("Host failure count: ", host_url, " - ", status.failure_count, "/", max_failures, ", reason: ", reason)
    end

    host_health_status[host_url] = status
end
-- Collect the healthy hosts.
-- Walks the configured nacos hosts in order and keeps those that
-- check_host_health accepts.
--
-- @return a new list (possibly empty) of host strings
local function get_healthy_hosts()
    local usable = {}
    for _, candidate in ipairs(local_conf.discovery.nacos.host) do
        if check_host_health(candidate) then
            usable[#usable + 1] = candidate
        end
    end
    return usable
end
-- Build the probe URL for one configured host.
-- Mirrors the URL handling of get_base_uri: embedded `user:password@`
-- credentials are removed, the configured prefix is appended and a '/'
-- separator is guaranteed before the probe path.
local function build_health_check_url(host_url)
    local url = host_url

    local auth_idx = core.string.rfind_char(url, '@')
    if auth_idx then
        local protocol_idx = str_find(url, '://')
        if protocol_idx then
            url = string_sub(url, 1, protocol_idx + 2)
                  .. string_sub(url, auth_idx + 1)
        end
    end

    local prefix = local_conf.discovery.nacos.prefix
    if prefix then
        url = url .. prefix
    end
    if str_byte(url, #url) ~= str_byte('/') then
        url = url .. '/'
    end

    return url .. "cs/history?search=accurate&dataId=1&group=1"
end

-- Health check task.
-- Timer callback that probes every configured nacos host. Each host is
-- probed in its own zero-delay timer; the result is recorded through
-- mark_host_healthy / mark_host_unhealthy under the original host string.
--
-- @param premature  true when the timer fires because the worker is exiting
local function health_check_task(premature)
    if premature then
        return
    end

    local hosts = local_conf.discovery.nacos.host
    for _, host_url in ipairs(hosts) do
        local ok, err = ngx.timer.at(0, function()
            local httpc = http.new()
            httpc:set_timeout(health_check_timeout)

            -- Build the health check URL (credentials stripped, '/' ensured).
            local health_url = build_health_check_url(host_url)

            -- NOTE(review): certificate verification is disabled here while
            -- request() enables it -- confirm this difference is intended.
            local res, req_err = httpc:request_uri(health_url, {
                method = "GET",
                ssl_verify = false
            })

            if res and res.status == 200 then
                mark_host_healthy(host_url)
            else
                mark_host_unhealthy(host_url, req_err or "Health check failed with status: " .. (res and res.status or "unknown"))
            end
        end)

        if not ok then
            log.error("Failed to create health check timer for host: ", host_url, ", error: ", err)
        end
    end
end
-- Perform one HTTP call against nacos and decode the JSON answer.
--
-- @param request_uri  base URI
-- @param path         path (and query string) appended to the base URI
-- @param body         optional body; a table is JSON-encoded first
-- @param method       HTTP method name
-- @param basic_auth   optional value for the Authorization header
-- @return decoded response on success; nil plus an error string when the
--         body cannot be encoded, the call fails, the status is not 200 or
--         the response cannot be decoded
local function request(request_uri, path, body, method, basic_auth)
    local url = request_uri .. path
    log.info('request url:', url)

    -- A nil basic_auth simply leaves the Authorization key absent.
    local headers = {
        Accept = 'application/json',
        Authorization = basic_auth,
    }

    if type(body) == 'table' then
        local encode_err
        body, encode_err = core.json.encode(body)
        if not body then
            return nil, 'invalid body : ' .. encode_err
        end
        headers['Content-Type'] = 'application/json'
    end

    local httpc = http.new()
    local timeout = local_conf.discovery.nacos.timeout
    local connect_timeout, send_timeout, read_timeout =
        timeout.connect, timeout.send, timeout.read
    log.info('connect_timeout:', connect_timeout, ', send_timeout:', send_timeout,
             ', read_timeout:', read_timeout)
    httpc:set_timeouts(connect_timeout, send_timeout, read_timeout)

    local res, req_err = httpc:request_uri(url, {
        method = method,
        headers = headers,
        body = body,
        ssl_verify = true,
    })
    if not res then
        return nil, req_err
    end

    if res.status ~= 200 or not res.body then
        return nil, 'status = ' .. res.status
    end

    local decoded, decode_err = core.json.decode(res.body)
    if not decoded then
        return nil, decode_err
    end
    return decoded
end
-- GET `path` below `request_uri`, without body and without Authorization
-- header; returns whatever request() returns.
local function get_url(request_uri, path)
    local no_body, no_auth
    return request(request_uri, path, no_body, 'GET', no_auth)
end
-- POST `body` to `path` below `request_uri`, without Authorization header;
-- returns whatever request() returns.
local function post_url(request_uri, path, body)
    local no_auth
    return request(request_uri, path, body, 'POST', no_auth)
end
-- Pick a nacos host and turn it into a base URI.
-- A random healthy host is chosen; when no host is healthy every configured
-- host is a candidate. Credentials embedded as `user:password@` are cut out
-- of the URL and returned separately.
--
-- @return base URI ending in '/', username (or nil), password (or nil)
local function get_base_uri()
    local candidates = get_healthy_hosts()
    if #candidates == 0 then
        -- No healthy host: fall back to all configured hosts.
        candidates = local_conf.discovery.nacos.host
    end

    local base = candidates[math_random(#candidates)]
    local username, password

    local at_pos = core.string.rfind_char(base, '@')
    if at_pos then
        local scheme_pos = str_find(base, '://')
        local scheme_part = string_sub(base, 1, scheme_pos + 2)
        local credentials = string_sub(base, scheme_pos + 3, at_pos - 1)

        local pieces = ngx_re.split(credentials, ':')
        if #pieces == 2 then
            username, password = pieces[1], pieces[2]
        end

        base = scheme_part .. string_sub(base, at_pos + 1)
    end

    local prefix = local_conf.discovery.nacos.prefix
    if prefix then
        base = base .. prefix
    end

    if string_sub(base, -1) ~= '/' then
        base = base .. '/'
    end

    return base, username, password
end
-- Log in to nacos and build the access-token query fragment.
--
-- @param base_uri  base URI the login path is appended to
-- @param username  nacos user; when nil no login is attempted
-- @param password  nacos password; when nil no login is attempted
-- @return '' when no credentials are given, '&accessToken=<token>' after a
--         successful login, or nil plus an error string when the login
--         fails or the response carries no token
local function get_token_param(base_uri, username, password)
    if not username or not password then
        return ''
    end

    local args = { username = username, password = password }
    local data, err = post_url(base_uri, auth_path .. '?' .. ngx.encode_args(args), nil)
    if err then
        -- The password is deliberately left out: credentials must not end
        -- up in the error log.
        log.error('nacos login fail:', username, ' desc:', err)
        return nil, err
    end

    -- Guard against a 200 response without a token; concatenating nil below
    -- would raise instead of reporting an error to the caller.
    if type(data) ~= 'table' or not data.accessToken then
        local msg = 'nacos login response has no accessToken'
        log.error('nacos login fail:', username, ' desc:', msg)
        return nil, msg
    end

    return '&accessToken=' .. data.accessToken
end
-- Build the `&namespaceId=...` query fragment.
--
-- @param namespace_id  namespace to encode; nil/false yields ''
-- @return query fragment, or '' when no namespace is given
local function get_namespace_param(namespace_id)
    if not namespace_id then
        return ''
    end
    return '&' .. ngx.encode_args({ namespaceId = namespace_id })
end
-- Build the `&groupName=...` query fragment.
--
-- @param group_name  group to encode; nil/false yields ''
-- @return query fragment, or '' when no group is given
local function get_group_name_param(group_name)
    if not group_name then
        return ''
    end
    return '&' .. ngx.encode_args({ groupName = group_name })
end
-- Tell whether `services` already holds an entry with the same namespace,
-- group, service name and scheme.
--
-- @param services      list of service descriptors
-- @param namespace_id  namespace to look for
-- @param group_name    group to look for
-- @param service_name  service name to look for
-- @param scheme        scheme to look for (may be nil)
-- @return true when a matching entry exists, false otherwise
local function de_duplication(services, namespace_id, group_name, service_name, scheme)
    for _, known in ipairs(services) do
        local same = known.namespace_id == namespace_id
                     and known.group_name == group_name
                     and known.service_name == service_name
                     and known.scheme == scheme
        if same then
            return true
        end
    end
    return false
end
-- Scan one configuration collection and append every nacos-discovered
-- service that is not yet listed.
--
-- @param services  list that receives the service descriptors (modified)
-- @param values    configuration collection; nil is ignored
local function iter_and_add_service(services, values)
    if not values then
        return
    end

    for _, item in core.config_util.iterate_values(values) do
        local conf = item.value
        if conf then
            -- The upstream is either nested in the object or the object itself.
            local up = conf.upstream or conf
            local args = up.discovery_args

            local namespace_id = (args and args.namespace_id)
                                 or default_namespace_id
            local group_name = (args and args.group_name)
                               or default_group_name

            local seen = de_duplication(services, namespace_id, group_name,
                                        up.service_name, up.scheme)
            if not seen and up.discovery_type == 'nacos' then
                core.table.insert(services, {
                    service_name = up.service_name,
                    namespace_id = namespace_id,
                    group_name = group_name,
                    scheme = up.scheme,
                })
            end
        end
    end
end
-- Gather the nacos services referenced by upstreams, routes, services and
-- stream routes, in that order.
--
-- @return list of service descriptors without duplicates
local function get_nacos_services()
    -- The modules are required lazily to work around a circular dependency.
    local list_upstreams = require('apisix.upstream').upstreams
    local router = require('apisix.router')
    local list_routes = router.http_routes
    local list_stream_routes = router.stream_routes
    local list_services = require('apisix.http.service').services

    local found = {}
    -- Parentheses keep only the first return value of each getter.
    iter_and_add_service(found, (list_upstreams()))
    iter_and_add_service(found, (list_routes()))
    iter_and_add_service(found, (list_services()))
    iter_and_add_service(found, (list_stream_routes()))
    return found
end
-- Tell whether `scheme` is one of the gRPC schemes ('grpc' or 'grpcs').
--
-- @param scheme  scheme string (any other value yields false)
-- @return boolean
local function is_grpc(scheme)
    return scheme == 'grpc' or scheme == 'grpcs'
end
-- Check whether the cache has expired.
-- The cache counts as expired when it is not valid, when no update
-- timestamp exists, or when the last update is older than max_cache_age.
--
-- @return boolean
local function is_cache_expired()
    if cache_valid and last_successful_update then
        local age = ngx.now() - last_successful_update
        return age > max_cache_age
    end
    return true
end
-- Fall back to the cached data.
-- When a non-expired cache exists, `applications` is replaced by a deep
-- copy of it.
--
-- @return true when the cache was applied, false otherwise
local function use_cached_data()
    if not cached_applications or is_cache_expired() then
        return false
    end

    -- JSON round trip = deep copy of the cached table.
    applications = core.json.decode(core.json.encode(cached_applications))
    log.warn("Using cached service discovery data due to all nacos nodes failure")
    return true
end
-- Store freshly fetched application data.
-- Changed data replaces `applications`, refreshes the cache and is
-- broadcast to the other workers. Unchanged data is not broadcast, but the
-- cache freshness is still renewed, because the fetch that produced it
-- succeeded.
--
-- @param new_apps  application table built by the registry fetch
local function update_applications(new_apps)
    local new_apps_md5sum = ngx.md5(core.json.encode(new_apps))
    local old_apps_md5sum = ngx.md5(core.json.encode(applications or {}))

    if new_apps_md5sum == old_apps_md5sum then
        -- Without this refresh the cache would expire max_cache_age seconds
        -- after the last *change*, even though every fetch since then
        -- succeeded, and could not serve as fallback when nacos goes down.
        if not cached_applications then
            cached_applications = core.json.decode(core.json.encode(new_apps)) -- deep copy
        end
        cache_valid = true
        last_successful_update = ngx.now()
        return
    end

    applications = new_apps

    -- Refresh the cache.
    cached_applications = core.json.decode(core.json.encode(new_apps)) -- deep copy
    cache_valid = true
    last_successful_update = ngx.now()

    local ok, err = events.post(events_list._source, events_list.updating,
                                applications)
    if not ok then
        log.error("post_event failure with ", events_list._source,
                  ", update application error: ", err)
    else
        log.info("Successfully updated applications data and cache")
    end
end
-- Timer callback: query nacos for every referenced service and publish the
-- resulting node lists.
--
-- A service whose query fails is skipped and the remaining services are
-- still processed. When at least one query succeeds the collected data is
-- handed to update_applications; when all fail, the cached data is used if
-- it is still valid.
--
-- @param premature  true when the timer fires because the worker is exiting
local function fetch_full_registry(premature)
    if premature then
        return
    end

    local up_apps = {}
    local base_uri, username, password = get_base_uri()
    local token_param, err = get_token_param(base_uri, username, password)
    if err then
        log.error('get_token_param error:', err)
        -- Login failed: on first start fall back to the cache, or to an
        -- empty table so that _M.nodes stops waiting for initialisation.
        if not applications then
            if use_cached_data() then
                log.warn("Using cached data for initial setup")
            else
                applications = up_apps
            end
        end
        return
    end

    local infos = get_nacos_services()
    if #infos == 0 then
        applications = up_apps
        return
    end

    local success_count = 0
    local total_services = #infos
    for _, service_info in ipairs(infos) do
        local data, err
        local namespace_id = service_info.namespace_id
        local group_name = service_info.group_name
        local service_name = service_info.service_name
        local scheme = service_info.scheme or ''
        local namespace_param = get_namespace_param(service_info.namespace_id)
        local group_name_param = get_group_name_param(service_info.group_name)
        local query_path = instance_list_path .. service_name
                           .. token_param .. namespace_param .. group_name_param

        data, err = get_url(base_uri, query_path)
        if err then
            log.error('get_url:', query_path, ' err:', err)
            goto CONTINUE
        end

        -- A response without a host list counts as a failed query; iterating
        -- a nil value would abort the whole refresh with a runtime error.
        if type(data) ~= 'table' or type(data.hosts) ~= 'table' then
            log.error('get_url:', query_path, ' err:', 'response has no hosts list')
            goto CONTINUE
        end

        if not up_apps[namespace_id] then
            up_apps[namespace_id] = {}
        end
        if not up_apps[namespace_id][group_name] then
            up_apps[namespace_id][group_name] = {}
        end

        for _, host in ipairs(data.hosts) do
            local nodes = up_apps[namespace_id][group_name][service_name]
            if not nodes then
                nodes = {}
                up_apps[namespace_id][group_name][service_name] = nodes
            end

            local node = {
                host = host.ip,
                port = host.port,
                weight = host.weight or default_weight,
            }
            -- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
            if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
                node.port = host.metadata.gRPC_port
            end
            core.table.insert(nodes, node)
        end

        success_count = success_count + 1

        -- The label must live inside the loop body: placed after the loop it
        -- turns `goto CONTINUE` into an exit from the whole function, which
        -- drops every remaining service and skips the bookkeeping below.
        ::CONTINUE::
    end

    log.info("Service discovery completed: ", success_count, "/", total_services, " services updated")

    if success_count > 0 then
        -- Some or all services were fetched successfully.
        update_applications(up_apps)
    else
        -- Every query failed: try the cached data.
        log.error("Failed to fetch any service information from all nacos nodes")
        if use_cached_data() then
            log.warn("Successfully fallback to cached service discovery data")
        else
            log.error("No valid cache available, service discovery data may be stale or empty")
            -- Same first-start fallback as on login failure: an empty table
            -- keeps _M.nodes from sleeping through its wait loop each call.
            if not applications then
                applications = up_apps
            end
        end
    end
end
-- Look up the nodes of a service.
-- While no application data is available yet, the call waits in steps of
-- 0.1 seconds, for about 5 seconds in total.
--
-- @param service_name    nacos service name
-- @param discovery_args  optional table with `namespace_id` / `group_name`
-- @return the node list stored for the service, or nil when nothing is
--         known for the namespace, group or service
function _M.nodes(service_name, discovery_args)
    local namespace_id = default_namespace_id
    local group_name = default_group_name
    if discovery_args then
        namespace_id = discovery_args.namespace_id or default_namespace_id
        group_name = discovery_args.group_name or default_group_name
    end

    -- maximum waiting time: 5 seconds
    local remaining = 5
    local step = 0.1
    local warned = false
    while not applications and remaining > 0 do
        if not warned then
            log.warn('wait init')
            warned = true
        end
        ngx.sleep(step)
        remaining = remaining - step
    end

    local by_namespace = applications and applications[namespace_id]
    local by_group = by_namespace and by_namespace[group_name]
    if not by_group then
        return nil
    end
    return by_group[service_name]
end
-- Per-worker initialisation.
-- Workers other than worker 0 only subscribe to the update event. Worker 0
-- reads the nacos settings, seeds the health records and starts the
-- registry fetch and health check timers.
function _M.init_worker()
    events = require("resty.worker.events")
    events_list = events.event_list("discovery_nacos_update_application",
                                    "updating")

    if ngx.worker.id() ~= 0 then
        events.register(discovery_nacos_callback, events_list._source,
                        events_list.updating)
        return
    end

    local nacos_conf = local_conf.discovery.nacos

    default_weight = nacos_conf.weight
    log.info('default_weight:', default_weight)
    local fetch_interval = nacos_conf.fetch_interval
    log.info('fetch_interval:', fetch_interval)
    access_key = nacos_conf.access_key
    secret_key = nacos_conf.secret_key

    -- Seed the health records: every configured host starts out healthy.
    for _, configured_host in ipairs(nacos_conf.host) do
        local record = {}
        record.healthy = true
        record.failure_count = 0
        record.last_check_time = ngx.now() * 1000
        host_health_status[configured_host] = record
    end

    -- Start the fetch timers: one immediate run, then periodic runs.
    ngx_timer_at(0, fetch_full_registry)
    ngx_timer_every(fetch_interval, fetch_full_registry)

    -- Start the health check; the interval is kept in ms, timers take seconds.
    ngx_timer_every(health_check_interval / 1000, health_check_task)

    log.info("Nacos discovery initialized with health check and cache support")
end
-- Snapshot of the module state: the nacos configuration, the current
-- application data (an empty table when none exists) and a summary of the
-- cache.
--
-- @return table with the fields `config`, `services` and `cache`
function _M.dump_data()
    local cache_summary = {}
    cache_summary.cached_applications = not not cached_applications
    cache_summary.cache_valid = cache_valid
    cache_summary.last_successful_update = last_successful_update
    cache_summary.is_cache_expired = is_cache_expired()

    local dump = {}
    dump.config = local_conf.discovery.nacos
    dump.services = applications or {}
    dump.cache = cache_summary
    return dump
end
-- Expose the health records (for debugging).
-- The live table is returned, not a copy.
--
-- @return host_health_status
function _M.get_health_status()
    local records = host_health_status
    return records
end
-- Drop the cache manually (for tests and debugging).
-- Resets the cached data, the validity flag and the update timestamp.
function _M.clear_cache()
    cached_applications, cache_valid, last_successful_update = nil, false, 0
    log.info("Cache cleared manually")
end
-- Report the cache state.
--
-- @return table with `cache_valid`, `last_successful_update`,
--         `is_cache_expired` and `cached_applications` (boolean: whether
--         cached data exists)
function _M.get_cache_status()
    local report = {}
    report.cache_valid = cache_valid
    report.last_successful_update = last_successful_update
    report.is_cache_expired = is_cache_expired()
    report.cached_applications = not not cached_applications
    return report
end
return _M