diff --git a/docs/content.en/docs/release-notes/_index.md b/docs/content.en/docs/release-notes/_index.md index a5f29699..5013a837 100644 --- a/docs/content.en/docs/release-notes/_index.md +++ b/docs/content.en/docs/release-notes/_index.md @@ -30,7 +30,7 @@ Information about release notes of INFINI Console is provided here. - Enhance LDAP authentication logging (#156) - Optimize UI for copying metric requests (#155) - Enhance deletion tips by adding cluster info for indices -- Retain a single instance when registering duplicate endpoints (#163) +- Support clearing offline agent instances (#165) ## 1.28.2 (2025-02-15) diff --git a/docs/content.zh/docs/release-notes/_index.md b/docs/content.zh/docs/release-notes/_index.md index 5cf1c342..bb530e00 100644 --- a/docs/content.zh/docs/release-notes/_index.md +++ b/docs/content.zh/docs/release-notes/_index.md @@ -30,7 +30,6 @@ title: "版本历史" - 增强 LDAP 身份验证的日志记录 (#156) - 优化监控报表里拷贝指标请求的 UI (#155) - 删除索引提示增加集群信息 (#162) -- 自动注册实例时相同 endpoint 的实例不再重复注册 (#163) ## 1.28.2 (2025-02-15) @@ -38,6 +37,7 @@ title: "版本历史" - 告警功能支持根据桶之间文档数差值和内容差异告警 (#119) - 当使用 Easysearch 存储指标时,增加 Rollup 索引生命周期 (#128) - 增加集群指标采集模式变更事件 (#152) +- 支持清理离线 Agent 实例(#165) ### Bug fix - 修复 Insight API 处理多时间序列数据时数据丢失的问题 (#127) diff --git a/plugin/managed/server/instance.go b/plugin/managed/server/instance.go index 83218d97..0fe130b3 100644 --- a/plugin/managed/server/instance.go +++ b/plugin/managed/server/instance.go @@ -30,6 +30,9 @@ package server import ( "context" "fmt" + "infini.sh/framework/core/event" + "infini.sh/framework/core/global" + "infini.sh/framework/core/task" "net/http" "strconv" "strings" @@ -76,6 +79,8 @@ func init() { //try to connect to instance api.HandleAPIMethod(api.POST, "/instance/try_connect", handler.RequireLogin(handler.tryConnect)) + //clear instance that is not alive in 7 days + api.HandleAPIMethod(api.POST, "/instance/_clear", handler.RequirePermission(handler.clearInstance, enum.PermissionGatewayInstanceWrite)) } @@ -96,30 +101,7 
@@ func (h APIHandler) registerInstance(w http.ResponseWriter, req *http.Request, p
 	oldInst.ID = obj.ID
 	exists, err := orm.Get(oldInst)
 	if exists {
-		errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID)
-		h.WriteError(w, errMsg, http.StatusInternalServerError)
-		return
-	}
-	err, result := orm.GetBy("endpoint", obj.Endpoint, oldInst)
-	if err != nil {
-		log.Error(err)
-		h.WriteError(w, err.Error(), http.StatusInternalServerError)
-		return
-	}
-	if len(result.Result) > 0 {
-		buf := util.MustToJSONBytes(result.Result[0])
-		util.MustFromJSONBytes(buf, &oldInst)
-		if oldInst.ID != "" {
-			//keep old created time
-			obj.Created = oldInst.Created
-			log.Infof("remove old instance [%s] with the same endpoint %s", oldInst.ID, oldInst.Endpoint)
-			err = orm.Delete(nil, oldInst)
-			if err != nil {
-				log.Error(err)
-				h.WriteError(w, err.Error(), http.StatusInternalServerError)
-				return
-			}
-		}
+		obj.Created = oldInst.Created
 	}
 	err = orm.Save(nil, obj)
 	if err != nil {
@@ -394,6 +376,174 @@ func (h *APIHandler) getInstanceStatus(w http.ResponseWriter, req *http.Request,
 	}
 	h.WriteJSON(w, result, http.StatusOK)
 }
+// clearInstance handles POST /instance/_clear: it reads the optional "app_name"
+// query parameter, submits the cleanup of offline instances to the
+// "clear_instance" task group and then writes an acknowledgement.
+// NOTE(review): the ack is presumably sent before the cleanup has finished;
+// confirm against task.RunWithinGroup.
+func (h *APIHandler) clearInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+	appName := h.GetParameterOrDefault(req, "app_name", "")
+	task.RunWithinGroup("clear_instance", func(ctx context.Context) error {
+		err := h.clearInstanceByAppName(appName)
+		if err != nil {
+			log.Error(err)
+		}
+		return err
+	})
+	h.WriteAckOKJSON(w)
+}
+
+// clearInstanceByAppName deletes registered instances that are considered
+// offline, together with the settings labelled with their agent id. When
+// appName is non-empty only instances of that application are examined.
+//
+// An instance is considered offline when getAliveInstanceIDs finds no recent
+// event for it and a direct request to its endpoint fails. Candidates are
+// collected over all pages first and deleted afterwards, so the from/size
+// pagination is not shifted by deletions made while scanning.
+func (h *APIHandler) clearInstanceByAppName(appName string) error {
+	const size = 100
+	// Paginated query over the registered instances, oldest first
+	q := orm.Query{
+		Size: size,
+		From: 0,
+	}
+	if appName != "" {
+		q.Conds = orm.And(
+			orm.Eq("application.name", appName),
+		)
+	}
+	q.AddSort("created", orm.ASC)
+	client := elastic2.GetClient(global.MustLookupString(elastic2.GlobalSystemElasticsearchID))
+	var toRemoveIDs []string
+	for {
+		// Fresh slice per page so results of the previous page cannot leak in
+		insts := []model.Instance{}
+		err, _ := orm.SearchWithJSONMapper(&insts, &q)
+		if err != nil {
+			return err
+		}
+		if len(insts) == 0 {
+			break
+		}
+		instanceIDs := make([]string, 0, len(insts))
+		for i := range insts {
+			instanceIDs = append(instanceIDs, insts[i].ID)
+		}
+		aliveInstanceIDs, err := getAliveInstanceIDs(client, instanceIDs)
+		if err != nil {
+			return err
+		}
+		for i := range insts {
+			// Address the slice element, never the shared range variable
+			inst := &insts[i]
+			if _, ok := aliveInstanceIDs[inst.ID]; ok {
+				continue
+			}
+			// No recent events: probe the endpoint before treating it as offline
+			if _, probeErr := h.getInstanceInfo(inst.Endpoint, inst.BasicAuth); probeErr == nil {
+				continue
+			}
+			toRemoveIDs = append(toRemoveIDs, inst.ID)
+		}
+		// A short page means there is nothing left to fetch
+		if len(insts) < size {
+			break
+		}
+		q.From += size
+	}
+	// Delete in batches so a single terms query stays bounded by the page size
+	for start := 0; start < len(toRemoveIDs); start += size {
+		end := start + size
+		if end > len(toRemoveIDs) {
+			end = len(toRemoveIDs)
+		}
+		batch := toRemoveIDs[start:end]
+		// remove instances
+		query := util.MapStr{
+			"query": util.MapStr{
+				"terms": util.MapStr{
+					"id": batch,
+				},
+			},
+		}
+		if err := orm.DeleteBy(model.Instance{}, util.MustToJSONBytes(query)); err != nil {
+			return fmt.Errorf("failed to delete instance: %w", err)
+		}
+		// remove instance related data
+		query = util.MapStr{
+			"query": util.MapStr{
+				"terms": util.MapStr{
+					"metadata.labels.agent_id": batch,
+				},
+			},
+		}
+		if err := orm.DeleteBy(model.Setting{}, util.MustToJSONBytes(query)); err != nil {
+			return fmt.Errorf("failed to delete instance settings: %w", err)
+		}
+	}
+	return nil
+}
+
+// getAliveInstanceIDs returns the subset of instanceIDs for which at least one
+// event with a matching agent.id and a timestamp newer than "now-7d" exists in
+// the event indices. The result is a set keyed by instance id.
+func getAliveInstanceIDs(client elastic2.API, instanceIDs []string) (map[string]struct{}, error) {
+	ret := map[string]struct{}{}
+	if len(instanceIDs) == 0 {
+		// Nothing to look up, skip the round trip
+		return ret, nil
+	}
+	query := util.MapStr{
+		"size": 0,
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"must": []util.MapStr{
+					{
+						"terms": util.MapStr{
+							"agent.id": instanceIDs,
+						},
+					},
+					{
+						"range": util.MapStr{
+							"timestamp": util.MapStr{
+								"gt": "now-7d",
+							},
+						},
+					},
+				},
+			},
+		},
+		"aggs": util.MapStr{
+			"grp_agent_id": util.MapStr{
+				"terms": util.MapStr{
+					"field": "agent.id",
+					// The terms aggregation returns only 10 buckets by default;
+					// request one bucket per queried id so none is dropped
+					"size": len(instanceIDs),
+				},
+			},
+		},
+	}
+	queryDSL := util.MustToJSONBytes(query)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	defer cancel()
+	response, err := client.QueryDSL(ctx, orm.GetWildcardIndexName(event.Event{}), nil, queryDSL)
+	if err != nil {
+		return nil, err
+	}
+	for _, bk := range response.Aggregations["grp_agent_id"].Buckets {
+		// Checked assertions: an unexpected bucket shape must not panic
+		key, ok := bk["key"].(string)
+		if !ok {
+			continue
+		}
+		if count, ok := bk["doc_count"].(float64); ok && count > 0 {
+			ret[key] = struct{}{}
+		}
+	}
+	return ret, nil
+}
 
 func (h *APIHandler) proxy(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	var (
@@ -442,7 +592,7 @@ func (h *APIHandler) getInstanceInfo(endpoint string, basicAuth *model.BasicAuth
 	obj := &model.Instance{}
 	_, err := ProxyAgentRequest("runtime", endpoint, req1, obj)
 	if err != nil {
-		panic(err)
+		return nil, err
 	}
 
 	return obj, err
diff --git a/web/src/locales/en-US/agent.js b/web/src/locales/en-US/agent.js
index 3f1f9472..5be0aa25 100644
--- a/web/src/locales/en-US/agent.js
+++ b/web/src/locales/en-US/agent.js
@@ -43,4 +43,7 @@ export default {
 
   "agent.label.agent_credential": "Agent Credential",
   "agent.credential.tip": "No credential required",
+  "agent.instance.clear.title": "Clear Offline Instances",
+  "agent.instance.clear.modal.title": "Are you sure you want to clear offline instances?",
+  "agent.instance.clear.modal.desc": "This operation will delete offline instances that have not reported metrics for 7 days."
}; diff --git a/web/src/locales/zh-CN/agent.js b/web/src/locales/zh-CN/agent.js index a3693b5e..a675daa1 100644 --- a/web/src/locales/zh-CN/agent.js +++ b/web/src/locales/zh-CN/agent.js @@ -40,4 +40,7 @@ export default { "agent.label.agent_credential": "代理凭据", "agent.credential.tip": "不需要凭据", + "agent.instance.clear.title": "清理离线实例", + "agent.instance.clear.modal.title": "您确定要清理离线实例?", + "agent.instance.clear.modal.desc": "该操作将会删除离线并且 7 天没有上报指标的实例" }; diff --git a/web/src/pages/Agent/Instance/index.jsx b/web/src/pages/Agent/Instance/index.jsx index 5c603c7d..d5744fe7 100644 --- a/web/src/pages/Agent/Instance/index.jsx +++ b/web/src/pages/Agent/Instance/index.jsx @@ -379,6 +379,37 @@ const AgentList = (props) => { } }; + const [clearLoading, setClearLoading] = useState(false) + const onClearClick = async ()=>{ + setClearLoading(true); + const statusRes = await request(`/instance/_clear`, { + method: "POST", + queryParams: { + "app_name": "agent", + }, + }); + if(statusRes && statusRes.acknowledged){ + message.success("submit successfully"); + } + setClearLoading(false); + } + const showClearConfirm = useCallback(() => { + Modal.confirm({ + title: formatMessage({ id: "agent.instance.clear.modal.title" }), + content: ( + <> +