diff --git a/config/system_config.tpl b/config/system_config.tpl index fb9333dc..23c4c338 100644 --- a/config/system_config.tpl +++ b/config/system_config.tpl @@ -51,11 +51,7 @@ pipeline: auto_start: true keep_running: true processor: - - metadata: - bulk_size_in_mb: 5 - bulk_max_docs_count: 5000 - fetch_max_messages: 100 - elasticsearch: "$[[CLUSTER_ID]]" + - consumer: queues: type: metadata category: elasticsearch @@ -63,15 +59,15 @@ pipeline: group: metadata when: cluster_available: ["$[[CLUSTER_ID]]"] + processor: + - metadata: + elasticsearch: "$[[CLUSTER_ID]]" + - name: activity_ingest auto_start: true keep_running: true processor: - - activity: - bulk_size_in_mb: 5 - bulk_max_docs_count: 5000 - fetch_max_messages: 100 - elasticsearch: "$[[CLUSTER_ID]]" + - consumer: queues: category: elasticsearch activity: true @@ -79,9 +75,13 @@ pipeline: group: activity when: cluster_available: ["$[[CLUSTER_ID]]"] + processor: + - activity: + elasticsearch: "$[[CLUSTER_ID]]" - name: migration_task_dispatcher auto_start: true keep_running: true + retry_delay_in_ms: 1000 processor: - migration_dispatcher: elasticsearch: "$[[CLUSTER_ID]]" diff --git a/model/insight/metric_data.go b/model/insight/metric_data.go index fcc6e37b..bad438b5 100644 --- a/model/insight/metric_data.go +++ b/model/insight/metric_data.go @@ -85,7 +85,7 @@ func (m *Metric) ValidateSortKey() error { if v, ok := mm[sortItem.Key]; !ok && !util.StringInArray([]string{"_key", "_count"}, sortItem.Key){ return fmt.Errorf("unknown sort key [%s]", sortItem.Key) }else{ - if v.Statistic == "derivative" { + if v != nil && v.Statistic == "derivative" { return fmt.Errorf("can not sort by pipeline agg [%s]", v.Statistic) } } diff --git a/modules/agent/api/init.go b/modules/agent/api/init.go index 2937f97a..bcefa402 100644 --- a/modules/agent/api/init.go +++ b/modules/agent/api/init.go @@ -25,6 +25,7 @@ func Init() { api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/agent/instance/try_connect", handler.RequireLogin(handler.tryConnect)) + api.HandleAPIMethod(api.POST, "/agent/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo) diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index 9941bfc4..79bdbaf0 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -9,7 +9,9 @@ import ( "fmt" "net" "net/http" + "net/url" "strconv" + "strings" "time" log "github.com/cihub/seelog" @@ -131,6 +133,10 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps if err != nil { log.Error(err) } + _, err = refreshNodesInfo(obj) + if err != nil { + log.Error(err) + } h.WriteCreatedOKJSON(w, obj.ID) @@ -236,6 +242,7 @@ func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request, } q := orm.Query{} queryDSL := util.MapStr{ + "size": len(instanceIDs), "query": util.MapStr{ "terms": util.MapStr{ "_id": instanceIDs, @@ -400,7 +407,65 @@ func (h *APIHandler) getESNodesInfo(w http.ResponseWriter, req *http.Request, ps h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - h.WriteJSON(w, nodes, http.StatusOK) + var nodeUUIDs []string + for _, node := range nodes { + if node.NodeUUID != "" { + nodeUUIDs = append(nodeUUIDs, node.NodeUUID) + } + } + if len(nodeUUIDs) == 0 { + h.WriteJSON(w, nodes, http.StatusOK) + return + } + query := util.MapStr{ + "size": len(nodeUUIDs), + "query": util.MapStr{ + "terms": util.MapStr{ + "metadata.node_id": nodeUUIDs, + }, + }, + "collapse": util.MapStr{ + "field": "metadata.node_id", + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(elastic.NodeConfig{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + idToAddresses := map[string]string{} + for _, row := range result.Result { + if rowM, ok := row.(map[string]interface{}); ok { + nodeUUID, _ := util.MapStr(rowM).GetValue("metadata.node_id") + transportAddr, _ := util.MapStr(rowM).GetValue("metadata.labels.transport_address") + if v, ok := nodeUUID.(string); ok { + idToAddresses[v] = transportAddr.(string) + } + } + } + var nNodes []tempNode + for _, node := range nodes { + nNode := tempNode{ + ESNodeInfo: node, + } + if node.NodeUUID != "" { + if addr, ok := idToAddresses[node.NodeUUID]; ok { + nNode.TransportAddress = addr + } + } + nNodes = append(nNodes, nNode) + } + + + h.WriteJSON(w, nNodes, http.StatusOK) +} +type tempNode struct { + agent.ESNodeInfo + TransportAddress string `json:"transport_address"` } func (h *APIHandler) refreshESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -558,6 +623,180 @@ func (h *APIHandler) associateESNode(w http.ResponseWriter, req *http.Request, p h.WriteAckOKJSON(w) } +func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + reqBody := struct { + ClusterIDs []string `json:"cluster_ids"` + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + // query not associated nodes info + nodesM, err := getUnAssociateNodes() + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(nodesM) == 0 { + h.WriteAckOKJSON(w) + return + } + agentIds := make([]string, 0, len(nodesM)) + for agentID := range nodesM { + agentIds = append(agentIds, agentID) + } + agents, err := getAgentByIds(agentIds) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + for _, clusterID := range reqBody.ClusterIDs { + // query cluster basicauth + cfg := elastic.GetConfig(clusterID) + basicAuth, err := common.GetBasicAuth(cfg) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + taskSetting, err := getSettingsByClusterID(cfg.ID) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + for agentID, nodes := range nodesM { + var ( + inst *agent.Instance + ok bool + ) + if inst, ok = agents[agentID]; !ok { + log.Warnf("agent [%v] was not found", agentID) + continue + } + settings, err := common2.GetAgentSettings(agentID, 0) + if err != nil { + log.Error(err) + continue + } + for _, node := range nodes { + host := node.PublishAddress + var endpoint string + if strings.HasPrefix(host, "::") { + instURL, err := url.Parse(inst.Endpoint) + if err != nil { + log.Error(err) + continue + } + host = instURL.Hostname() + endpoint = fmt.Sprintf("%s://%s:%s", node.Schema, host, node.HttpPort) + } else { + endpoint = fmt.Sprintf("%s://%s", node.Schema, host) + } + escfg := elastic.ElasticsearchConfig{ + Endpoint: endpoint, + BasicAuth: &basicAuth, + } + nodeInfo, err := client.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), escfg) + if err != nil { + log.Warn(err) + continue + } + //matched + if nodeInfo.ClusterUuid == cfg.ClusterUUID { + //update node info + nodeInfo.ID = node.ID + nodeInfo.AgentID = inst.ID + nodeInfo.ClusterID = cfg.ID + err = orm.Save(nil, nodeInfo) + if err != nil { + log.Error(err) + continue + } + setting := pickAgentSettings(settings, node) + if setting == nil { + tsetting := model.TaskSetting{ + NodeStats: &model.NodeStatsTask{ + Enabled: true, + }, + Logs: &model.LogsTask{ + Enabled: true, + LogsPath: nodeInfo.Path.Logs, + }, + } + if taskSetting.IndexStats != nil { + tsetting.IndexStats = taskSetting.IndexStats + taskSetting.IndexStats = nil + } + if taskSetting.ClusterHealth != nil { + tsetting.ClusterHealth = taskSetting.ClusterHealth + taskSetting.ClusterHealth = nil + } + if taskSetting.ClusterStats != nil { + tsetting.ClusterStats = taskSetting.ClusterStats + taskSetting.ClusterStats = nil + } + setting = &agent.Setting{ + Metadata: agent.SettingsMetadata{ + Category: "agent", + Name: "task", + Labels: util.MapStr{ + "agent_id": agentID, + "cluster_uuid": nodeInfo.ClusterUuid, + "cluster_id": nodeInfo.ClusterID, + "node_uuid": nodeInfo.NodeUUID, + "endpoint": fmt.Sprintf("%s://%s", nodeInfo.Schema, nodeInfo.PublishAddress), + }, + }, + Payload: util.MapStr{ + "task": tsetting, + }, + } + err = orm.Create(nil, setting) + if err != nil { + log.Error("save agent task setting error: ", err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + } + } + + } + } + h.WriteAckOKJSON(w) +} + +func getAgentByIds(agentIDs []string)(map[string]*agent.Instance, error){ + query := util.MapStr{ + "size": len(agentIDs), + "query": util.MapStr{ + "terms": util.MapStr{ + "id": agentIDs, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(agent.Instance{}, &q) + if err != nil { + return nil, err + } + agents := map[string]*agent.Instance{} + for _, row := range result.Result { + inst := agent.Instance{} + buf := util.MustToJSONBytes(row) + util.MustFromJSONBytes(buf, &inst) + agents[inst.ID] = &inst + } + return agents, nil +} + func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("instance_id") nodeIDs := []string{} @@ -658,7 +897,7 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { return nil, err } for _, node := range nodesInfo { - oldNode := getNodeByPidOrUUID(oldNodesInfo, node.ProcessInfo.PID, node.NodeUUID) + oldNode := getNodeByPidOrUUID(oldNodesInfo, node.ProcessInfo.PID, node.NodeUUID, node.HttpPort) node.AgentID = inst.ID if oldNode != nil { node.ID = oldNode.ID @@ -705,7 +944,7 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { return resultNodes, nil } -func getNodeByPidOrUUID(nodes map[int]*agent.ESNodeInfo, pid int, uuid string) *agent.ESNodeInfo { +func getNodeByPidOrUUID(nodes map[int]*agent.ESNodeInfo, pid int, uuid string, port string) *agent.ESNodeInfo { if nodes[pid] != nil { return nodes[pid] } @@ -719,7 +958,7 @@ func getNodeByPidOrUUID(nodes map[int]*agent.ESNodeInfo, pid int, uuid string) * func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error) { query := util.MapStr{ - "size": 100, + "size": 1000, "query": util.MapStr{ "term": util.MapStr{ "agent_id": util.MapStr{ @@ -746,6 +985,39 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error) { return nodesInfo, nil } +func getUnAssociateNodes() (map[string][]agent.ESNodeInfo, error){ + query := util.MapStr{ + "size": 3000, + "query": util.MapStr{ + "bool": util.MapStr{ + "must_not": []util.MapStr{ + { + "exists": util.MapStr{ + "field": "cluster_id", + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + + err, result := orm.Search(agent.ESNodeInfo{}, &q) + if err != nil { + return nil, err + } + nodesInfo := map[string][]agent.ESNodeInfo{} + for _, row := range result.Result { + node := agent.ESNodeInfo{} + buf := util.MustToJSONBytes(row) + util.MustFromJSONBytes(buf, &node) + nodesInfo[node.AgentID] = append(nodesInfo[node.AgentID], node) + } + return nodesInfo, nil +} + func pickAgentSettings(settings []agent.Setting, nodeInfo agent.ESNodeInfo) *agent.Setting { for _, setting := range settings { if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID { @@ -784,49 +1056,7 @@ func getAgentTaskSetting(agentID string, node agent.ESNodeInfo) (*agent.Setting, // getSettingsByClusterID query agent task settings with cluster id func getSettingsByClusterID(clusterID string) (*model.TaskSetting, error) { - queryDsl := util.MapStr{ - "size": 200, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.labels.cluster_id": util.MapStr{ - "value": clusterID, - }, - }, - }, - }, - "should": []util.MapStr{ - { - "term": util.MapStr{ - "payload.task.cluster_health": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "payload.task.cluster_stats": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "payload.task.index_stats": util.MapStr{ - "value": true, - }, - }, - }, - }, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(queryDsl), - } - err, result := orm.Search(agent.Setting{}, &q) + err, result := querySettingsByClusterID(clusterID) if err != nil { return nil, err } @@ -841,7 +1071,7 @@ func getSettingsByClusterID(clusterID string) (*model.TaskSetting, error) { indexStats = true clusterHealth = true ) - keys := []string{"payload.task.cluster_stats", "payload.task.cluster_health", "payload.task.index_stats"} + keys := []string{"payload.task.cluster_stats.enabled", "payload.task.cluster_health.enabled", "payload.task.index_stats.enabled"} for _, row := range result.Result { if v, ok := row.(map[string]interface{}); ok { vm := util.MapStr(v) @@ -849,11 +1079,11 @@ func getSettingsByClusterID(clusterID string) (*model.TaskSetting, error) { tv, _ := vm.GetValue(key) if tv == true { switch key { - case "payload.task.cluster_stats": + case "payload.task.cluster_stats.enabled": clusterStats = false - case "payload.task.index_stats": + case "payload.task.index_stats.enabled": indexStats = false - case "payload.task.cluster_health": + case "payload.task.cluster_health.enabled": clusterHealth = false } } @@ -877,3 +1107,50 @@ func getSettingsByClusterID(clusterID string) (*model.TaskSetting, error) { } return setting, nil } + +func querySettingsByClusterID(clusterID string)(error, orm.Result){ + queryDsl := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.cluster_id": util.MapStr{ + "value": clusterID, + }, + }, + }, + }, + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "payload.task.cluster_health.enabled": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "payload.task.cluster_stats.enabled": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "payload.task.index_stats.enabled": util.MapStr{ + "value": true, + }, + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(queryDsl), + } + return orm.Search(agent.Setting{}, &q) +} \ No newline at end of file diff --git a/modules/agent/common/helper.go b/modules/agent/common/helper.go index a6d26216..48b12429 100644 --- a/modules/agent/common/helper.go +++ b/modules/agent/common/helper.go @@ -32,21 +32,23 @@ func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResul clusterID string ok bool ) + nodeUUID := util.ToString(setting.Metadata.Labels["node_uuid"]) if clusterID, ok = setting.Metadata.Labels["cluster_id"].(string); ok && clusterID != ""{ cfg := elastic.GetConfig(clusterID) + newID := getClusterConfigReferenceName(clusterID, nodeUUID) newCfg := elastic.ElasticsearchConfig{ Enabled: true, - Name: cfg.Name, + Name: newID, BasicAuth: cfg.BasicAuth, //todo get endpoint from agent node info Endpoint: setting.Metadata.Labels["endpoint"].(string), + ClusterUUID: cfg.ClusterUUID, } - newCfg.ID = clusterID + newCfg.ID = newID clusterCfgs = append(clusterCfgs, newCfg) }else{ return nil, fmt.Errorf("got wrong cluster id [%v] from metadata labels", setting.Metadata.Labels["cluster_id"]) } - nodeUUID := util.ToString(setting.Metadata.Labels["node_uuid"]) taskCfg, err := util.MapStr(setting.Payload).GetValue("task") if err != nil { @@ -152,6 +154,10 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) return settings, nil } +func getClusterConfigReferenceName(clusterID, nodeUUID string) string { + return fmt.Sprintf("%s_%s", clusterID, nodeUUID) +} + func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { if setting == nil { return nil, nil, fmt.Errorf("empty setting") @@ -163,7 +169,7 @@ func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID s if setting.ClusterStats != nil { var processorName = "es_cluster_stats" if setting.ClusterStats.Enabled { - pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID) + pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID, nodeUUID) if err != nil { return nil, nil, err } @@ -175,7 +181,7 @@ func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID s if setting.IndexStats != nil { var processorName = "es_index_stats" if setting.IndexStats.Enabled { - pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID) + pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID, nodeUUID) if err != nil { return nil, nil, err } @@ -187,7 +193,7 @@ func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID s if setting.ClusterHealth != nil { var processorName = "es_cluster_health" if setting.ClusterHealth.Enabled { - pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID) + pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID, nodeUUID) if err != nil { return nil, nil, err } @@ -200,7 +206,10 @@ func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID s var processorName = "es_node_stats" if setting.NodeStats.Enabled { params := util.MapStr{ - "elasticsearch": clusterID, + "elasticsearch": getClusterConfigReferenceName(clusterID, nodeUUID), + "labels": util.MapStr{ + "cluster_id": clusterID, + }, } if len(setting.NodeStats.NodeIDs) > 0{ params["node_uuids"] = setting.NodeStats.NodeIDs @@ -226,8 +235,11 @@ func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID s var processorName = "es_logs_processor" if setting.Logs.Enabled { params := util.MapStr{ - "elasticsearch": clusterID, + "elasticsearch": getClusterConfigReferenceName(clusterID, nodeUUID), "queue_name": "logs", + "labels": util.MapStr{ + "cluster_id": clusterID, + }, } if setting.Logs.LogsPath != "" { params["logs_path"] = setting.Logs.LogsPath @@ -251,10 +263,14 @@ func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID s } -func newClusterMetricPipeline(processorName string, clusterID string)(util.MapStr, error){ +func newClusterMetricPipeline(processorName string, clusterID string, nodeUUID string)(util.MapStr, error){ + referName := getClusterConfigReferenceName(clusterID, nodeUUID) cfg := util.MapStr{ processorName: util.MapStr{ - "elasticsearch": clusterID, + "elasticsearch": referName, + "labels": util.MapStr{ + "cluster_id": clusterID, + }, }, } enabled := true @@ -357,6 +373,9 @@ func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]str }, }, } + if len(agentIds) == 0 { + queryDSL["size"] = 2000 + } q.RawQuery = util.MustToJSONBytes(queryDSL) err, result := orm.Search(event.Event{}, &q) if err != nil { diff --git a/modules/agent/state/state.go b/modules/agent/state/state.go index 7b65effd..b833f956 100644 --- a/modules/agent/state/state.go +++ b/modules/agent/state/state.go @@ -109,7 +109,9 @@ func (sm *StateManager) checkAgentStatus() { return } // status change to online + sm.agentMutex.Lock() sm.agentIds[agentID] = model.StatusOnline + sm.agentMutex.Unlock() log.Infof("status of agent [%s] changed to online", agentID) return }else{ @@ -119,7 +121,9 @@ func (sm *StateManager) checkAgentStatus() { } } // status change to offline + sm.agentMutex.Lock() sm.agentIds[agentID] = model.StatusOffline + sm.agentMutex.Unlock() ag, err := sm.GetAgent(agentID) if err != nil { if err != elastic.ErrNotFound { @@ -190,14 +194,18 @@ func (sm *StateManager) syncSettings(agentID string) { "endpoint": cfg.Endpoint, } if cfg.BasicAuth != nil && cfg.BasicAuth.Password != ""{ - err = agClient.SetKeystoreValue(context.Background(), ag.GetEndpoint(), fmt.Sprintf("%s_password", cfg.ID), cfg.BasicAuth.Password) + cid := cfg.ID + if cfg.ClusterUUID != "" { + cid = cfg.ClusterUUID + } + err = agClient.SetKeystoreValue(context.Background(), ag.GetEndpoint(), fmt.Sprintf("%s_password", cid), cfg.BasicAuth.Password) if err != nil { log.Errorf("set keystore value error: %v", err) continue } clusterCfg["basic_auth"] = util.MapStr{ "username": cfg.BasicAuth.Username, - "password": fmt.Sprintf("$[[keystore.%s_password]]", cfg.ID), + "password": fmt.Sprintf("$[[keystore.%s_password]]", cid), } } clusterCfgs = append(clusterCfgs, clusterCfg) diff --git a/plugin/elastic/activity.go b/plugin/elastic/activity.go index ba27abf4..8ae0f6d0 100644 --- a/plugin/elastic/activity.go +++ b/plugin/elastic/activity.go @@ -11,26 +11,17 @@ import ( "github.com/segmentio/encoding/json" "infini.sh/framework/core/config" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/errors" "infini.sh/framework/core/event" - "infini.sh/framework/core/global" "infini.sh/framework/core/orm" "infini.sh/framework/core/pipeline" "infini.sh/framework/core/queue" - "infini.sh/framework/core/rotate" "infini.sh/framework/core/util" - "runtime" - - "sync" - "time" ) type ActivityProcessor struct { - config *Config - runningConfigs map[string]*queue.QueueConfig - wg sync.WaitGroup - inFlightQueueConfigs sync.Map - detectorRunning bool - id string + config *Config + id string } func init() { @@ -39,44 +30,21 @@ func init() { func NewActivityProcessor(c *config.Config) (pipeline.Processor, error) { cfg := Config{ - NumOfWorkers: 1, - MaxWorkers: 10, - MaxConnectionPerHost: 1, - IdleTimeoutInSecond: 5, - DetectIntervalInMs: 10000, - Queues: map[string]interface{}{}, - - Consumer: queue.ConsumerConfig{ - Group: "activity-001", - Name: "activity-001", - FetchMinBytes: 1, - FetchMaxBytes: 10 * 1024 * 1024, - FetchMaxMessages: 500, - EOFRetryDelayInMs: 1000, - FetchMaxWaitMs: 10000, - }, - - DetectActiveQueue: true, - ValidateRequest: false, - SkipEmptyQueue: true, - SkipOnMissingInfo: false, - RotateConfig: rotate.DefaultConfig, - BulkConfig: elastic.DefaultBulkProcessorConfig, + MessageField: "messages", } if err := c.Unpack(&cfg); err != nil { - log.Error(err) return nil, fmt.Errorf("failed to unpack the configuration of flow_runner processor: %s", err) } - runner := ActivityProcessor{ - id: util.GetUUID(), - config: &cfg, - runningConfigs: map[string]*queue.QueueConfig{}, - inFlightQueueConfigs: sync.Map{}, + if cfg.Elasticsearch == "" { + return nil, errors.New("elasticsearch config was not found in metadata processor") } - runner.wg = sync.WaitGroup{} + runner := ActivityProcessor{ + id: util.GetUUID(), + config: &cfg, + } return &runner, nil } @@ -85,238 +53,32 @@ func (processor *ActivityProcessor) Name() string { return "activity" } -func (processor *ActivityProcessor) Process(c *pipeline.Context) error { - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Error("error in activity processor,", v) +func (processor *ActivityProcessor) Process(ctx *pipeline.Context) error { + //get message from queue + obj := ctx.Get(processor.config.MessageField) + if obj != nil { + messages := obj.([]queue.Message) + log.Tracef("get %v messages from context", len(messages)) + if len(messages) == 0 { + return nil + } + for _, pop := range messages { + typ, err := jsonparser.GetString(pop.Data, "metadata", "name") + if err != nil { + panic(err) } - } - log.Trace("exit activity processor") - }() - - //handle updates - if processor.config.DetectActiveQueue { - log.Tracef("detector running [%v]", processor.detectorRunning) - if !processor.detectorRunning { - processor.detectorRunning = true - processor.wg.Add(1) - go func(c *pipeline.Context) { - log.Tracef("init detector for active queue [%v] ", processor.id) - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Error("error in activity processor,", v) - } - } - processor.detectorRunning = false - log.Debug("exit detector for active queue") - processor.wg.Done() - }() - - for { - if c.IsCanceled() { - return - } - - if global.Env().IsDebug { - log.Tracef("inflight queues: %v", util.MapLength(&processor.inFlightQueueConfigs)) - processor.inFlightQueueConfigs.Range(func(key, value interface{}) bool { - log.Tracef("inflight queue:%v", key) - return true - }) - } - - cfgs := queue.GetConfigByLabels(processor.config.Queues) - for _, v := range cfgs { - if c.IsCanceled() { - return - } - //if have depth and not in in flight - if queue.HasLag(v) { - _, ok := processor.inFlightQueueConfigs.Load(v.Id) - if !ok { - log.Tracef("detecting new queue: %v", v.Name) - processor.HandleQueueConfig(v, c) - } - } - } - if processor.config.DetectIntervalInMs > 0 { - time.Sleep(time.Millisecond * time.Duration(processor.config.DetectIntervalInMs)) - } - } - }(c) - } - } else { - cfgs := queue.GetConfigByLabels(processor.config.Queues) - log.Debugf("filter queue by:%v, num of queues:%v", processor.config.Queues, len(cfgs)) - for _, v := range cfgs { - log.Tracef("checking queue: %v", v) - processor.HandleQueueConfig(v, c) - } - } - - processor.wg.Wait() - - return nil -} - -func (processor *ActivityProcessor) HandleQueueConfig(v *queue.QueueConfig, c *pipeline.Context) { - - if processor.config.SkipEmptyQueue { - if !queue.HasLag(v) { - if global.Env().IsDebug { - log.Tracef("skip empty queue:[%v]", v.Name) - } - return - } - } - - elasticsearch := processor.config.Elasticsearch - if elasticsearch == "" { - log.Error("elasticsearch config was not found in activity processor") - return - } - - meta := elastic.GetMetadata(util.ToString(elasticsearch)) - if meta == nil { - log.Debugf("metadata for [%v] is nil", elasticsearch) - return - } - - host := meta.GetActiveHost() - log.Debugf("random choose node [%v] to consume queue [%v]", host, v.Id) - processor.wg.Add(1) - - //go processor.NewBulkWorker("bulk_indexing_"+host,c, processor.bulkSizeInByte, v, host) - go processor.HandleMessage(c, v) - -} - -func (processor *ActivityProcessor) HandleMessage(ctx *pipeline.Context, qConfig *queue.QueueConfig) { - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Errorf("error in %s processor: %v", processor.Name(), v) - } - } - processor.wg.Done() - log.Tracef("exit %s processor", processor.Name()) - }() - - key := qConfig.Id - - if processor.config.MaxWorkers > 0 && util.MapLength(&processor.inFlightQueueConfigs) > processor.config.MaxWorkers { - log.Debugf("reached max num of workers, skip init [%v]", qConfig.Name) - return - } - - var workerID = util.GetUUID() - _, exists := processor.inFlightQueueConfigs.Load(key) - if exists { - log.Errorf("queue [%v] has more then one consumer", qConfig.Id) - return - } - - processor.inFlightQueueConfigs.Store(key, workerID) - log.Debugf("starting worker:[%v], queue:[%v]", workerID, qConfig.Name) - var consumer = queue.GetOrInitConsumerConfig(qConfig.Id, processor.config.Consumer.Group, processor.config.Consumer.Name) - initOffset, _ := queue.GetOffset(qConfig, consumer) - offset := initOffset - defer func() { - log.Debugf("worker:[%v] start consume queue:[%v] offset:%v", workerID, qConfig.Id, offset) - }() - - for { - if ctx.IsCanceled() { - return - } - - ctx1, messages, timeout, err := queue.Consume(qConfig, consumer, offset) - - if len(messages)==0{ - time.Sleep(time.Millisecond * time.Duration(500)) - } - - if timeout { - log.Tracef("timeout on queue:[%v]", qConfig.Name) - ctx.Failed(fmt.Errorf("timeout on queue:[%v]", qConfig.Name)) - return - } - - if err != nil { - log.Tracef("error on queue:[%v]", qConfig.Name) - if err.Error() == "EOF" { - if len(messages) > 0 { - goto HANDLE_MESSAGE - } - return - } - panic(err) - } - - HANDLE_MESSAGE: - - //update temp offset, not committed, continued reading - offset = ctx1.NextOffset.String()//TODO - - if len(messages) > 0 { - for _, pop := range messages { - typ, err := jsonparser.GetString(pop.Data, "metadata", "name") + switch typ { + case "activity": + activity, _, _, err := jsonparser.Get(pop.Data, "payload", "activity") if err != nil { panic(err) } - switch typ { - case "activity": - activity, _, _, err := jsonparser.Get(pop.Data, "payload", "activity") - if err != nil { - panic(err) - } - - err = processor.HandleActivity(activity) - } + err = processor.HandleActivity(activity) } } - if err == nil { - if offset != "" && initOffset != offset { - ok, err := queue.CommitOffset(qConfig, consumer, offset) - if !ok || err != nil { - panic(err) - } - initOffset=offset - } - } else { - log.Error(err) - } } + return nil } func (processor *ActivityProcessor) HandleActivity(activityByte []byte) error { diff --git a/plugin/elastic/metadata.go b/plugin/elastic/metadata.go index 4063e93d..68915d6f 100644 --- a/plugin/elastic/metadata.go +++ b/plugin/elastic/metadata.go @@ -10,53 +10,23 @@ import ( log "github.com/cihub/seelog" "infini.sh/framework/core/config" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/errors" "infini.sh/framework/core/event" - "infini.sh/framework/core/global" "infini.sh/framework/core/orm" + "infini.sh/framework/core/param" "infini.sh/framework/core/pipeline" "infini.sh/framework/core/queue" - "infini.sh/framework/core/rotate" "infini.sh/framework/core/util" - "runtime" - - "sync" - "time" ) type MetadataProcessor struct { - config *Config - runningConfigs map[string]*queue.QueueConfig - wg sync.WaitGroup - inFlightQueueConfigs sync.Map - detectorRunning bool - id string + config *Config + id string } type Config struct { - NumOfWorkers int `config:"worker_size"` - - IdleTimeoutInSecond int `config:"idle_timeout_in_seconds"` - MaxConnectionPerHost int `config:"max_connection_per_node"` - - Queues map[string]interface{} `config:"queues,omitempty"` - - Consumer queue.ConsumerConfig `config:"consumer"` - - MaxWorkers int `config:"max_worker_size"` - - DetectActiveQueue bool `config:"detect_active_queue"` - DetectIntervalInMs int `config:"detect_interval"` - - ValidateRequest bool `config:"valid_request"` - SkipEmptyQueue bool `config:"skip_empty_queue"` - SkipOnMissingInfo bool `config:"skip_info_missing"` - - RotateConfig rotate.RotateConfig `config:"rotate"` - BulkConfig elastic.BulkProcessorConfig `config:"bulk"` - - Elasticsearch string `config:"elasticsearch,omitempty"` - - WaitingAfter []string `config:"waiting_after"` + MessageField param.ParaKey `config:"message_field"` + Elasticsearch string `config:"elasticsearch,omitempty"` } func init() { @@ -65,45 +35,21 @@ func init() { func New(c *config.Config) (pipeline.Processor, error) { cfg := Config{ - NumOfWorkers: 1, - MaxWorkers: 10, - MaxConnectionPerHost: 1, - IdleTimeoutInSecond: 5, - DetectIntervalInMs: 10000, - Queues: map[string]interface{}{}, - - Consumer: queue.ConsumerConfig{ - Group: "metadata-001", - Name: "metadata-001", - FetchMinBytes: 1, - FetchMaxBytes: 10 * 1024 * 1024, - FetchMaxMessages: 500, - EOFRetryDelayInMs: 1000, - FetchMaxWaitMs: 10000, - }, - - DetectActiveQueue: true, - ValidateRequest: false, - SkipEmptyQueue: true, - SkipOnMissingInfo: false, - RotateConfig: rotate.DefaultConfig, - BulkConfig: elastic.DefaultBulkProcessorConfig, + MessageField: "messages", } if err := c.Unpack(&cfg); err != nil { - log.Error(err) return nil, fmt.Errorf("failed to unpack the configuration of flow_runner processor: %s", err) } - runner := MetadataProcessor{ - id: util.GetUUID(), - config: &cfg, - runningConfigs: map[string]*queue.QueueConfig{}, - inFlightQueueConfigs: sync.Map{}, + if cfg.Elasticsearch == "" { + return nil, errors.New("elasticsearch config was not found in metadata processor") } - runner.wg = sync.WaitGroup{} - + runner := MetadataProcessor{ + id: util.GetUUID(), + config: &cfg, + } return &runner, nil } @@ -111,247 +57,38 @@ func (processor *MetadataProcessor) Name() string { return "metadata" } -func (processor *MetadataProcessor) Process(c *pipeline.Context) error { - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Error("error in metadata processor,", v) +func (processor *MetadataProcessor) Process(ctx *pipeline.Context) error { + + //get message from queue + obj := ctx.Get(processor.config.MessageField) + if obj != nil { + messages := obj.([]queue.Message) + log.Tracef("get %v messages from context", len(messages)) + if len(messages) == 0 { + return nil + } + for _, pop := range messages { + typ, err := jsonparser.GetString(pop.Data, "metadata", "name") + if err != nil { + panic(err) } - } - log.Trace("exit metadata processor") - }() - - //handle updates - if processor.config.DetectActiveQueue { - log.Tracef("detector running [%v]", processor.detectorRunning) - if !processor.detectorRunning { - processor.detectorRunning = true - processor.wg.Add(1) - go func(c *pipeline.Context) { - log.Tracef("init detector for active queue [%v] ", processor.id) - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Error("error in metadata processor,", v) - } - } - processor.detectorRunning = false - log.Debug("exit detector for active queue") - processor.wg.Done() - }() - - for { - if c.IsCanceled() { - return - } - - if global.Env().IsDebug { - log.Tracef("inflight queues: %v", util.MapLength(&processor.inFlightQueueConfigs)) - processor.inFlightQueueConfigs.Range(func(key, value interface{}) bool { - log.Tracef("inflight queue:%v", key) - return true - }) - } - - cfgs := queue.GetConfigByLabels(processor.config.Queues) - for _, v := range cfgs { - if c.IsCanceled() { - return - } - //if have depth and not in in flight - if queue.HasLag(v) { - _, ok := processor.inFlightQueueConfigs.Load(v.Id) - if !ok { - log.Tracef("detecting new queue: %v", v.Name) - processor.HandleQueueConfig(v, c) - } - } - } - if processor.config.DetectIntervalInMs > 0 { - time.Sleep(time.Millisecond * time.Duration(processor.config.DetectIntervalInMs)) - } - } - }(c) - } - } else { - cfgs := queue.GetConfigByLabels(processor.config.Queues) - log.Debugf("filter queue by:%v, num of queues:%v", processor.config.Queues, len(cfgs)) - for _, v := range cfgs { - log.Tracef("checking queue: %v", v) - processor.HandleQueueConfig(v, c) - } - } - - processor.wg.Wait() - - return nil -} - -func (processor *MetadataProcessor) HandleQueueConfig(v *queue.QueueConfig, c *pipeline.Context) { - - if processor.config.SkipEmptyQueue { - if !queue.HasLag(v) { - if global.Env().IsDebug { - log.Tracef("skip empty queue:[%v]", v.Name) - } - return - } - } - - elasticsearch := processor.config.Elasticsearch - if elasticsearch == "" { - log.Error("elasticsearch config was not found in metadata processor") - return - } - - meta := elastic.GetMetadata(util.ToString(elasticsearch)) - if meta == nil { - log.Debugf("metadata for [%v] is nil", elasticsearch) - return - } - - host := meta.GetActiveHost() - log.Debugf("random choose node [%v] to consume queue [%v]", host, v.Id) - processor.wg.Add(1) - - //go processor.NewBulkWorker("bulk_indexing_"+host,c, processor.bulkSizeInByte, v, host) - go processor.HandleMessage(c, v) - -} - -func (processor *MetadataProcessor) HandleMessage(ctx *pipeline.Context, qConfig *queue.QueueConfig) { - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Errorf("error in %s processor: %v", processor.Name(), v) - } - } - processor.wg.Done() - log.Tracef("exit %s processor", processor.Name()) - }() - - key := qConfig.Id - - if processor.config.MaxWorkers > 0 && util.MapLength(&processor.inFlightQueueConfigs) > processor.config.MaxWorkers { - log.Debugf("reached max num of workers, skip init [%v]", qConfig.Name) - return - } - - var workerID = util.GetUUID() - _, exists := processor.inFlightQueueConfigs.Load(key) - if exists { - log.Errorf("queue [%v] has more then one consumer", qConfig.Id) - return - } - - processor.inFlightQueueConfigs.Store(key, workerID) - log.Debugf("starting worker:[%v], queue:[%v]", workerID, qConfig.Name) - var consumer = queue.GetOrInitConsumerConfig(qConfig.Id, processor.config.Consumer.Group, processor.config.Consumer.Name) - initOffset, _ := queue.GetOffset(qConfig, consumer) - offset := initOffset - defer func() { - log.Debugf("worker:[%v] start consume queue:[%v] offset:%v", workerID, qConfig.Id, offset) - }() - - for { - if ctx.IsCanceled() { - return - } - ctx1, messages, isTimeout, err := queue.Consume(qConfig, consumer, offset) - - if len(messages)==0{ - time.Sleep(time.Millisecond * time.Duration(500)) - } - - //if timeout{ - // log.Tracef("timeout on queue:[%v]",qConfig.Name) - // ctx.Failed() - // return - //} - - if err != nil { - log.Tracef("error on queue:[%v]", qConfig.Name) - if err.Error() == "EOF" { - if len(messages) > 0 { - goto HANDLE_MESSAGE - } - return - } - //panic(err) - if isTimeout { - time.Sleep(time.Millisecond * 1000) - } - } - - HANDLE_MESSAGE: - - //update temp offset, not committed, continued reading - offset = ctx1.NextOffset.String()//TODO - - if len(messages) > 0 { - for _, pop := range messages { - - typ, err := jsonparser.GetString(pop.Data, "metadata", "name") + switch typ { + case "index_health_change": + //err = processor.HandleIndexHealthChange(&ev) + case "index_state_change": + indexState, _, _, err := jsonparser.Get(pop.Data, "payload", "index_state") if err != nil { panic(err) } - switch typ { - case "index_health_change": - //err = processor.HandleIndexHealthChange(&ev) - case "index_state_change": - indexState, _, _, err := jsonparser.Get(pop.Data, "payload", "index_state") - if err != nil { - panic(err) - } - err = processor.HandleIndexStateChange(indexState) - case "unknown_node_status": - processor.HandleUnknownNodeStatus(pop.Data) - } - - } - } - if err == nil { - if offset != "" && initOffset != offset { - ok, err := queue.CommitOffset(qConfig, consumer, offset) - if !ok || err != nil { - panic(err) - } - initOffset=offset - } - } else { - if !isTimeout { - log.Error(err) + err = processor.HandleIndexStateChange(indexState) + case "unknown_node_status": + processor.HandleUnknownNodeStatus(pop.Data) } } } + return nil } + func (processor *MetadataProcessor) HandleIndexStateChange(indexState []byte) error { esClient := elastic.GetClient(processor.config.Elasticsearch) // save index metadata diff --git a/plugin/task_manager/migration_api.go b/plugin/task_manager/migration_api.go index 44f7973d..f2898818 100644 --- a/plugin/task_manager/migration_api.go +++ b/plugin/task_manager/migration_api.go @@ -76,11 +76,17 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R for i, index := range taskConfig.Indices { indexName := index.Source.GetUniqueIndexName() count := indexState[indexName].IndexDocs - percent := float64(count) / float64(index.Source.Docs) * 100 - if percent > 100 { + sourceDocs := indexState[indexName].SourceDocs + var percent float64 + if sourceDocs <= 0 { percent = 100 + }else{ + percent = float64(count) / float64(sourceDocs) * 100 + if percent > 100 { + percent = 100 + } } - taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceDocs + taskConfig.Indices[i].Source.Docs = sourceDocs taskConfig.Indices[i].Target.Docs = count taskConfig.Indices[i].Percent = util.ToFixed(percent, 2) taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions diff --git a/plugin/task_manager/model/common.go b/plugin/task_manager/model/common.go index b5d2a517..4bc8512e 100644 --- a/plugin/task_manager/model/common.go +++ b/plugin/task_manager/model/common.go @@ -2,6 +2,7 @@ package model import ( "fmt" + "infini.sh/framework/core/task" "time" "infini.sh/framework/core/util" @@ -82,3 +83,9 @@ func (incremental *IndexIncremental) BuildFilter(current int64, step time.Durati }, }, nil } + +type QueryTask struct { + Type string + Status []string + TaskHandler func(taskItem *task.Task) error +} \ No newline at end of file diff --git a/plugin/task_manager/pipeline.go b/plugin/task_manager/pipeline.go index e5652eac..afd4a65d 100644 --- a/plugin/task_manager/pipeline.go +++ b/plugin/task_manager/pipeline.go @@ -40,6 +40,7 @@ type DispatcherProcessor struct { indexMigrationTaskProcessor migration_model.Processor clusterComparisonTaskProcessor migration_model.Processor indexComparisonTaskProcessor migration_model.Processor + queryTasks []migration_model.QueryTask } type DispatcherConfig struct { @@ -101,6 +102,25 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro processor.clusterMigrationTaskProcessor = cluster_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) processor.indexComparisonTaskProcessor = index_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) processor.clusterComparisonTaskProcessor = cluster_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) + processor.queryTasks = []migration_model.QueryTask{ + // handle pipeline task + {"pipeline", []string{task.StatusReady, task.StatusRunning, task.StatusPendingStop}, processor.pipelineTaskProcessor.Process}, + // handle comparison tasks + {"cluster_comparison", []string{task.StatusPendingStop}, processor.clusterComparisonTaskProcessor.Process}, + {"index_comparison", []string{task.StatusPendingStop}, processor.indexComparisonTaskProcessor.Process}, + {"index_comparison", []string{task.StatusPendingStop}, processor.indexComparisonTaskProcessor.Process}, + {"index_comparison", []string{task.StatusRunning}, processor.indexComparisonTaskProcessor.Process}, + {"index_comparison", []string{task.StatusReady}, processor.indexComparisonTaskProcessor.Process}, + {"cluster_comparison", []string{task.StatusRunning}, processor.clusterComparisonTaskProcessor.Process}, + {"cluster_comparison", []string{task.StatusReady}, processor.clusterComparisonTaskProcessor.Process}, + // handle migration tasks + {"cluster_migration", []string{task.StatusPendingStop}, processor.clusterMigrationTaskProcessor.Process}, + {"index_migration", []string{task.StatusPendingStop}, processor.indexMigrationTaskProcessor.Process}, + {"index_migration", []string{task.StatusRunning}, processor.indexMigrationTaskProcessor.Process}, + {"index_migration", []string{task.StatusReady}, processor.indexMigrationTaskProcessor.Process}, + {"cluster_migration", []string{task.StatusRunning}, processor.clusterMigrationTaskProcessor.Process}, + {"cluster_migration", []string{task.StatusReady}, processor.clusterMigrationTaskProcessor.Process}, + } return &processor, nil } @@ -109,40 +129,38 @@ func (p *DispatcherProcessor) Name() string { return "migration_dispatcher" } -func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { - // handle repeating tasks - p.handleRepeatingTasks(ctx, "cluster_comparison") - p.handleRepeatingTasks(ctx, "cluster_migration") +var ( + repeatingTaskTypes = []string{"cluster_comparison", "cluster_migration"} +) - // handle pipeline task - p.handleTasks(ctx, "pipeline", []string{task.StatusReady, task.StatusRunning, task.StatusPendingStop}, p.pipelineTaskProcessor.Process) - - // handle comparison tasks - p.handleTasks(ctx, "cluster_comparison", []string{task.StatusPendingStop}, p.clusterComparisonTaskProcessor.Process) - p.handleTasks(ctx, "index_comparison", []string{task.StatusPendingStop}, p.indexComparisonTaskProcessor.Process) - p.handleTasks(ctx, "index_comparison", []string{task.StatusRunning}, p.indexComparisonTaskProcessor.Process) - p.handleTasks(ctx, "index_comparison", []string{task.StatusReady}, p.indexComparisonTaskProcessor.Process) - p.handleTasks(ctx, "cluster_comparison", []string{task.StatusRunning}, p.clusterComparisonTaskProcessor.Process) - p.handleTasks(ctx, "cluster_comparison", []string{task.StatusReady}, p.clusterComparisonTaskProcessor.Process) - - // handle migration tasks - p.handleTasks(ctx, "cluster_migration", []string{task.StatusPendingStop}, p.clusterMigrationTaskProcessor.Process) - p.handleTasks(ctx, "index_migration", []string{task.StatusPendingStop}, p.indexMigrationTaskProcessor.Process) - p.handleTasks(ctx, "index_migration", []string{task.StatusRunning}, p.indexMigrationTaskProcessor.Process) - p.handleTasks(ctx, "index_migration", []string{task.StatusReady}, p.indexMigrationTaskProcessor.Process) - p.handleTasks(ctx, "cluster_migration", []string{task.StatusRunning}, p.clusterMigrationTaskProcessor.Process) - p.handleTasks(ctx, "cluster_migration", []string{task.StatusReady}, p.clusterMigrationTaskProcessor.Process) +func (p *DispatcherProcessor) getTasks() error { return nil } -func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string, taskStatus []string, taskHandler func(taskItem *task.Task) error) { +func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { + var handledTaskNum int + // handle repeating tasks + for _, taskType := range repeatingTaskTypes { + handledTaskNum += p.handleRepeatingTasks(ctx, taskType) + + } + for _, tsk := range p.queryTasks { + handledTaskNum += p.handleTasks(ctx, tsk.Type, tsk.Status, tsk.TaskHandler) + } + if handledTaskNum == 0 { + ctx.Finished() + } + return nil +} + +func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string, taskStatus []string, taskHandler func(taskItem *task.Task) error) int { tasks, err := p.getMigrationTasks(taskType, taskStatus, p.config.TaskBatchSize) if err != nil { log.Errorf("failed to get [%s] with status %s, err: %v", taskType, taskStatus, err) - return + return 0 } if len(tasks) == 0 { - return + return 0 } log.Debugf("handling [%s] with status [%s], count: %d", taskType, taskStatus, len(tasks)) // refresh index after each batch @@ -151,7 +169,7 @@ func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string }() for i := range tasks { if ctx.IsCanceled() { - return + return 0 } taskItem := &tasks[i] err := p.handleTask(taskItem, taskHandler) @@ -167,7 +185,7 @@ func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string }, fmt.Sprintf("failed to handle task [%s]", taskItem.ID)) } } - return + return len(tasks) } func (p *DispatcherProcessor) handleTask(taskItem *task.Task, taskHandler func(taskItem *task.Task) error) error { diff --git a/plugin/task_manager/repeat.go b/plugin/task_manager/repeat.go index aa8c23aa..66931cb3 100644 --- a/plugin/task_manager/repeat.go +++ b/plugin/task_manager/repeat.go @@ -14,14 +14,14 @@ import ( "infini.sh/framework/core/util" ) -func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskType string) { +func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskType string) int { tasks, err := p.getPendingExecutionTasks(taskType, p.config.TaskBatchSize) if err != nil { log.Errorf("failed to get pending [%s] tasks, err: %v", taskType, err) - return + return 0 } if len(tasks) == 0 { - return + return 0 } log.Debugf("handling pending [%s] tasks, count: %d", taskType, len(tasks)) // refresh index after each batch @@ -30,7 +30,7 @@ func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskTy }() for i := range tasks { if ctx.IsCanceled() { - return + return 0 } taskItem := &tasks[i] err := p.handleTask(taskItem, p.handleRepeatingTask) @@ -46,7 +46,7 @@ func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskTy }, fmt.Sprintf("failed to handle task [%s]", taskItem.ID)) } } - return + return len(tasks) } func (p *DispatcherProcessor) getPendingExecutionTasks(taskType string, size int) ([]task.Task, error) {