apisix icon indicating copy to clipboard operation
apisix copied to clipboard

feat: As a user, I want nacos discovery to support health checks for nacos hosts, so that a healthy host can be picked for each operation

Open ronething opened this issue 3 years ago • 4 comments

Description

refer: https://github.com/apache/apisix/issues/8753#issuecomment-1411349321

ronething avatar Feb 03 '23 01:02 ronething

Do you mean health check on DP side?

spacewander avatar Feb 05 '23 12:02 spacewander

> Do you mean health check on DP side?

Yes. I found a TODO in init.lua: https://github.com/apache/apisix/blob/403e4c51d605f7ea88f2481bc505ba9c9942c347/apisix/discovery/nacos/init.lua#L150

However, if the hosts field holds a domain name instead of an IP, a health check seems unnecessary.

ronething avatar Feb 06 '23 01:02 ronething

I see

spacewander avatar Feb 06 '23 02:02 spacewander

你可以用这个脚本修复:

-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements.  See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License.  You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local require            = require
local local_conf         = require('apisix.core.config_local').local_conf()
local http               = require('resty.http')
local core               = require('apisix.core')
local ipairs             = ipairs
local type               = type
local math               = math
local math_random        = math.random
local ngx                = ngx
local ngx_re             = require('ngx.re')
local ngx_timer_at       = ngx.timer.at
local ngx_timer_every    = ngx.timer.every
local string             = string
local string_sub         = string.sub
local str_byte           = string.byte
local str_find           = core.string.find
local log                = core.log

-- Default node weight; assigned from the nacos discovery config in init_worker.
local default_weight
-- Latest discovery result, indexed as [namespace_id][group_name][service_name].
local applications
-- Path fragments appended to the base URI of a nacos host.
local auth_path = 'auth/login'
local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName='
-- Fallbacks used when an upstream carries no discovery_args values.
local default_namespace_id = "public"
local default_group_name = "DEFAULT_GROUP"
-- Assigned in init_worker; not read anywhere else in the visible code.
local access_key
local secret_key

-- resty.worker.events module and its event list; both assigned in init_worker.
local events
local events_list

-- Host health bookkeeping and retry settings (hard-coded)
local host_health_status = {}
local health_check_interval = 5000 -- 5 seconds, in milliseconds
local health_check_timeout = 2000 -- 2 seconds, in milliseconds
local max_failures = 3 -- failures before a host is marked unhealthy
local recovery_time = 30000 -- 30-second recovery window, in milliseconds

-- Cached discovery data
local cached_applications = nil -- copy of the most recent applications data
local cache_valid = false -- whether the cache has been populated
local last_successful_update = 0 -- ngx.now() value (seconds) of the last successful update
local max_cache_age = 300 -- maximum cache age: 5 minutes, in seconds

local _M = {}

-- Worker-events handler: adopt the applications table published by the
-- fetching worker and, when it is non-empty, keep a private copy as cache.
--
-- data:   the published applications table (may be nil/false)
-- event:  event name, only used for logging
-- source: event source, only used for logging
-- pid:    pid of the publishing worker, only used for logging
local function discovery_nacos_callback(data, event, source, pid)
    applications = data

    if data then
        -- A JSON round-trip yields a copy that shares nothing with `data`.
        cached_applications = core.json.decode(core.json.encode(data))
        last_successful_update = ngx.now()
        cache_valid = true
        log.notice("Updated cache data on successful update")
    end

    log.notice("update local variable application, event is: ", event,
               "source: ", source, "server pid:", pid,
               ", application: ", core.json.encode(applications, true))
end

-- Report whether a nacos host may currently be selected for requests.
--
-- A host without any bookkeeping record is treated as usable. A host whose
-- record carries `unhealthy_time` is reset to healthy once more than
-- `recovery_time` milliseconds have passed since that moment.
--
-- host_url: key of the host in `host_health_status`
-- returns:  true when the host is usable, false otherwise
local function check_host_health(host_url)
    local status = host_health_status[host_url]
    if not status then
        return true
    end

    if status.unhealthy_time then
        local now = ngx.now() * 1000 -- convert seconds to milliseconds
        if now - status.unhealthy_time > recovery_time then
            -- Recovery window is over: replace the record AND re-point the
            -- local at it, otherwise the check below would still look at the
            -- stale unhealthy record and reject the host once more.
            status = {
                healthy = true,
                failure_count = 0,
                last_check_time = now
            }
            host_health_status[host_url] = status
            log.info("Host ", host_url, " recovery period ended, reset to healthy")
        end
    end

    if not status.healthy then
        return false
    end

    return true
end

-- Replace the bookkeeping record of `host_url` with a fresh healthy one
-- (zero failures, check time = now in milliseconds).
local function mark_host_healthy(host_url)
    local checked_at = ngx.now() * 1000
    local fresh_record = {
        healthy = true,
        failure_count = 0,
        last_check_time = checked_at
    }

    host_health_status[host_url] = fresh_record
    log.info("Mark host as healthy: ", host_url)
end

-- Record one failed probe for `host_url`.
--
-- The failure counter is incremented on every call; the host is only flagged
-- unhealthy (and `unhealthy_time` stamped, in milliseconds) once the counter
-- reaches `max_failures`.
--
-- host_url: key of the host in `host_health_status`
-- reason:   description of the failure, stored as `last_error` and logged
local function mark_host_unhealthy(host_url, reason)
    local now = ngx.now() * 1000
    -- A host seen for the first time starts out healthy. Without the explicit
    -- `healthy = true` the record would read as unhealthy after one failure,
    -- bypassing the max_failures threshold.
    local status = host_health_status[host_url]
                   or { healthy = true, failure_count = 0 }

    status.failure_count = status.failure_count + 1
    status.last_check_time = now
    status.last_error = reason

    if status.failure_count >= max_failures then
        status.healthy = false
        status.unhealthy_time = now
        log.error("Mark host as unhealthy: ", host_url,
                  ", failures: ", status.failure_count, ", reason: ", reason)
    else
        log.warn("Host failure count: ", host_url, " - ",
                 status.failure_count, "/", max_failures, ", reason: ", reason)
    end

    host_health_status[host_url] = status
end

-- Collect, in configuration order, every configured nacos host that
-- check_host_health currently accepts.
-- returns: a new array of host URLs (possibly empty)
local function get_healthy_hosts()
    local usable = {}

    for _, candidate in ipairs(local_conf.discovery.nacos.host) do
        local is_usable = check_host_health(candidate)
        if is_usable then
            core.table.insert(usable, candidate)
        end
    end

    return usable
end

-- Build the URL probed for one configured nacos host.
-- Mirrors the URL handling of get_base_uri: "user:password@" userinfo is
-- removed (the HTTP client would otherwise misread the authority part) and a
-- '/' separator is guaranteed before the endpoint path.
local function build_health_check_url(host_url, prefix)
    local url = host_url:gsub('^(%a+://)[^/]*@', '%1')
    if prefix then
        url = url .. prefix
    end
    if url:sub(-1) ~= '/' then
        url = url .. '/'
    end
    return url .. "cs/history?search=accurate&dataId=1&group=1"
end

-- Periodic health-check task: probe every configured nacos host and record
-- the outcome via mark_host_healthy / mark_host_unhealthy.
--
-- Each host is probed from its own zero-delay timer, so the probes do not run
-- one after another inside this task.
--
-- premature: timer flag; when truthy the task does nothing
local function health_check_task(premature)
    if premature then
        return
    end

    local nacos_conf = local_conf.discovery.nacos
    for _, host_url in ipairs(nacos_conf.host) do
        local ok, err = ngx.timer.at(0, function(timer_premature)
            -- Skip the probe when the timer fires only because the worker
            -- is shutting down.
            if timer_premature then
                return
            end

            local httpc = http.new()
            httpc:set_timeout(health_check_timeout)

            local health_url = build_health_check_url(host_url, nacos_conf.prefix)

            -- NOTE(review): the probe is sent without an access token and
            -- without certificate verification; confirm that cs/history
            -- answers 200 on nacos deployments with authentication enabled.
            local res, req_err = httpc:request_uri(health_url, {
                method = "GET",
                ssl_verify = false
            })

            if res and res.status == 200 then
                mark_host_healthy(host_url)
            else
                mark_host_unhealthy(host_url, req_err
                    or "Health check failed with status: "
                       .. (res and res.status or "unknown"))
            end
        end)

        if not ok then
            log.error("Failed to create health check timer for host: ", host_url, ", error: ", err)
        end
    end
end

-- Send one HTTP request to a nacos host and decode the JSON answer.
--
-- request_uri: base URI; `path` is appended verbatim
-- path:        path and query string
-- body:        optional body; a table is JSON-encoded first
-- method:      HTTP method
-- basic_auth:  optional value for the Authorization header
-- returns:     the decoded body, or nil plus an error description
local function request(request_uri, path, body, method, basic_auth)
    local url = request_uri .. path
    log.info('request url:', url)

    local headers = { ['Accept'] = 'application/json' }
    if basic_auth then
        headers['Authorization'] = basic_auth
    end

    if type(body) == 'table' then
        local encoded, encode_err = core.json.encode(body)
        if not encoded then
            return nil, 'invalid body : ' .. encode_err
        end
        body = encoded
        headers['Content-Type'] = 'application/json'
    end

    local httpc = http.new()
    local timeouts = local_conf.discovery.nacos.timeout
    local connect_ms, send_ms, read_ms = timeouts.connect, timeouts.send, timeouts.read
    log.info('connect_timeout:', connect_ms, ', send_timeout:', send_ms,
             ', read_timeout:', read_ms)
    httpc:set_timeouts(connect_ms, send_ms, read_ms)

    local res, req_err = httpc:request_uri(url, {
        method = method,
        headers = headers,
        body = body,
        ssl_verify = true,
    })
    if not res then
        return nil, req_err
    end

    -- Anything but a 200 answer carrying a body counts as a failure.
    if res.body and res.status == 200 then
        local decoded, decode_err = core.json.decode(res.body)
        if not decoded then
            return nil, decode_err
        end
        return decoded
    end

    return nil, 'status = ' .. res.status
end

-- GET `path` below `request_uri`, with no body and no Authorization header.
-- Returns whatever `request` returns.
local function get_url(request_uri, path)
    local no_body, no_auth
    return request(request_uri, path, no_body, 'GET', no_auth)
end

-- POST `body` to `path` below `request_uri`, with no Authorization header.
-- Returns whatever `request` returns.
local function post_url(request_uri, path, body)
    local no_auth
    return request(request_uri, path, body, 'POST', no_auth)
end

-- Pick one nacos host at random and turn it into a request base URI.
--
-- Hosts accepted by the health check are preferred; when none is accepted the
-- pick is made among all configured hosts. Credentials embedded in the host
-- URL ("scheme://user:password@...") are cut out of the URI and returned
-- separately.
--
-- returns: base URI (prefix applied, always ending in '/'), username, password
--          (the last two are nil unless the URL carried "user:password")
local function get_base_uri()
    local nacos_conf = local_conf.discovery.nacos
    local all_hosts = nacos_conf.host

    local candidates = get_healthy_hosts()
    if #candidates == 0 then
        -- no host passes the health check: fall back to the full list
        candidates = all_hosts
    end

    local url = candidates[math_random(#candidates)]
    local username, password

    local at_pos = core.string.rfind_char(url, '@')
    if at_pos then
        local scheme_pos = str_find(url, '://')
        local scheme_end = scheme_pos + 2
        local scheme_part = string_sub(url, 1, scheme_end)
        local credentials = ngx_re.split(
            string_sub(url, scheme_end + 1, at_pos - 1), ':')
        if #credentials == 2 then
            username, password = credentials[1], credentials[2]
        end
        url = scheme_part .. string_sub(url, at_pos + 1)
    end

    local prefix = nacos_conf.prefix
    if prefix then
        url = url .. prefix
    end

    local last_byte = str_byte(url, #url)
    if last_byte ~= str_byte('/') then
        url = url .. '/'
    end

    return url, username, password
end

-- Log in to nacos and build the access-token query fragment.
--
-- base_uri: base URI of the nacos host (as produced by get_base_uri)
-- username: login name; when nil/false no login is attempted
-- password: login password; when nil/false no login is attempted
-- returns:  '' when no credentials are given,
--           '&accessToken=<token>' after a successful login,
--           nil plus an error description otherwise
local function get_token_param(base_uri, username, password)
    if not username or not password then
        return ''
    end

    local args = { username = username, password = password }
    local data, err = post_url(base_uri, auth_path .. '?' .. ngx.encode_args(args), nil)
    if not data then
        err = err or 'empty login response'
        -- The password is deliberately left out of the log line.
        log.error('nacos login fail:', username, ' desc:', err)
        return nil, err
    end

    -- A decoded answer without a token would otherwise fail on the
    -- concatenation below with an unhelpful runtime error.
    local token = type(data) == 'table' and data.accessToken
    if not token then
        err = 'login response has no accessToken'
        log.error('nacos login fail:', username, ' desc:', err)
        return nil, err
    end

    return '&accessToken=' .. token
end

-- Build the "&namespaceId=..." query fragment.
-- returns: '' when `namespace_id` is nil/false, the encoded fragment otherwise
local function get_namespace_param(namespace_id)
    if not namespace_id then
        return ''
    end

    return '&' .. ngx.encode_args({ namespaceId = namespace_id })
end

-- Build the "&groupName=..." query fragment.
-- returns: '' when `group_name` is nil/false, the encoded fragment otherwise
local function get_group_name_param(group_name)
    if not group_name then
        return ''
    end

    return '&' .. ngx.encode_args({ groupName = group_name })
end

-- Tell whether `services` already holds an entry with exactly this
-- namespace id, group name, service name and scheme.
-- returns: true when such an entry exists, false otherwise
local function de_duplication(services, namespace_id, group_name, service_name, scheme)
    for _, known in ipairs(services) do
        local same_entry = known.namespace_id == namespace_id
                           and known.group_name == group_name
                           and known.service_name == service_name
                           and known.scheme == scheme
        if same_entry then
            return true
        end
    end

    return false
end

-- Append to `services` one entry per distinct nacos-discovered upstream found
-- in `values`.
--
-- services: array being built; entries carry service_name, namespace_id,
--           group_name and scheme
-- values:   collection walked with core.config_util.iterate_values; each item
--           is expected to expose its configuration as `.value`. nil/false is
--           accepted and ignored.
local function iter_and_add_service(services, values)
    if not values then
        return
    end

    for _, item in core.config_util.iterate_values(values) do
        local conf = item.value
        if conf then
            -- The upstream is either nested under `upstream` or is the
            -- configuration object itself.
            local up = conf.upstream or conf

            local args = up.discovery_args
            local namespace_id = (args and args.namespace_id)
                                 or default_namespace_id
            local group_name = (args and args.group_name)
                               or default_group_name

            local already_listed = de_duplication(services, namespace_id,
                                                  group_name, up.service_name,
                                                  up.scheme)
            if not already_listed and up.discovery_type == 'nacos' then
                core.table.insert(services, {
                    service_name = up.service_name,
                    namespace_id = namespace_id,
                    group_name = group_name,
                    scheme = up.scheme,
                })
            end
        end
    end
end

-- Gather the nacos-discovered services referenced by upstreams, HTTP routes,
-- services and stream routes, in that order.
-- returns: array of service descriptors built by iter_and_add_service
local function get_nacos_services()
    -- here we use lazy load to work around circle dependency
    local fetch_upstreams = require('apisix.upstream').upstreams
    local fetch_routes = require('apisix.router').http_routes
    local fetch_stream_routes = require('apisix.router').stream_routes
    local fetch_services = require('apisix.http.service').services

    local sources = {
        fetch_upstreams,
        fetch_routes,
        fetch_services,
        fetch_stream_routes,
    }

    local collected = {}
    for i = 1, 4 do
        -- parentheses keep only the first value returned by the getter
        local values = (sources[i]())
        iter_and_add_service(collected, values)
    end
    return collected
end

-- Tell whether `scheme` names a gRPC upstream ('grpc' or 'grpcs').
-- returns: true or false
local function is_grpc(scheme)
    return scheme == 'grpc' or scheme == 'grpcs'
end

-- Tell whether the cached discovery data must be considered expired.
-- The cache is expired when it was never populated, or when more than
-- `max_cache_age` seconds have passed since `last_successful_update`.
-- returns: true when expired, false otherwise
local function is_cache_expired()
    if cache_valid and last_successful_update then
        local age = ngx.now() - last_successful_update
        return age > max_cache_age
    end

    return true
end

-- Restore `applications` from the cache, provided a cache exists and has not
-- expired.
-- returns: true when the cache was applied, false otherwise
local function use_cached_data()
    local usable = cached_applications and not is_cache_expired()
    if not usable then
        return false
    end

    -- A JSON round-trip yields a copy that shares nothing with the cache.
    applications = core.json.decode(core.json.encode(cached_applications))
    log.warn("Using cached service discovery data due to all nacos nodes failure")
    return true
end

-- Publish freshly fetched discovery data.
--
-- When the data differs from the current `applications` (compared by the MD5
-- of their JSON encodings) it replaces `applications`, the cache is rebuilt
-- and the new data is posted as a worker event. When the data is unchanged
-- nothing is posted, but the cache is still confirmed as fresh: a successful
-- fetch proves the cached content is current, so it must not age out merely
-- because the registry content is stable.
--
-- new_apps: applications table indexed as [namespace_id][group_name][service_name]
local function update_applications(new_apps)
    local now = ngx.now()
    local new_apps_md5sum = ngx.md5(core.json.encode(new_apps))
    local old_apps_md5sum = ngx.md5(core.json.encode(applications or {}))

    if new_apps_md5sum == old_apps_md5sum then
        if not cached_applications then
            -- e.g. after the cache has been cleared manually
            cached_applications = core.json.decode(core.json.encode(new_apps))
        end
        cache_valid = true
        last_successful_update = now
        return
    end

    applications = new_apps
    -- A JSON round-trip yields a copy that shares nothing with `new_apps`.
    cached_applications = core.json.decode(core.json.encode(new_apps))
    cache_valid = true
    last_successful_update = now

    local ok, err = events.post(events_list._source, events_list.updating,
                                applications)
    if not ok then
        log.error("post_event failure with ", events_list._source,
                  ", update application error: ", err)
    else
        log.info("Successfully updated applications data and cache")
    end
end

-- Return apps[namespace_id][group_name], creating the intermediate tables on
-- demand.
local function ensure_service_group(apps, namespace_id, group_name)
    local by_group = apps[namespace_id]
    if not by_group then
        by_group = {}
        apps[namespace_id] = by_group
    end

    local by_service = by_group[group_name]
    if not by_service then
        by_service = {}
        by_group[group_name] = by_service
    end
    return by_service
end

-- Return the node list currently published for a service, or nil.
local function published_nodes(namespace_id, group_name, service_name)
    local by_group = applications and applications[namespace_id]
    local by_service = by_group and by_group[group_name]
    return by_service and by_service[service_name]
end

-- Timer task: query nacos for every service referenced by the gateway
-- configuration and publish the resulting node lists.
--
-- A service whose query fails is skipped, and the remaining services are
-- still refreshed. If at least one service succeeds, the new data is
-- published and failed services keep their previously published nodes. If
-- every service fails, the cached data is used as a fallback.
--
-- premature: timer flag; when truthy the task does nothing
local function fetch_full_registry(premature)
    if premature then
        return
    end

    local up_apps = {}
    local base_uri, username, password = get_base_uri()
    local token_param, err = get_token_param(base_uri, username, password)
    if err then
        log.error('get_token_param error:', err)
        -- Nothing has been published yet: try the cache, otherwise publish
        -- an empty result so that _M.nodes stops waiting.
        if not applications then
            if use_cached_data() then
                log.warn("Using cached data for initial setup")
            else
                applications = up_apps
            end
        end
        return
    end

    local infos = get_nacos_services()
    if #infos == 0 then
        applications = up_apps
        return
    end

    local success_count = 0
    local total_services = #infos
    local failed_services = {}

    for _, service_info in ipairs(infos) do
        local namespace_id = service_info.namespace_id
        local group_name = service_info.group_name
        local service_name = service_info.service_name
        local scheme = service_info.scheme or ''
        local namespace_param = get_namespace_param(namespace_id)
        local group_name_param = get_group_name_param(group_name)
        local query_path = instance_list_path .. service_name
                           .. token_param .. namespace_param .. group_name_param

        local data, fetch_err = get_url(base_uri, query_path)
        if data and type(data.hosts) ~= 'table' then
            -- a decoded answer without a host list cannot be used
            data, fetch_err = nil, 'missing hosts in response'
        end

        if not data then
            log.error('get_url:', query_path, ' err:', fetch_err)
            core.table.insert(failed_services, service_info)
        else
            local by_service = ensure_service_group(up_apps, namespace_id,
                                                    group_name)

            for _, host in ipairs(data.hosts) do
                -- The node list is created lazily, so a service without any
                -- host gets no entry at all.
                local nodes = by_service[service_name]
                if not nodes then
                    nodes = {}
                    by_service[service_name] = nodes
                end

                local node = {
                    host = host.ip,
                    port = host.port,
                    weight = host.weight or default_weight,
                }

                -- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
                if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
                    node.port = host.metadata.gRPC_port
                end

                core.table.insert(nodes, node)
            end

            success_count = success_count + 1
        end
    end

    log.info("Service discovery completed: ", success_count, "/", total_services, " services updated")

    if success_count == 0 then
        -- every query failed: try to fall back to the cached data
        log.error("Failed to fetch any service information from all nacos nodes")
        if use_cached_data() then
            log.warn("Successfully fallback to cached service discovery data")
        else
            log.error("No valid cache available, service discovery data may be stale or empty")
        end
        return
    end

    -- Keep the previously published nodes of the services that could not be
    -- refreshed, so a partial refresh never removes a service.
    for _, service_info in ipairs(failed_services) do
        local previous = published_nodes(service_info.namespace_id,
                                         service_info.group_name,
                                         service_info.service_name)
        if previous then
            local by_service = ensure_service_group(up_apps,
                                                    service_info.namespace_id,
                                                    service_info.group_name)
            if not by_service[service_info.service_name] then
                by_service[service_info.service_name] = previous
            end
        end
    end

    update_applications(up_apps)
end

-- Look up the nodes discovered for a service.
--
-- service_name:   name of the service
-- discovery_args: optional table; `namespace_id` and `group_name` override
--                 the defaults
-- returns:        the node list, or nil when nothing is known for the
--                 namespace/group/service
--
-- While no discovery data has been published yet, the call sleeps in steps
-- of 0.1 seconds, up to about 5 seconds, before giving up.
function _M.nodes(service_name, discovery_args)
    local namespace_id = default_namespace_id
    local group_name = default_group_name
    if discovery_args then
        namespace_id = discovery_args.namespace_id or default_namespace_id
        group_name = discovery_args.group_name or default_group_name
    end

    -- maximum waiting time: 5 seconds
    local remaining, tick = 5, 0.1
    local warned = false
    while not applications and remaining > 0 do
        if not warned then
            log.warn('wait init')
            warned = true
        end
        ngx.sleep(tick)
        remaining = remaining - tick
    end

    local by_group = applications and applications[namespace_id]
    local by_service = by_group and by_group[group_name]
    if not by_service then
        return nil
    end
    return by_service[service_name]
end

-- Per-worker initialisation.
--
-- Every worker sets up the worker-events channel. Workers other than worker 0
-- only subscribe to published updates. Worker 0 reads the configuration,
-- marks every configured host healthy, and starts the registry-fetch and
-- health-check timers.
function _M.init_worker()
    events = require("resty.worker.events")
    events_list = events.event_list("discovery_nacos_update_application",
                                    "updating")

    if ngx.worker.id() ~= 0 then
        events.register(discovery_nacos_callback, events_list._source,
                        events_list.updating)
        return
    end

    local nacos_conf = local_conf.discovery.nacos

    default_weight = nacos_conf.weight
    log.info('default_weight:', default_weight)
    local fetch_interval = nacos_conf.fetch_interval
    log.info('fetch_interval:', fetch_interval)
    access_key = nacos_conf.access_key
    secret_key = nacos_conf.secret_key

    -- every configured host starts out healthy
    for _, configured_host in ipairs(nacos_conf.host) do
        host_health_status[configured_host] = {
            healthy = true,
            failure_count = 0,
            last_check_time = ngx.now() * 1000
        }
    end

    -- one immediate fetch, then one every `fetch_interval`
    ngx_timer_at(0, fetch_full_registry)
    ngx_timer_every(fetch_interval, fetch_full_registry)

    -- health_check_interval is in milliseconds, the timer wants seconds
    ngx_timer_every(health_check_interval / 1000, health_check_task)

    log.info("Nacos discovery initialized with health check and cache support")
end

-- Snapshot of the module state for inspection: the nacos configuration, the
-- published services (an empty table when none) and the cache indicators.
function _M.dump_data()
    local cache_info = {
        cached_applications = not not cached_applications,
        cache_valid = cache_valid,
        last_successful_update = last_successful_update,
        is_cache_expired = is_cache_expired()
    }

    return {
        config = local_conf.discovery.nacos,
        services = applications or {},
        cache = cache_info
    }
end

-- Expose the host health bookkeeping table (for debugging).
-- The table itself is returned, not a copy.
function _M.get_health_status()
    local status_by_host = host_health_status
    return status_by_host
end

-- Drop the cached discovery data by hand (for tests and debugging).
function _M.clear_cache()
    cached_applications, cache_valid, last_successful_update = nil, false, 0
    log.info("Cache cleared manually")
end

-- Report the cache indicators: validity flag, time of the last successful
-- update, expiry state and whether cached data is present.
function _M.get_cache_status()
    local has_cached_data = not not cached_applications

    return {
        cache_valid = cache_valid,
        last_successful_update = last_successful_update,
        is_cache_expired = is_cache_expired(),
        cached_applications = has_cached_data
    }
end

return _M

wenj91 avatar Nov 21 '25 09:11 wenj91