From 02adb4d1211e7ba694863bdbc49c6b4f3e1284f9 Mon Sep 17 00:00:00 2001 From: medcl Date: Fri, 27 Jan 2023 12:09:09 +0800 Subject: [PATCH] refactoring pipelines --- config/initialization.tpl | 197 ++++-- main.go | 7 +- model/{gateway => }/instance.go | 2 +- plugin/api/gateway/instance.go | 31 +- plugin/elastic/activity.go | 332 +++++++++ plugin/elastic/metadata.go | 463 +++++++++++++ plugin/migration/api.go | 1116 +++++++++++++++++++++++++++++++ plugin/migration/model.go | 77 +++ plugin/migration/module.go | 31 + plugin/migration/pipeline.go | 507 ++++++++++++++ 10 files changed, 2686 insertions(+), 77 deletions(-) rename model/{gateway => }/instance.go (98%) create mode 100644 plugin/elastic/activity.go create mode 100644 plugin/elastic/metadata.go create mode 100644 plugin/migration/api.go create mode 100644 plugin/migration/model.go create mode 100644 plugin/migration/module.go create mode 100644 plugin/migration/pipeline.go diff --git a/config/initialization.tpl b/config/initialization.tpl index 74272417..ca09dd65 100644 --- a/config/initialization.tpl +++ b/config/initialization.tpl @@ -127,58 +127,6 @@ PUT $[[INDEX_PREFIX]]metrics-00001 } ], "properties": { - "metadata": { - "properties": { - "category": { - "type": "keyword", - "ignore_above": 256 - }, - "datatype": { - "type": "keyword", - "ignore_above": 256 - }, - "labels": { - "properties": { - "cluster_id": { - "type": "keyword", - "ignore_above": 256 - }, - "index_id": { - "type": "keyword", - "ignore_above": 256 - }, - "index_name": { - "type": "keyword", - "ignore_above": 256 - }, - "index_uuid": { - "type": "keyword", - "ignore_above": 256 - }, - "ip": { - "type": "keyword", - "ignore_above": 256 - }, - "node_id": { - "type": "keyword", - "ignore_above": 256 - }, - "node_name": { - "type": "keyword", - "ignore_above": 256 - }, - "transport_address": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "name": { - "type": "keyword", - "ignore_above": 256 - } - } - }, "timestamp": { "type": "date" } @@ -243,6 +191,145 @@ PUT $[[INDEX_PREFIX]]logs-00001 } } + +PUT _template/$[[INDEX_PREFIX]]requests_logging-rollover +{ + "order": 100000, + "index_patterns": [ + "$[[INDEX_PREFIX]]requests_logging*" + ], + "settings": { + "index": { + "format": "7", + "lifecycle": { + "name" : "ilm_$[[INDEX_PREFIX]]metrics-30days-retention", + "rollover_alias" : "$[[INDEX_PREFIX]]requests_logging" + }, + "codec": "best_compression", + "number_of_shards": "1", + "translog": { + "durability": "async" + } + } + }, + "mappings": { + "dynamic_templates": [ + { + "strings": { + "mapping": { + "ignore_above": 256, + "type": "keyword" + }, + "match_mapping_type": "string" + } + } + ], + "properties": { + "request": { + "properties": { + "body": { + "type": "text" + } + } + }, + "response": { + "properties": { + "body": { + "type": "text" + } + } + }, + "timestamp": { + "type": "date" + } + } + }, + "aliases": {} +} + +PUT $[[INDEX_PREFIX]]requests_logging-00001 +{ + "settings": { + "index.lifecycle.rollover_alias":"$[[INDEX_PREFIX]]requests_logging" + , "refresh_interval": "5s" + }, + "aliases":{ + "$[[INDEX_PREFIX]]requests_logging":{ + "is_write_index":true + } + } +} + + +PUT _template/$[[INDEX_PREFIX]]async_bulk_results-rollover +{ + "order": 100000, + "index_patterns": [ + "$[[INDEX_PREFIX]]async_bulk_results*" + ], + "settings": { + "index": { + "format": "7", + "lifecycle": { + "name" : "ilm_$[[INDEX_PREFIX]]metrics-30days-retention", + "rollover_alias" : "$[[INDEX_PREFIX]]async_bulk_results" + }, + "codec": "best_compression", + 
"number_of_shards": "1", + "translog": { + "durability": "async" + } + } + }, + "mappings": { + "dynamic_templates": [ + { + "strings": { + "mapping": { + "ignore_above": 256, + "type": "keyword" + }, + "match_mapping_type": "string" + } + } + ], + "properties": { + "request": { + "properties": { + "body": { + "type": "text" + } + } + }, + "response": { + "properties": { + "body": { + "type": "text" + } + } + }, + "timestamp": { + "type": "date" + } + } + }, + "aliases": {} +} + +PUT $[[INDEX_PREFIX]]async_bulk_results-00001 +{ + "settings": { + "index.lifecycle.rollover_alias":"$[[INDEX_PREFIX]]async_bulk_results" + , "refresh_interval": "5s" + }, + "aliases":{ + "$[[INDEX_PREFIX]]async_bulk_results":{ + "is_write_index":true + } + } +} + + PUT _template/$[[INDEX_PREFIX]]alert-history-rollover { "order" : 100000, @@ -450,7 +537,8 @@ PUT $[[INDEX_PREFIX]]activities-00001 ], "properties": { "changelog": { - "type": "flattened" + "type": "object", + "enabled": false }, "id": { "type": "keyword" @@ -465,9 +553,6 @@ PUT $[[INDEX_PREFIX]]activities-00001 "type": "keyword", "ignore_above": 256 }, - "labels": { - "type": "flattened" - }, "name": { "type": "keyword", "ignore_above": 256 diff --git a/main.go b/main.go index d5b0fc09..50ca3b66 100644 --- a/main.go +++ b/main.go @@ -6,8 +6,8 @@ import ( _ "expvar" log "github.com/cihub/seelog" "infini.sh/console/config" + "infini.sh/console/model" "infini.sh/console/model/alerting" - "infini.sh/console/model/gateway" _ "infini.sh/console/plugin" alerting2 "infini.sh/console/service/alerting" "infini.sh/framework" @@ -23,7 +23,6 @@ import ( _ "infini.sh/framework/modules/api" elastic2 "infini.sh/framework/modules/elastic" "infini.sh/framework/modules/metrics" - "infini.sh/framework/modules/migration" "infini.sh/framework/modules/pipeline" queue2 "infini.sh/framework/modules/queue/disk_queue" "infini.sh/framework/modules/redis" @@ -71,7 +70,6 @@ func main() { modules=append(modules,&agent.AgentModule{}) modules=append(modules,&metrics.MetricsModule{}) modules=append(modules,&security.Module{}) - modules=append(modules,&migration.MigrationModule{}) uiModule:=&ui.UIModule{} @@ -91,7 +89,6 @@ func main() { module.RegisterSystemModule(&agent.AgentModule{}) module.RegisterSystemModule(&metrics.MetricsModule{}) module.RegisterSystemModule(&security.Module{}) - module.RegisterSystemModule(&migration.MigrationModule{}) }else{ for _, v := range modules { v.Setup() @@ -133,7 +130,7 @@ func main() { orm.RegisterSchemaWithIndexName(elastic.View{}, "view") orm.RegisterSchemaWithIndexName(elastic.CommonCommand{}, "commands") //orm.RegisterSchemaWithIndexName(elastic.TraceTemplate{}, "trace-template") - orm.RegisterSchemaWithIndexName(gateway.Instance{}, "gateway-instance") + orm.RegisterSchemaWithIndexName(model.Instance{}, "instance") orm.RegisterSchemaWithIndexName(alerting.Rule{}, "alert-rule") orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alert-history") orm.RegisterSchemaWithIndexName(alerting.AlertMessage{}, "alert-message") diff --git a/model/gateway/instance.go b/model/instance.go similarity index 98% rename from model/gateway/instance.go rename to model/instance.go index 59c32005..31da501f 100644 --- a/model/gateway/instance.go +++ b/model/instance.go @@ -2,7 +2,7 @@ * web: https://infinilabs.com * mail: hello#infini.ltd */ -package gateway +package model import ( "infini.sh/framework/core/agent" diff --git a/plugin/api/gateway/instance.go b/plugin/api/gateway/instance.go index 3e0791f6..2f9f836e 100644 --- a/plugin/api/gateway/instance.go +++ 
b/plugin/api/gateway/instance.go @@ -9,7 +9,7 @@ import ( "fmt" log "github.com/cihub/seelog" "github.com/segmentio/encoding/json" - "infini.sh/console/model/gateway" + "infini.sh/console/model" "infini.sh/framework/core/agent" httprouter "infini.sh/framework/core/api/router" elastic2 "infini.sh/framework/core/elastic" @@ -25,7 +25,7 @@ import ( ) func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var obj = &gateway.Instance{} + var obj = &model.Instance{} err := h.DecodeJSON(req, obj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -68,7 +68,7 @@ func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps func (h *GatewayAPI) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("instance_id") - obj := gateway.Instance{} + obj := model.Instance{} obj.ID = id exists, err := orm.Get(&obj) @@ -94,7 +94,7 @@ func (h *GatewayAPI) getInstance(w http.ResponseWriter, req *http.Request, ps ht func (h *GatewayAPI) updateInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("instance_id") - obj := gateway.Instance{} + obj := model.Instance{} obj.ID = id exists, err := orm.Get(&obj) @@ -108,7 +108,7 @@ func (h *GatewayAPI) updateInstance(w http.ResponseWriter, req *http.Request, ps id = obj.ID create := obj.Created - obj = gateway.Instance{} + obj = model.Instance{} err = h.DecodeJSON(req, &obj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -135,7 +135,7 @@ func (h *GatewayAPI) updateInstance(w http.ResponseWriter, req *http.Request, ps func (h *GatewayAPI) deleteInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("instance_id") - obj := gateway.Instance{} + obj := model.Instance{} obj.ID = id exists, err := orm.Get(&obj) @@ -185,7 +185,7 @@ func (h *GatewayAPI) searchInstance(w http.ResponseWriter, req *http.Request, ps queryDSL = fmt.Sprintf(queryDSL, mustBuilder.String(), size, from) q.RawQuery = []byte(queryDSL) - err, res := orm.Search(&gateway.Instance{}, &q) + err, res := orm.Search(&model.Instance{}, &q) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -216,7 +216,7 @@ func (h *GatewayAPI) getInstanceStatus(w http.ResponseWriter, req *http.Request, } q.RawQuery = util.MustToJSONBytes(queryDSL) - err, res := orm.Search(&gateway.Instance{}, &q) + err, res := orm.Search(&model.Instance{}, &q) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -272,7 +272,7 @@ func (h *GatewayAPI) proxy(w http.ResponseWriter, req *http.Request, ps httprout ) instanceID := ps.MustGetParameter("instance_id") - obj := gateway.Instance{} + obj := model.Instance{} obj.ID = instanceID exists, err := orm.Get(&obj) @@ -369,7 +369,7 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, from = 0 } agentIndexName := orm.GetIndexName(agent.Instance{}) - gatewayIndexName := orm.GetIndexName(gateway.Instance{}) + gatewayIndexName := orm.GetIndexName(model.Instance{}) agentMust := []util.MapStr{ { "term": util.MapStr{ @@ -443,18 +443,19 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, } err, result := orm.Search(nil, &q) if err != nil { - log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) return } + searchRes := elastic2.SearchResponse{} err = 
util.FromJSONBytes(result.Raw, &searchRes) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) + if err != nil||searchRes.ESError!=nil { + msg:=fmt.Sprintf("%v,%v",err,searchRes.ESError) + h.WriteError(w, msg, http.StatusInternalServerError) return } var nodes = []util.MapStr{} + for _, hit := range searchRes.Hits.Hits { var ( endpoint string @@ -484,7 +485,7 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, } if !hasErr { - available, err := isNodeAvailable(endpoint) + available, err := isNodeAvailable(endpoint) //TODO remove if err != nil { log.Error(err) } diff --git a/plugin/elastic/activity.go b/plugin/elastic/activity.go new file mode 100644 index 00000000..728613b5 --- /dev/null +++ b/plugin/elastic/activity.go @@ -0,0 +1,332 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package elastic + +import ( + "fmt" + "github.com/buger/jsonparser" + log "github.com/cihub/seelog" + "github.com/segmentio/encoding/json" + "infini.sh/framework/core/config" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/event" + "infini.sh/framework/core/global" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/pipeline" + "infini.sh/framework/core/queue" + "infini.sh/framework/core/rotate" + "infini.sh/framework/core/util" + "runtime" + + "sync" + "time" +) + +type ActivityProcessor struct { + config *Config + runningConfigs map[string]*queue.QueueConfig + bulkSizeInByte int + wg sync.WaitGroup + inFlightQueueConfigs sync.Map + detectorRunning bool + id string +} + +func init() { + pipeline.RegisterProcessorPlugin("activity", NewActivityProcessor) +} + +func NewActivityProcessor(c *config.Config) (pipeline.Processor, error) { + cfg := Config{ + NumOfWorkers: 1, + MaxWorkers: 10, + MaxConnectionPerHost: 1, + IdleTimeoutInSecond: 5, + BulkSizeInMb: 10, + DetectIntervalInMs: 10000, + Queues: map[string]interface{}{}, + + Consumer: queue.ConsumerConfig{ + Group: "activity-001", + Name: "activity-001", + FetchMinBytes: 1, + FetchMaxBytes: 10 * 1024 * 1024, + FetchMaxMessages: 500, + EOFRetryDelayInMs: 1000, + FetchMaxWaitMs: 10000, + }, + + DetectActiveQueue: true, + ValidateRequest: false, + SkipEmptyQueue: true, + SkipOnMissingInfo: false, + RotateConfig: rotate.DefaultConfig, + BulkConfig: elastic.DefaultBulkProcessorConfig, + } + + if err := c.Unpack(&cfg); err != nil { + log.Error(err) + return nil, fmt.Errorf("failed to unpack the configuration of flow_runner processor: %s", err) + } + + runner := ActivityProcessor{ + id: util.GetUUID(), + config: &cfg, + runningConfigs: map[string]*queue.QueueConfig{}, + inFlightQueueConfigs: sync.Map{}, + } + + runner.bulkSizeInByte = 1048576 * runner.config.BulkSizeInMb + if runner.config.BulkSizeInKb > 0 { + runner.bulkSizeInByte = 1024 * runner.config.BulkSizeInKb + } + + runner.wg = sync.WaitGroup{} + + return &runner, nil +} + +func (processor *ActivityProcessor) Name() string { + return "activity" +} + +func (processor *ActivityProcessor) Process(c *pipeline.Context) error { + defer func() { + if !global.Env().IsDebug { + if r := recover(); r != nil { + var v string + switch r.(type) { + case error: + v = r.(error).Error() + case runtime.Error: + v = r.(runtime.Error).Error() + case string: + v = r.(string) + } + log.Error("error in activity processor,", v) + } + } + log.Trace("exit activity processor") + }() + + //handle updates + if processor.config.DetectActiveQueue { + log.Tracef("detector 
running [%v]", processor.detectorRunning) + if !processor.detectorRunning { + processor.detectorRunning = true + processor.wg.Add(1) + go func(c *pipeline.Context) { + log.Tracef("init detector for active queue [%v] ", processor.id) + defer func() { + if !global.Env().IsDebug { + if r := recover(); r != nil { + var v string + switch r.(type) { + case error: + v = r.(error).Error() + case runtime.Error: + v = r.(runtime.Error).Error() + case string: + v = r.(string) + } + log.Error("error in activity processor,", v) + } + } + processor.detectorRunning = false + log.Debug("exit detector for active queue") + processor.wg.Done() + }() + + for { + if c.IsCanceled() { + return + } + + if global.Env().IsDebug { + log.Tracef("inflight queues: %v", util.MapLength(&processor.inFlightQueueConfigs)) + processor.inFlightQueueConfigs.Range(func(key, value interface{}) bool { + log.Tracef("inflight queue:%v", key) + return true + }) + } + + cfgs := queue.GetConfigByLabels(processor.config.Queues) + for _, v := range cfgs { + if c.IsCanceled() { + return + } + //if have depth and not in in flight + if queue.HasLag(v) { + _, ok := processor.inFlightQueueConfigs.Load(v.Id) + if !ok { + log.Tracef("detecting new queue: %v", v.Name) + processor.HandleQueueConfig(v, c) + } + } + } + if processor.config.DetectIntervalInMs > 0 { + time.Sleep(time.Millisecond * time.Duration(processor.config.DetectIntervalInMs)) + } + } + }(c) + } + } else { + cfgs := queue.GetConfigByLabels(processor.config.Queues) + log.Debugf("filter queue by:%v, num of queues:%v", processor.config.Queues, len(cfgs)) + for _, v := range cfgs { + log.Tracef("checking queue: %v", v) + processor.HandleQueueConfig(v, c) + } + } + + processor.wg.Wait() + + return nil +} + +func (processor *ActivityProcessor) HandleQueueConfig(v *queue.QueueConfig, c *pipeline.Context) { + + if processor.config.SkipEmptyQueue { + if !queue.HasLag(v) { + if global.Env().IsDebug { + log.Tracef("skip empty queue:[%v]", v.Name) + } + return + } + } + + elasticsearch := processor.config.Elasticsearch + if elasticsearch == "" { + log.Error("elasticsearch config was not found in activity processor") + return + } + + meta := elastic.GetMetadata(util.ToString(elasticsearch)) + if meta == nil { + log.Debugf("metadata for [%v] is nil", elasticsearch) + return + } + + host := meta.GetActiveHost() + log.Debugf("random choose node [%v] to consume queue [%v]", host, v.Id) + processor.wg.Add(1) + + //go processor.NewBulkWorker("bulk_indexing_"+host,c, processor.bulkSizeInByte, v, host) + go processor.HandleMessage(c, v) + +} + +func (processor *ActivityProcessor) HandleMessage(ctx *pipeline.Context, qConfig *queue.QueueConfig) { + defer func() { + if !global.Env().IsDebug { + if r := recover(); r != nil { + var v string + switch r.(type) { + case error: + v = r.(error).Error() + case runtime.Error: + v = r.(runtime.Error).Error() + case string: + v = r.(string) + } + log.Errorf("error in %s processor: %v", processor.Name(), v) + } + } + processor.wg.Done() + log.Tracef("exit %s processor", processor.Name()) + }() + + key := qConfig.Id + + if processor.config.MaxWorkers > 0 && util.MapLength(&processor.inFlightQueueConfigs) > processor.config.MaxWorkers { + log.Debugf("reached max num of workers, skip init [%v]", qConfig.Name) + return + } + + var workerID = util.GetUUID() + _, exists := processor.inFlightQueueConfigs.Load(key) + if exists { + log.Errorf("queue [%v] has more then one consumer", qConfig.Id) + return + } + + processor.inFlightQueueConfigs.Store(key, workerID) + 
log.Debugf("starting worker:[%v], queue:[%v]", workerID, qConfig.Name) + var consumer = queue.GetOrInitConsumerConfig(qConfig.Id, processor.config.Consumer.Group, processor.config.Consumer.Name) + initOffset, _ := queue.GetOffset(qConfig, consumer) + offset := initOffset + defer func() { + log.Debugf("worker:[%v] start consume queue:[%v] offset:%v", workerID, qConfig.Id, offset) + }() + + for { + if ctx.IsCanceled() { + return + } + + ctx1, messages, timeout, err := queue.Consume(qConfig, consumer, offset) + + if timeout { + log.Tracef("timeout on queue:[%v]", qConfig.Name) + ctx.Failed() + return + } + + if err != nil { + log.Tracef("error on queue:[%v]", qConfig.Name) + if err.Error() == "EOF" { + if len(messages) > 0 { + goto HANDLE_MESSAGE + } + return + } + panic(err) + } + + HANDLE_MESSAGE: + + //update temp offset, not committed, continued reading + offset = ctx1.NextOffset.String()//TODO + + if len(messages) > 0 { + for _, pop := range messages { + typ, err := jsonparser.GetString(pop.Data, "metadata", "name") + if err != nil { + panic(err) + } + switch typ { + case "activity": + activity, _, _, err := jsonparser.Get(pop.Data, "payload", "activity") + if err != nil { + panic(err) + } + + err = processor.HandleActivity(activity) + } + + } + } + if err == nil { + if offset != "" && initOffset != offset { + ok, err := queue.CommitOffset(qConfig, consumer, offset) + if !ok || err != nil { + panic(err) + } + initOffset=offset + } + } else { + log.Error(err) + } + } +} + +func (processor *ActivityProcessor) HandleActivity(activityByte []byte) error { + // save activity + activityInfo := &event.Activity{} + json.Unmarshal(activityByte, activityInfo) + esClient := elastic.GetClient(processor.config.Elasticsearch) + _, err := esClient.Index(orm.GetIndexName(activityInfo), "", activityInfo.ID, activityInfo, "") + return err +} diff --git a/plugin/elastic/metadata.go b/plugin/elastic/metadata.go new file mode 100644 index 00000000..910020ff --- /dev/null +++ b/plugin/elastic/metadata.go @@ -0,0 +1,463 @@ +/* Copyright © INFINI Ltd. All rights reserved. 
+ * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package elastic + +import ( + "fmt" + "github.com/buger/jsonparser" + log "github.com/cihub/seelog" + "infini.sh/framework/core/config" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/event" + "infini.sh/framework/core/global" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/pipeline" + "infini.sh/framework/core/queue" + "infini.sh/framework/core/rotate" + "infini.sh/framework/core/util" + "runtime" + + "sync" + "time" +) + +type MetadataProcessor struct { + config *Config + runningConfigs map[string]*queue.QueueConfig + bulkSizeInByte int + wg sync.WaitGroup + inFlightQueueConfigs sync.Map + detectorRunning bool + id string +} + +type Config struct { + NumOfWorkers int `config:"worker_size"` + + IdleTimeoutInSecond int `config:"idle_timeout_in_seconds"` + MaxConnectionPerHost int `config:"max_connection_per_node"` + + BulkSizeInKb int `config:"bulk_size_in_kb,omitempty"` + BulkSizeInMb int `config:"bulk_size_in_mb,omitempty"` + BulkMaxDocsCount int `config:"bulk_max_docs_count,omitempty"` + + Queues map[string]interface{} `config:"queues,omitempty"` + + Consumer queue.ConsumerConfig `config:"consumer"` + + MaxWorkers int `config:"max_worker_size"` + + DetectActiveQueue bool `config:"detect_active_queue"` + DetectIntervalInMs int `config:"detect_interval"` + + ValidateRequest bool `config:"valid_request"` + SkipEmptyQueue bool `config:"skip_empty_queue"` + SkipOnMissingInfo bool `config:"skip_info_missing"` + + RotateConfig rotate.RotateConfig `config:"rotate"` + BulkConfig elastic.BulkProcessorConfig `config:"bulk"` + + Elasticsearch string `config:"elasticsearch,omitempty"` + + WaitingAfter []string `config:"waiting_after"` +} + +func init() { + pipeline.RegisterProcessorPlugin("metadata", New) +} + +func New(c *config.Config) (pipeline.Processor, error) { + cfg := Config{ + NumOfWorkers: 1, + MaxWorkers: 10, + MaxConnectionPerHost: 1, + IdleTimeoutInSecond: 5, + BulkSizeInMb: 10, + DetectIntervalInMs: 10000, + Queues: map[string]interface{}{}, + + Consumer: queue.ConsumerConfig{ + Group: "metadata-001", + Name: "metadata-001", + FetchMinBytes: 1, + FetchMaxBytes: 10 * 1024 * 1024, + FetchMaxMessages: 500, + EOFRetryDelayInMs: 1000, + FetchMaxWaitMs: 10000, + }, + + DetectActiveQueue: true, + ValidateRequest: false, + SkipEmptyQueue: true, + SkipOnMissingInfo: false, + RotateConfig: rotate.DefaultConfig, + BulkConfig: elastic.DefaultBulkProcessorConfig, + } + + if err := c.Unpack(&cfg); err != nil { + log.Error(err) + return nil, fmt.Errorf("failed to unpack the configuration of flow_runner processor: %s", err) + } + + runner := MetadataProcessor{ + id: util.GetUUID(), + config: &cfg, + runningConfigs: map[string]*queue.QueueConfig{}, + inFlightQueueConfigs: sync.Map{}, + } + + runner.bulkSizeInByte = 1048576 * runner.config.BulkSizeInMb + if runner.config.BulkSizeInKb > 0 { + runner.bulkSizeInByte = 1024 * runner.config.BulkSizeInKb + } + + runner.wg = sync.WaitGroup{} + + return &runner, nil +} + +func (processor *MetadataProcessor) Name() string { + return "metadata" +} + +func (processor *MetadataProcessor) Process(c *pipeline.Context) error { + defer func() { + if !global.Env().IsDebug { + if r := recover(); r != nil { + var v string + switch r.(type) { + case error: + v = r.(error).Error() + case runtime.Error: + v = r.(runtime.Error).Error() + case string: + v = r.(string) + } + log.Error("error in metadata processor,", v) + } + } + log.Trace("exit metadata processor") + }() + + //handle 
updates + if processor.config.DetectActiveQueue { + log.Tracef("detector running [%v]", processor.detectorRunning) + if !processor.detectorRunning { + processor.detectorRunning = true + processor.wg.Add(1) + go func(c *pipeline.Context) { + log.Tracef("init detector for active queue [%v] ", processor.id) + defer func() { + if !global.Env().IsDebug { + if r := recover(); r != nil { + var v string + switch r.(type) { + case error: + v = r.(error).Error() + case runtime.Error: + v = r.(runtime.Error).Error() + case string: + v = r.(string) + } + log.Error("error in metadata processor,", v) + } + } + processor.detectorRunning = false + log.Debug("exit detector for active queue") + processor.wg.Done() + }() + + for { + if c.IsCanceled() { + return + } + + if global.Env().IsDebug { + log.Tracef("inflight queues: %v", util.MapLength(&processor.inFlightQueueConfigs)) + processor.inFlightQueueConfigs.Range(func(key, value interface{}) bool { + log.Tracef("inflight queue:%v", key) + return true + }) + } + + cfgs := queue.GetConfigByLabels(processor.config.Queues) + for _, v := range cfgs { + if c.IsCanceled() { + return + } + //if have depth and not in in flight + if queue.HasLag(v) { + _, ok := processor.inFlightQueueConfigs.Load(v.Id) + if !ok { + log.Tracef("detecting new queue: %v", v.Name) + processor.HandleQueueConfig(v, c) + } + } + } + if processor.config.DetectIntervalInMs > 0 { + time.Sleep(time.Millisecond * time.Duration(processor.config.DetectIntervalInMs)) + } + } + }(c) + } + } else { + cfgs := queue.GetConfigByLabels(processor.config.Queues) + log.Debugf("filter queue by:%v, num of queues:%v", processor.config.Queues, len(cfgs)) + for _, v := range cfgs { + log.Tracef("checking queue: %v", v) + processor.HandleQueueConfig(v, c) + } + } + + processor.wg.Wait() + + return nil +} + +func (processor *MetadataProcessor) HandleQueueConfig(v *queue.QueueConfig, c *pipeline.Context) { + + if processor.config.SkipEmptyQueue { + if !queue.HasLag(v) { + if global.Env().IsDebug { + log.Tracef("skip empty queue:[%v]", v.Name) + } + return + } + } + + elasticsearch := processor.config.Elasticsearch + if elasticsearch == "" { + log.Error("elasticsearch config was not found in metadata processor") + return + } + + meta := elastic.GetMetadata(util.ToString(elasticsearch)) + if meta == nil { + log.Debugf("metadata for [%v] is nil", elasticsearch) + return + } + + host := meta.GetActiveHost() + log.Debugf("random choose node [%v] to consume queue [%v]", host, v.Id) + processor.wg.Add(1) + + //go processor.NewBulkWorker("bulk_indexing_"+host,c, processor.bulkSizeInByte, v, host) + go processor.HandleMessage(c, v) + +} + +func (processor *MetadataProcessor) HandleMessage(ctx *pipeline.Context, qConfig *queue.QueueConfig) { + defer func() { + if !global.Env().IsDebug { + if r := recover(); r != nil { + var v string + switch r.(type) { + case error: + v = r.(error).Error() + case runtime.Error: + v = r.(runtime.Error).Error() + case string: + v = r.(string) + } + log.Errorf("error in %s processor: %v", processor.Name(), v) + } + } + processor.wg.Done() + log.Tracef("exit %s processor", processor.Name()) + }() + + key := qConfig.Id + + if processor.config.MaxWorkers > 0 && util.MapLength(&processor.inFlightQueueConfigs) > processor.config.MaxWorkers { + log.Debugf("reached max num of workers, skip init [%v]", qConfig.Name) + return + } + + var workerID = util.GetUUID() + _, exists := processor.inFlightQueueConfigs.Load(key) + if exists { + log.Errorf("queue [%v] has more then one consumer", qConfig.Id) + 
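			// Another worker already claimed this queue config, so give up here to
			// keep a single consumer per queue.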
return + } + + processor.inFlightQueueConfigs.Store(key, workerID) + log.Debugf("starting worker:[%v], queue:[%v]", workerID, qConfig.Name) + var consumer = queue.GetOrInitConsumerConfig(qConfig.Id, processor.config.Consumer.Group, processor.config.Consumer.Name) + initOffset, _ := queue.GetOffset(qConfig, consumer) + offset := initOffset + defer func() { + log.Debugf("worker:[%v] start consume queue:[%v] offset:%v", workerID, qConfig.Id, offset) + }() + + for { + if ctx.IsCanceled() { + return + } + ctx1, messages, isTimeout, err := queue.Consume(qConfig, consumer, offset) + //if timeout{ + // log.Tracef("timeout on queue:[%v]",qConfig.Name) + // ctx.Failed() + // return + //} + + if err != nil { + log.Tracef("error on queue:[%v]", qConfig.Name) + if err.Error() == "EOF" { + if len(messages) > 0 { + goto HANDLE_MESSAGE + } + return + } + //panic(err) + if isTimeout { + time.Sleep(time.Millisecond * 1000) + } + } + + HANDLE_MESSAGE: + + //update temp offset, not committed, continued reading + offset = ctx1.NextOffset.String()//TODO + + if len(messages) > 0 { + for _, pop := range messages { + + typ, err := jsonparser.GetString(pop.Data, "metadata", "name") + if err != nil { + panic(err) + } + switch typ { + case "index_health_change": + //err = processor.HandleIndexHealthChange(&ev) + case "index_state_change": + indexState, _, _, err := jsonparser.Get(pop.Data, "payload", "index_state") + if err != nil { + panic(err) + } + err = processor.HandleIndexStateChange(indexState) + case "unknown_node_status": + processor.HandleUnknownNodeStatus(pop.Data) + } + + } + } + if err == nil { + if offset != "" && initOffset != offset { + ok, err := queue.CommitOffset(qConfig, consumer, offset) + if !ok || err != nil { + panic(err) + } + initOffset=offset + } + } else { + if !isTimeout { + log.Error(err) + } + } + } +} +func (processor *MetadataProcessor) HandleIndexStateChange(indexState []byte) error { + esClient := elastic.GetClient(processor.config.Elasticsearch) + // save index metadata + id, err := jsonparser.GetString(indexState, "id") + if err != nil { + return err + } + storeIndexName := orm.GetIndexName(elastic.IndexConfig{}) + + _, err = esClient.Index(storeIndexName, "", id, indexState, "") + return err +} + +func (processor *MetadataProcessor) HandleUnknownNodeStatus(ev []byte) error { + clusterID, err := jsonparser.GetString(ev, "payload", "cluster_id") + if err != nil { + return err + } + esClient := elastic.GetClient(processor.config.Elasticsearch) + queryDslTpl := `{"script": { + "source": "ctx._source.metadata.labels.status='unavailable'", + "lang": "painless" + }, + "query": { + "bool": { + "must": [ + {"term": { + "metadata.cluster_id": { + "value": "%s" + } + }}, + {"term": { + "metadata.category": { + "value": "elasticsearch" + } + }} + ] + } + }}` + queryDsl := fmt.Sprintf(queryDslTpl, clusterID) + _, err = esClient.UpdateByQuery(orm.GetIndexName(elastic.NodeConfig{}), []byte(queryDsl)) + return err +} + +func (processor *MetadataProcessor) HandleIndexHealthChange(ev *event.Event) error { + // save activity + activityInfo := &event.Activity{ + ID: util.GetUUID(), + Timestamp: ev.Timestamp, + Metadata: event.ActivityMetadata{ + Category: ev.Metadata.Category, + Group: "metadata", + Name: "index_health_change", + Type: "update", + Labels: ev.Metadata.Labels, + }, + } + esClient := elastic.GetClient(processor.config.Elasticsearch) + _, err := esClient.Index(orm.GetIndexName(activityInfo), "", activityInfo.ID, activityInfo, "") + if err != nil { + return err + } + // update index 
health status + queryDslTpl := `{ + "size": 1, + "query": { + "bool": { + "must": [ + {"term": { + "metadata.index_id": { + "value": "%s" + } + }}, + {"term": { + "metadata.category": { + "value": "elasticsearch" + } + }} + ], + "must_not": [ + {"term": { + "metadata.labels.index_status": { + "value": "deleted" + } + }} + ] + } + } +}` + queryDsl := fmt.Sprintf(queryDslTpl, ev.Metadata.Labels["index_id"]) + indexName := orm.GetIndexName(elastic.IndexConfig{}) + searchRes, err := esClient.SearchWithRawQueryDSL(indexName, []byte(queryDsl)) + if err != nil { + return err + } + if searchRes.GetTotal() == 0 { + return nil + } + source := util.MapStr(searchRes.Hits.Hits[0].Source) + source.Put("metadata.labels.health_status", ev.Metadata.Labels["to"]) + _, err = esClient.Index(indexName, "", searchRes.Hits.Hits[0].ID, source, "") + return err +} diff --git a/plugin/migration/api.go b/plugin/migration/api.go new file mode 100644 index 00000000..40af3b91 --- /dev/null +++ b/plugin/migration/api.go @@ -0,0 +1,1116 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package migration + +import ( + "context" + "fmt" + log "github.com/cihub/seelog" + "infini.sh/framework/core/agent" + "infini.sh/framework/core/api" + "infini.sh/framework/core/api/rbac" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/proxy" + task2 "infini.sh/framework/core/task" + "infini.sh/framework/core/util" + "net/http" + "strconv" + "strings" + "time" +) + + +func InitAPI() { + handler := APIHandler{} + api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequireLogin(handler.searchDataMigrationTask)) + api.HandleAPIMethod(api.POST, "/migration/data", handler.RequireLogin(handler.createDataMigrationTask)) + api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration)) + + api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_partition", handler.getIndexPartitionInfo) + api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_count", handler.countDocuments) + api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequireLogin(handler.startDataMigration)) + api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequireLogin(handler.stopDataMigrationTask)) + //api.HandleAPIMethod(api.GET, "/migration/data/:task_id", handler.getMigrationTask) + api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequireLogin(handler.getDataMigrationTaskInfo)) + api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/index", handler.RequireLogin(handler.getDataMigrationTaskOfIndex)) + api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequireLogin(handler.updateDataMigrationTaskStatus)) + +} + +type APIHandler struct { + api.Handler +} + +func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + clusterTaskConfig := &ElasticDataConfig{} + err := h.DecodeJSON(req, clusterTaskConfig) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(clusterTaskConfig.Indices) == 0 { + h.WriteError(w, "indices must not be empty", http.StatusInternalServerError) + return + } + clusterTaskConfig.Creator = struct { + Name string `json:"name"` + Id string `json:"id"` + }{} + claims, ok := req.Context().Value("user").(*rbac.UserClaims) 
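	// When the request carries authenticated user claims, record them as the task
	// creator; the per-index source doc counts are then summed so the root task
	// carries the total number of documents to be migrated.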
+ if ok { + clusterTaskConfig.Creator.Name = claims.Username + clusterTaskConfig.Creator.Id = claims.ID + } + + var totalDocs int64 + for _, index := range clusterTaskConfig.Indices { + totalDocs += index.Source.Docs + } + + t := task2.Task{ + Metadata: task2.Metadata{ + Type: "pipeline", + Labels: util.MapStr{ + "pipeline_id": "cluster_migration", + "source_cluster_id": clusterTaskConfig.Cluster.Source.Id, + "target_cluster_id": clusterTaskConfig.Cluster.Target.Id, + "source_total_docs": totalDocs, + }, + }, + Cancellable: true, + Runnable: false, + Status: task2.StatusInit, + Parameters: map[string]interface{}{ + "pipeline": util.MapStr{ + "id": "cluster_migration", + "config": clusterTaskConfig, + }, + }, + } + err = orm.Create(nil, &t) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": t.ID, + "result": "created", + }, 200) + +} + +func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ( + keyword = h.GetParameterOrDefault(req, "keyword", "") + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") + mustQ []interface{} + ) + mustQ = append(mustQ, util.MapStr{ + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "cluster_migration", + }, + }, + }) + + if keyword != "" { + mustQ = append(mustQ, util.MapStr{ + "query_string": util.MapStr{ + "default_field": "*", + "query": keyword, + }, + }) + } + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 20 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + + queryDSL := util.MapStr{ + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "size": size, + "from": from, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": mustQ, + }, + }, + } + + q := orm.Query{} + q.RawQuery = util.MustToJSONBytes(queryDSL) + + err, res := orm.Search(&task2.Task{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + searchRes := &elastic.SearchResponse{} + err = util.FromJSONBytes(res.Raw, searchRes) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + mainLoop: + for _, hit := range searchRes.Hits.Hits { + sourceM := util.MapStr(hit.Source) + config, err := sourceM.GetValue("parameters.pipeline.config") + if err != nil { + log.Error(err) + continue + } + buf := util.MustToJSONBytes(config) + dataConfig := ElasticDataConfig{} + err = util.FromJSONBytes(buf, &dataConfig) + if err != nil { + log.Error(err) + continue + } + var targetTotalDocs int64 + targetTotal, _ := sourceM.GetValue("metadata.labels.target_total_docs") + if _, ok := targetTotal.(float64); !ok || hit.Source["status"] != task2.StatusComplete { + esClient := elastic.GetClientNoPanic(dataConfig.Cluster.Target.Id) + if esClient == nil { + log.Warnf("cluster [%s] was not found", dataConfig.Cluster.Target.Id) + continue + } + for _, index := range dataConfig.Indices { + count, err := getIndexTaskDocCount(&index, esClient) + if err != nil { + log.Error(err) + continue mainLoop + } + targetTotalDocs += count + } + sourceM.Put("metadata.labels.target_total_docs", targetTotalDocs) + sourceTotalDocs, _ := sourceM.GetValue("metadata.labels.source_total_docs") + if sv, ok := sourceTotalDocs.(float64); ok{ + if int64(sv) == targetTotalDocs { + hit.Source["status"] = 
task2.StatusComplete + } + } + } + + } + + h.WriteJSON(w, searchRes, http.StatusOK) +} + +func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + var ( + index = ps.MustGetParameter("index") + clusterID = ps.MustGetParameter("id") + ) + client := elastic.GetClient(clusterID) + pq := &elastic.PartitionQuery{ + IndexName: index, + } + err := h.DecodeJSON(req, pq) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + partitions, err := elastic.GetPartitions(pq, client) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteJSON(w, partitions, http.StatusOK) +} + +func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + taskID := ps.MustGetParameter("task_id") + obj := task2.Task{} + + obj.ID = taskID + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError) + return + } + if obj.Status == "init" { + //root task + obj.Status = task2.StatusReady + }else if obj.Status == task2.StatusStopped { + if obj.Metadata.Labels["level"] == "partition" { + obj.Status = task2.StatusReady + //update parent task status + if len(obj.ParentId) == 0 { + h.WriteError(w, fmt.Sprintf("empty parent id of task [%s]", taskID), http.StatusInternalServerError) + return + } + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "id": obj.ParentId, + }, + }, + }, + }, + } + queryDsl := util.MapStr{ + "query": query, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusRunning), + }, + } + + err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl)) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + }else{ + obj.Status = task2.StatusRunning + //update sub task status + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskID, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task2.StatusError, task2.StatusStopped}, + }, + }, + }, + }, + } + queryDsl := util.MapStr{ + "query": query, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusReady), + }, + } + + err = orm.UpdateBy(task2.Task{}, util.MustToJSONBytes(queryDsl)) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + + }else if obj.Status == task2.StatusError { + obj.Status = task2.StatusReady + } + + err = orm.Update(nil, &obj) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) +} + +func getNodeEndpoint(nodeID string) (string, error){ + indexName := ".infini_agent,.infini_gateway-instance" + query := util.MapStr{ + "size": 1, + "query": util.MapStr{ + "term": util.MapStr{ + "id": util.MapStr{ + "value": nodeID, + }, + }, + }, + } + q := orm.Query{ + IndexName: indexName, + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(nil, &q) + if err != nil { + return "", err + } + if len(result.Result) == 0 { + return "", fmt.Errorf("node [%s] not found", 
nodeID) + } + if info, ok := result.Result[0].(map[string]interface{}); ok { + if v, ok := info["endpoint"]; ok { + if endpoint, ok := v.(string); ok { + return endpoint, nil + } + return "", fmt.Errorf("got invalid endpoint value: %v", v) + } + ag := agent.Instance{} + buf := util.MustToJSONBytes(info) + err = util.FromJSONBytes(buf, &ag) + if err != nil { + return "", err + } + return ag.GetEndpoint(), nil + } + return "", fmt.Errorf("got unexpect node info: %s", util.MustToJSON(result.Result[0])) +} + +func stopTask(nodeID, taskID string) error { + endpoint, err := getNodeEndpoint(nodeID) + if err != nil { + return err + } + res, err := proxy.DoProxyRequest(&proxy.Request{ + Method: http.MethodPost, + Endpoint: endpoint, + Path: fmt.Sprintf("/pipeline/task/%s/_stop", taskID), + }) + + if err != nil { + return fmt.Errorf("call stop task api error: %w", err) + } + resBody := struct { + Acknowledged bool `json:"acknowledged"` + Error string `json:"error"` + }{} + err = util.FromJSONBytes(res.Body, &resBody) + if err != nil { + return err + } + if resBody.Acknowledged { + return nil + } + return fmt.Errorf(resBody.Error) +} + +func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("task_id") + obj := task2.Task{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + execution, _ := util.MapStr(obj.Parameters).GetValue("pipeline.config.settings.execution") + if execution == nil { + execution, err = util.MapStr(obj.Parameters).GetValue("pipeline.config.execution") + if err != nil { + errStr := fmt.Sprintf("get execution config in task %s error: %s", id, err.Error()) + h.WriteError(w, errStr, http.StatusInternalServerError) + log.Error(errStr) + return + } + } + buf := util.MustToJSONBytes(execution) + executionConfig := ExecutionConfig{} + err = util.FromJSONBytes(buf, &executionConfig) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + if len(executionConfig.Nodes.Permit) == 0 { + h.WriteError(w, "node of running task can not found", http.StatusInternalServerError) + return + } + + err = stopTask(executionConfig.Nodes.Permit[0].ID, id) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": id, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task2.StatusRunning, task2.StatusInit}, + }, + }, + }, + }, + } + //todo reset stat_time? 
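	// A single update-by-query marks both the root task (matched by id) and any
	// running or just-initialized index_migration sub-tasks (matched by the query
	// built above) as stopped.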
+ queryDsl := util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "id": util.MapStr{ + "value": id, + }, + }, + }, + query, + }, + }, + }, + "script": util.MapStr{ + "source": "ctx._source['status'] = 'stopped'", + }, + } + + err = orm.UpdateBy(task2.Task{}, util.MustToJSONBytes(queryDsl)) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) +} + +func getTaskConfig(task *task2.Task, config interface{}) error{ + configSec, err := util.MapStr(task.Parameters).GetValue("pipeline.config") + if err != nil { + return err + } + configBytes, err := util.ToJSONBytes(configSec) + if err != nil { + return err + } + + return util.FromJSONBytes(configBytes, config) +} + +func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + id := ps.MustGetParameter("task_id") + + obj := task2.Task{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + taskConfig := &ElasticDataConfig{} + err = getTaskConfig(&obj, taskConfig) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + taskErrors, err := getErrorPartitionTasks(id) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + //get status of sub task + query := util.MapStr{ + "size": 1000, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": id, + }, + }, + },{ + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.level": util.MapStr{ + "value": "index", + }, + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(task2.Task{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + statusM := util.MapStr{} + for _, row := range result.Result { + if rowM, ok := row.(map[string]interface{}); ok { + if v, ok := rowM["id"].(string); ok { + statusM[v] = rowM["status"] + } + } + } + + + var completedIndices int + for i, index := range taskConfig.Indices { + if st, ok := statusM[index.TaskID]; ok { + taskConfig.Indices[i].Status = st.(string) + } + var count = index.Target.Docs + if taskConfig.Indices[i].Status != task2.StatusComplete || count == 0 { + targetESClient := elastic.GetClientNoPanic(taskConfig.Cluster.Target.Id) + if targetESClient == nil { + log.Warnf("cluster [%s] was not found", taskConfig.Cluster.Target.Id) + continue + } + count, err = getIndexTaskDocCount(&index, targetESClient) + if err != nil { + log.Error(err) + continue + } + taskConfig.Indices[i].Target.Docs = count + } + percent := float64(count * 100) / float64(index.Source.Docs) + taskConfig.Indices[i].Percent = util.ToFixed(percent, 2) + taskConfig.Indices[i].ErrorPartitions = taskErrors[index.TaskID] + if count == index.Source.Docs { + completedIndices ++ + taskConfig.Indices[i].Status = task2.StatusComplete + } + } + util.MapStr(obj.Parameters).Put("pipeline.config", taskConfig) + obj.Metadata.Labels["completed_indices"] = completedIndices + h.WriteJSON(w, obj, http.StatusOK) +} 
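// Illustrative sketch, not part of this patch: the progress numbers reported by
// getDataMigrationTaskInfo above boil down to a per-index doc-count ratio, for
// example (with hypothetical counts):
//
//	percent := util.ToFixed(float64(targetDocs*100)/float64(sourceDocs), 2)
//	done := targetDocs == sourceDocs // index is then marked StatusComplete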
+func getErrorPartitionTasks(taskID string) (map[string]int, error){ + query := util.MapStr{ + "size": 0, + "aggs": util.MapStr{ + "group_by_task": util.MapStr{ + "terms": util.MapStr{ + "field": "parent_id", + "size": 100, + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + { + "term": util.MapStr{ + "runnable": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": task2.StatusError, + }, + }, + }, + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskID, + }, + }, + }, + }, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(task2.Task{}, q) + if err != nil { + return nil, err + } + + searchRes := &elastic.SearchResponse{} + err = util.FromJSONBytes(result.Raw, searchRes) + if err != nil { + return nil, err + } + resBody := map[string]int{} + + if taskAgg, ok := searchRes.Aggregations["group_by_task"]; ok { + for _, bk := range taskAgg.Buckets { + if key, ok := bk["key"].(string); ok { + if key == taskID { + continue + } + resBody[key] = int(bk["doc_count"].(float64)) + } + } + } + return resBody, nil +} + +func getIndexTaskDocCount(index *IndexConfig, targetESClient elastic.API) (int64, error) { + targetIndexName := index.Target.Name + if targetIndexName == "" { + if v, ok := index.IndexRename[index.Source.Name].(string); ok { + targetIndexName = v + } + } + + var body []byte + var must []interface{} + if index.Target.DocType != "" && targetESClient.GetMajorVersion() < 8 { + must = append(must, util.MapStr{ + "terms": util.MapStr{ + "_type": []string{index.Target.DocType}, + }, + }) + } + if index.RawFilter != nil { + must = append(must, index.RawFilter) + } + if len(must) > 0 { + query := util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "must": must, + }, + }, + } + body = util.MustToJSONBytes(query) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + countRes, err := targetESClient.Count(ctx, targetIndexName, body) + if err != nil { + return 0, err + } + if countRes.StatusCode != http.StatusOK && countRes.RawResult != nil { + return 0, fmt.Errorf(string(countRes.RawResult.Body)) + } + return countRes.Count, nil +} + +func getExecutionConfig(parameters map[string]interface{}, key string)(*ExecutionConfig, error){ + execution, err := util.MapStr(parameters).GetValue(key) + if err != nil { + return nil, err + } + buf := util.MustToJSONBytes(execution) + executionConfig := ExecutionConfig{} + err = util.FromJSONBytes(buf, &executionConfig) + return &executionConfig, err +} + +func getTaskStats(nodeID string) (map[string]interface{}, error){ + endpoint, err := getNodeEndpoint(nodeID) + if err != nil { + return nil, err + } + res, err := proxy.DoProxyRequest(&proxy.Request{ + Method: http.MethodGet, + Endpoint: endpoint, + Path: "/stats", + }) + + if err != nil { + return nil, fmt.Errorf("call stats api error: %w", err) + } + resBody := struct { + Stats map[string]interface{} `json:"stats"` + }{} + err = util.FromJSONBytes(res.Body, &resBody) + return resBody.Stats, err +} + +func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + id := ps.MustGetParameter("task_id") + indexTask := task2.Task{} + indexTask.ID=id + exists, err := orm.Get(&indexTask) + if !exists || err != nil { + h.WriteError(w, 
fmt.Sprintf("task [%s] not found", id), http.StatusInternalServerError) + return + } + + var durationInMS int64 + if indexTask.StartTimeInMillis > 0 { + durationInMS = time.Now().UnixMilli() - indexTask.StartTimeInMillis + if indexTask.CompletedTime != nil && indexTask.Status == task2.StatusComplete { + durationInMS = indexTask.CompletedTime.UnixMilli() - indexTask.StartTimeInMillis + } + } + var completedTime int64 + if indexTask.CompletedTime != nil { + completedTime = indexTask.CompletedTime.UnixMilli() + } + + taskInfo := util.MapStr{ + "task_id": id, + "start_time": indexTask.StartTimeInMillis, + "status": indexTask.Status, + "completed_time": completedTime, + "duration": durationInMS, + } + if len(indexTask.Metadata.Labels) > 0 { + taskInfo["data_partition"] = indexTask.Metadata.Labels["partition_count"] + } + partitionTaskQuery := util.MapStr{ + "size": 500, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "asc", + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": id, + }, + }, + },{ + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + }, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(partitionTaskQuery), + } + err, result := orm.Search(task2.Task{}, q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + executionConfig, err := getExecutionConfig(indexTask.Parameters, "pipeline.config.execution") + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(executionConfig.Nodes.Permit) == 0 { + h.WriteError(w, "node of running task can not found", http.StatusInternalServerError) + return + } + stats, err := getTaskStats(executionConfig.Nodes.Permit[0].ID) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + var ( + partitionTaskInfos []util.MapStr + completedPartitions int + ) + for i, row := range result.Result { + buf := util.MustToJSONBytes(row) + ptask := task2.Task{} + err = util.FromJSONBytes(buf, &ptask) + if err != nil { + log.Error(err) + continue + } + start, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.start") + end, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.end") + if i == 0 { + step, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.step") + taskInfo["step"] = step + } + durationInMS = 0 + if ptask.StartTimeInMillis > 0 { + durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis + if ptask.CompletedTime != nil && (ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError) { + durationInMS = ptask.CompletedTime.UnixMilli() - ptask.StartTimeInMillis + } + } + var ( + scrollDocs float64 + indexDocs float64 + ) + if pt, ok := stats[ptask.ID]; ok { + if ptv, ok := pt.(map[string]interface{}); ok { + if v, ok := ptv["scroll_docs"].(float64); ok { + scrollDocs = v + } + if v, ok := ptv["bulk_docs.200"].(float64); ok { + indexDocs = v + } + if v, ok := ptv["bulk_docs.201"].(float64); ok { + indexDocs += v + } + } + } + var subCompletedTime int64 + if ptask.CompletedTime != nil { + subCompletedTime = ptask.CompletedTime.UnixMilli() + } + + partitionTotalDocs, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.doc_count") + partitionTaskInfos = append(partitionTaskInfos, util.MapStr{ + "task_id": 
ptask.ID, + "status": ptask.Status, + "start_time": ptask.StartTimeInMillis, + "completed_time": subCompletedTime, + "start": start, + "end": end, + "duration": durationInMS, + "scroll_docs": scrollDocs, + "index_docs": indexDocs, + "total_docs": partitionTotalDocs, + }) + if ptask.Status == task2.StatusComplete { + completedPartitions++ + } + } + taskInfo["partitions"] = partitionTaskInfos + taskInfo["completed_partitions"] = completedPartitions + h.WriteJSON(w, taskInfo, http.StatusOK) +} + +func (h *APIHandler) getMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + id := ps.MustGetParameter("task_id") + + obj := task2.Task{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + + h.WriteJSON(w, util.MapStr{ + "found": true, + "_id": id, + "_source": obj, + }, 200) +} + +func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + var ( + index = ps.MustGetParameter("index") + clusterID = ps.MustGetParameter("id") + ) + client := elastic.GetClient(clusterID) + reqBody := struct { + Filter interface{} `json:"filter"` + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var query []byte + if reqBody.Filter != nil { + query = util.MustToJSONBytes(util.MapStr{ + "query": reqBody.Filter, + }) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + countRes, err := client.Count(ctx, index, query) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteJSON(w, countRes, http.StatusOK) +} + +func (h *APIHandler) getMigrationTaskLog(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + id := ps.MustGetParameter("task_id") + query := util.MapStr{ + "sort": []util.MapStr{ + { + "timestamp": util.MapStr{ + "order": "asc", + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": id, + }, + }, + },{ + "term": util.MapStr{ + "id": util.MapStr{ + "value": id, + }, + }, + }, + }, + }, + }, + } + + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, _ := orm.Search(task2.Log{}, q) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + } +} + +func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + id := ps.MustGetParameter("task_id") + + obj := task2.Task{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + reqBody := struct { + Status string `json:"status"` + }{} + err = h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + obj.Status = reqBody.Status + err = orm.Update(nil, obj) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) +} + +func (h *APIHandler) validateDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + typ := h.GetParameter(req, "type") + switch typ { + case "multi_type": + 
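		// multi_type: look up the mapping types of the selected source indices so
		// the caller can detect indices that still use more than one document type
		// before planning the migration.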
h.validateMultiType(w, req, ps) + return + } + h.WriteError(w, "unknown parameter type", http.StatusOK) +} + +func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + var reqBody = struct { + Cluster struct{ + SourceID string `json:"source_id"` + TargetID string `json:"target_id"` + } `json:"cluster"` + Indices []string + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + sourceClient := elastic.GetClient(reqBody.Cluster.SourceID) + // get source type + indexNames := strings.Join(reqBody.Indices, ",") + typeInfo, err := elastic.GetIndexTypes(sourceClient, indexNames) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + h.WriteJSON(w, util.MapStr{ + "result": typeInfo, + } , http.StatusOK) +} \ No newline at end of file diff --git a/plugin/migration/model.go b/plugin/migration/model.go new file mode 100644 index 00000000..ffe5428d --- /dev/null +++ b/plugin/migration/model.go @@ -0,0 +1,77 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package migration + +type ElasticDataConfig struct { + Cluster struct { + Source ClusterInfo `json:"source"` + Target ClusterInfo `json:"target"` + } `json:"cluster"` + Indices []IndexConfig `json:"indices"` + Settings struct { + ParallelIndices int `json:"parallel_indices"` + ParallelTaskPerIndex int `json:"parallel_task_per_index"` + ScrollSize struct { + Docs int `json:"docs"` + Timeout string `json:"timeout"` + } `json:"scroll_size"` + BulkSize struct { + Docs int `json:"docs"` + StoreSizeInMB int `json:"store_size_in_mb"` + } `json:"bulk_size"` + Execution ExecutionConfig `json:"execution"` + } `json:"settings"` + Creator struct { + Name string `json:"name"` + Id string `json:"id"` + } `json:"creator"` +} + +type ExecutionConfig struct { + TimeWindow []TimeWindowItem `json:"time_window"` + Nodes struct{ + Permit []ExecutionNode `json:"permit"` + } `json:"nodes"` +} + +type ExecutionNode struct { + ID string `json:"id"` + Name string `json:"name"` +} + +type TimeWindowItem struct { + Start string `json:"start"` + End string `json:"end"` +} + +type IndexConfig struct { + Source IndexInfo `json:"source"` + Target IndexInfo `json:"target"` + RawFilter interface{} `json:"raw_filter"` + IndexRename map[string]interface{} `json:"index_rename"` + TypeRename map[string]interface{} `json:"type_rename"` + Partition *IndexPartition `json:"partition,omitempty"` + TaskID string `json:"task_id,omitempty"` + Status string `json:"status,omitempty"` + Percent float64 `json:"percent,omitempty"` + ErrorPartitions int `json:"error_partitions,omitempty"` +} +type IndexPartition struct { + FieldType string `json:"field_type"` + FieldName string `json:"field_name"` + Step interface{} `json:"step"` +} + +type IndexInfo struct { + Name string `json:"name"` + DocType string `json:"doc_type"` + Docs int64 `json:"docs"` + StoreSizeInBytes int `json:"store_size_in_bytes"` +} + +type ClusterInfo struct { + Id string `json:"id"` + Name string `json:"name"` +} \ No newline at end of file diff --git a/plugin/migration/module.go b/plugin/migration/module.go new file mode 100644 index 00000000..ce63e063 --- /dev/null +++ b/plugin/migration/module.go @@ -0,0 +1,31 @@ +/* Copyright © INFINI Ltd. All rights reserved. 
+ * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package migration + +import ( + "infini.sh/framework/core/module" +) + +func (module *Module) Name() string { + return "migration" +} + +func (module *Module) Setup() { + InitAPI() +} +func (module *Module) Start() error { + return nil +} + +func (module *Module) Stop() error { + return nil +} + +type Module struct { +} + +func init() { + module.RegisterUserPlugin(&Module{}) +} \ No newline at end of file diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go new file mode 100644 index 00000000..039e9ce7 --- /dev/null +++ b/plugin/migration/pipeline.go @@ -0,0 +1,507 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package migration + +import ( + "fmt" + log "github.com/cihub/seelog" + "infini.sh/framework/core/config" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/env" + "infini.sh/framework/core/global" + "infini.sh/framework/core/pipeline" + task2 "infini.sh/framework/core/task" + "infini.sh/framework/core/util" + "infini.sh/framework/modules/elastic/common" + "runtime" + "time" +) + +type ClusterMigrationProcessor struct { + id string + config *ClusterMigrationConfig +} + +type ClusterMigrationConfig struct { + Elasticsearch string `config:"elasticsearch,omitempty"` + IndexName string `config:"index_name"` + DetectIntervalInMs int `config:"detect_interval_in_ms"` + LogIndexName string `config:"log_index_name"` +} + +func init() { + pipeline.RegisterProcessorPlugin("cluster_migration", newClusterMigrationProcessor) +} + +func newClusterMigrationProcessor(c *config.Config) (pipeline.Processor, error) { + + cfg := ClusterMigrationConfig{ + DetectIntervalInMs: 5000, + } + if err := c.Unpack(&cfg); err != nil { + log.Error(err) + return nil, fmt.Errorf("failed to unpack the configuration of cluster migration processor: %s", err) + } + if cfg.IndexName == "" || cfg.LogIndexName == "" { + ormConfig := common.ORMConfig{} + ok, err := env.ParseConfig("elastic.orm", &ormConfig) + if ok && err == nil { + if cfg.IndexName == ""{ + cfg.IndexName = fmt.Sprintf("%stask", ormConfig.IndexPrefix) + } + if cfg.LogIndexName == "" { + cfg.LogIndexName = fmt.Sprintf("%stask-log", ormConfig.IndexPrefix) + } + }else{ + err = fmt.Errorf("parse config elastic.orm error: %w", err) + log.Error(err) + return nil, err + } + } + + processor := ClusterMigrationProcessor{ + id: util.GetUUID(), + config: &cfg, + } + + return &processor, nil +} + +func (p *ClusterMigrationProcessor) Name() string { + return "cluster_migration" +} + +func (p *ClusterMigrationProcessor) Process(ctx *pipeline.Context) error { + defer func() { + if !global.Env().IsDebug { + if r := recover(); r != nil { + var v string + switch r.(type) { + case error: + v = r.(error).Error() + case runtime.Error: + v = r.(runtime.Error).Error() + case string: + v = r.(string) + } + log.Errorf("error in %s processor: %v", p.Name(), v) + } + } + log.Tracef("exit %s processor", p.Name()) + }() + + for { + if ctx.IsCanceled() { + return nil + } + tasks, err := p.getClusterMigrationTasks(20) + if err != nil { + panic(err) + } + if len(tasks) == 0 { + log.Debug("got zero cluster migration task from es") + if p.config.DetectIntervalInMs > 0 { + time.Sleep(time.Millisecond * time.Duration(p.config.DetectIntervalInMs)) + } + } + for _, t := range tasks { + if ctx.IsCanceled() { + return nil + } + t.Status = task2.StatusRunning + t.StartTimeInMillis = time.Now().UnixMilli() + p.writeTaskLog(&t, 
&task2.Log{
+				ID: util.GetUUID(),
+				TaskId: t.ID,
+				Status: task2.StatusRunning,
+				Type: t.Metadata.Type,
+				Action: task2.LogAction{
+					Parameters: t.Parameters,
+				},
+				Content: fmt.Sprintf("starting to execute task [%s]", t.ID),
+				Timestamp: time.Now().UTC(),
+			})
+			err = p.SplitMigrationTask(&t)
+			taskLog := &task2.Log{
+				ID: util.GetUUID(),
+				TaskId: t.ID,
+				Status: task2.StatusRunning,
+				Type: t.Metadata.Type,
+				Action: task2.LogAction{
+					Parameters: t.Parameters,
+					Result: &task2.LogResult{
+						Success: true,
+					},
+				},
+				Content: fmt.Sprintf("successfully split task [%s]", t.ID),
+				Timestamp: time.Now().UTC(),
+			}
+			if err != nil {
+				taskLog.Status = task2.StatusError
+				taskLog.Content = fmt.Sprintf("failed to split task [%s]: %v", t.ID, err)
+				taskLog.Action.Result = &task2.LogResult{
+					Success: false,
+					Error: err.Error(),
+				}
+			}
+			t.Status = taskLog.Status
+			p.writeTaskLog(&t, taskLog)
+			if err != nil {
+				continue
+			}
+		}
+		//es index refresh
+		time.Sleep(time.Millisecond * 1200)
+	}
+}
+
+
+func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) error {
+	if taskItem.Metadata.Labels == nil {
+		return fmt.Errorf("empty metadata labels, unexpected cluster migration task: %s", util.MustToJSON(taskItem))
+	}
+	if taskItem.Metadata.Labels["pipeline_id"] != p.Name() {
+		log.Tracef("got unexpected task type of %s with task id [%s] in cluster migration processor", taskItem.Metadata.Type, taskItem.ID)
+		return nil
+	}
+	parameters := util.MapStr(taskItem.Parameters)
+	migrationConfig, err := parameters.GetValue("pipeline.config")
+	if err != nil {
+		return err
+	}
+	buf := util.MustToJSONBytes(migrationConfig)
+	clusterMigrationTask := ElasticDataConfig{}
+	err = util.FromJSONBytes(buf, &clusterMigrationTask)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		parameters.Put("pipeline.config", clusterMigrationTask)
+	}()
+	esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id)
+	esTargetClient := elastic.GetClient(clusterMigrationTask.Cluster.Target.Id)
+	esClient := elastic.GetClient(p.config.Elasticsearch)
+
+	for i, index := range clusterMigrationTask.Indices {
+		source := util.MapStr{
+			"cluster_id": clusterMigrationTask.Cluster.Source.Id,
+			"indices": index.Source.Name,
+			//"slice_size": 10,
+			"batch_size": clusterMigrationTask.Settings.ScrollSize.Docs,
+			"scroll_time": clusterMigrationTask.Settings.ScrollSize.Timeout,
+		}
+		if index.IndexRename != nil {
+			source["index_rename"] = index.IndexRename
+		}
+		if index.Target.Name != "" {
+			source["index_rename"] = util.MapStr{
+				index.Source.Name: index.Target.Name,
+			}
+		}
+		if index.TypeRename != nil {
+			source["type_rename"] = index.TypeRename
+		}
+
+		if v, ok := index.RawFilter.(string); ok {
+			source["query_string"] = v
+		}else{
+			source["query_dsl"] = index.RawFilter
+			if index.Source.DocType != "" {
+				if index.Target.DocType != "" {
+					source["type_rename"] = util.MapStr{
+						index.Source.DocType: index.Target.DocType,
+					}
+				}
+				must := []interface{}{
+					util.MapStr{
+						"terms": util.MapStr{
+							"_type": []string{index.Source.DocType},
+						},
+					},
+				}
+				if index.RawFilter != nil {
+					must = append(must, index.RawFilter)
+				}
+				source["query_dsl"] = util.MapStr{
+					"bool": util.MapStr{
+						"must": must,
+					},
+				}
+			}else{
+				if esSourceClient.GetMajorVersion() >= 8 {
+					source["type_rename"] = util.MapStr{
+						"*": index.Target.DocType,
+					}
+				}
+			}
+		}
+		var targetMust []interface{}
+		if index.RawFilter != nil {
+			targetMust = append(targetMust, index.RawFilter)
+		}
+		if index.Target.DocType != "" &&
esTargetClient.GetMajorVersion() < 8 { + targetMust = append(targetMust, util.MapStr{ + "terms": util.MapStr{ + "_type": []string{index.Target.DocType}, + }, + }) + } + + target := util.MapStr{ + "cluster_id": clusterMigrationTask.Cluster.Target.Id, + //"max_worker_size": 10, + //"detect_interval": 100, + "bulk": util.MapStr{ + "batch_size_in_mb": clusterMigrationTask.Settings.BulkSize.StoreSizeInMB, + "batch_size_in_docs": clusterMigrationTask.Settings.BulkSize.Docs, + }, + } + indexParameters := map[string]interface{}{ + "pipeline": util.MapStr{ + "id": "index_migration", + "config": util.MapStr{ + "source": source, + "target": target, + "execution": clusterMigrationTask.Settings.Execution, + }, + }, + } + indexMigrationTask := task2.Task{ + ParentId: []string{taskItem.ID}, + Cancellable: true, + Runnable: false, + Status: task2.StatusRunning, + StartTimeInMillis: time.Now().UnixMilli(), + Metadata: task2.Metadata{ + Type: "pipeline", + Labels: util.MapStr{ + "pipeline_id": "index_migration", + "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, + "target_cluster_id": clusterMigrationTask.Cluster.Target.Id, + "level": "index", + "partition_count": 1, + }, + }, + Parameters: indexParameters, + } + + indexMigrationTask.ID=util.GetUUID() + + clusterMigrationTask.Indices[i].TaskID = indexMigrationTask.ID + if index.Partition != nil { + partitionQ := &elastic.PartitionQuery{ + IndexName: index.Source.Name, + FieldName: index.Partition.FieldName, + FieldType: index.Partition.FieldType, + Step: index.Partition.Step, + //Filter: index.RawFilter, + Filter: source["query_dsl"], + } + partitions, err := elastic.GetPartitions(partitionQ, esSourceClient) + if err != nil { + return err + } + if partitions == nil || len(partitions) == 0{ + return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter)) + } + var ( + partitionID int + ) + for _, partition := range partitions { + //skip empty partition + if partition.Docs <= 0 { + continue + } + partitionID++ + partitionSource := util.MapStr{ + "start": partition.Start, + "end": partition.End, + "doc_count": partition.Docs, + "step": index.Partition.Step, + "partition_id": partitionID, + } + for k, v := range source{ + if k == "query_string"{ + continue + } + partitionSource[k] = v + } + partitionSource["query_dsl"] = partition.Filter + var must []interface{} + + if partition.Other { + must = append(must, partition.Filter) + }else{ + must = append(must, util.MapStr{ + "range": util.MapStr{ + index.Partition.FieldName: util.MapStr{ + "gte": partition.Start, + "lt": partition.End, + }, + }, + }) + } + + if targetMust != nil { + must = append(must, targetMust...) 
+ } + if len(must) > 0 { + target["query_dsl"] = util.MapStr{ + "bool": util.MapStr{ + "must": must, + }, + } + } + + partitionMigrationTask := task2.Task{ + ParentId: []string{taskItem.ID, indexMigrationTask.ID}, + Cancellable: false, + Runnable: true, + Status: task2.StatusReady, + Metadata: task2.Metadata{ + Type: "pipeline", + Labels: util.MapStr{ + "pipeline_id": "index_migration", + "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, + "target_cluster_id": clusterMigrationTask.Cluster.Target.Id, + "level": "partition", + "index_name": index.Source.Name, + "execution": util.MapStr{ + "nodes": util.MapStr{ + "permit": clusterMigrationTask.Settings.Execution.Nodes.Permit, + }, + }, + }, + }, + Parameters: map[string]interface{}{ + "pipeline": util.MapStr{ + "id": "index_migration", + "config": util.MapStr{ + "source": partitionSource, + "target": target, + "execution": clusterMigrationTask.Settings.Execution, + }, + }, + }, + } + partitionMigrationTask.ID=util.GetUUID() + + _, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "") + delete(target, "query_dsl") + if err != nil { + return fmt.Errorf("store index migration task(partition) error: %w", err) + } + + } + indexMigrationTask.Metadata.Labels["partition_count"] = partitionID + }else{ + source["doc_count"] = index.Source.Docs + if targetMust != nil { + target["query_dsl"] = util.MapStr{ + "bool": util.MapStr{ + "must": targetMust, + }, + } + } + partitionMigrationTask := task2.Task{ + ParentId: []string{taskItem.ID, indexMigrationTask.ID}, + Cancellable: false, + Runnable: true, + Status: task2.StatusReady, + Metadata: task2.Metadata{ + Type: "pipeline", + Labels: util.MapStr{ + "pipeline_id": "index_migration", + "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, + "target_cluster_id": clusterMigrationTask.Cluster.Target.Id, + "level": "partition", + "index_name": index.Source.Name, + "execution": util.MapStr{ + "nodes": util.MapStr{ + "permit": clusterMigrationTask.Settings.Execution.Nodes.Permit, + }, + }, + }, + }, + Parameters: indexParameters, + } + partitionMigrationTask.ID=util.GetUUID() + + _, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "") + delete(target, "query_dsl") + if err != nil { + return fmt.Errorf("store index migration task(partition) error: %w", err) + } + } + _, err = esClient.Index(p.config.IndexName, "", indexMigrationTask.ID, indexMigrationTask, "") + if err != nil { + return fmt.Errorf("store index migration task error: %w", err) + } + } + return nil +} + +func (p *ClusterMigrationProcessor) getClusterMigrationTasks(size int)([]task2.Task, error){ + queryDsl := util.MapStr{ + "size": size, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "asc", + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "status": task2.StatusReady, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": p.Name(), + }, + }, + }, + }, + }, + } + esClient := elastic.GetClient(p.config.Elasticsearch) + res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(queryDsl)) + if err != nil { + return nil, err + } + if res.GetTotal() == 0 { + return nil, nil + } + var migrationTasks []task2.Task + for _, hit := range res.Hits.Hits { + buf, err := util.ToJSONBytes(hit.Source) + if err != nil { + return nil, err + } + tk := task2.Task{} + err = util.FromJSONBytes(buf, &tk) + if err != nil { + return nil, err 
+		}
+		migrationTasks = append(migrationTasks, tk)
+	}
+	return migrationTasks, nil
+}
+
+func (p *ClusterMigrationProcessor) writeTaskLog(taskItem *task2.Task, logItem *task2.Log) {
+	esClient := elastic.GetClient(p.config.Elasticsearch)
+	_, err := esClient.Index(p.config.IndexName, "", logItem.TaskId, taskItem, "")
+	if err != nil {
+		log.Error(err)
+	}
+	_, err = esClient.Index(p.config.LogIndexName, "", logItem.ID, logItem, "")
+	if err != nil {
+		log.Error(err)
+	}
+}
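
For reference, the cluster-level task document that getClusterMigrationTasks() polls for and SplitMigrationTask() splits looks roughly like the sketch below. This snippet is illustrative only and is not part of the patch: the cluster IDs, index names, doc count, and the package main wiring are placeholders added just to make the example self-contained. The pipeline.config payload is what gets decoded into ElasticDataConfig above.

package main

import (
	"fmt"

	task2 "infini.sh/framework/core/task"
	"infini.sh/framework/core/util"
)

func main() {
	// Status must be ready and metadata.labels.pipeline_id must equal
	// "cluster_migration", otherwise the processor will not pick the task up.
	t := task2.Task{
		Cancellable: true,
		Runnable:    false,
		Status:      task2.StatusReady,
		Metadata: task2.Metadata{
			Type: "pipeline",
			Labels: util.MapStr{
				"pipeline_id": "cluster_migration",
			},
		},
		// pipeline.config is unmarshalled into ElasticDataConfig by SplitMigrationTask.
		Parameters: map[string]interface{}{
			"pipeline": util.MapStr{
				"id": "cluster_migration",
				"config": util.MapStr{
					"cluster": util.MapStr{
						"source": util.MapStr{"id": "source-cluster-id"}, // placeholder
						"target": util.MapStr{"id": "target-cluster-id"}, // placeholder
					},
					"indices": []util.MapStr{
						{
							"source": util.MapStr{"name": "test-index", "docs": 1000}, // placeholder
							"target": util.MapStr{"name": "test-index-v2"},            // placeholder
						},
					},
					"settings": util.MapStr{
						"scroll_size": util.MapStr{"docs": 5000, "timeout": "5m"},
						"bulk_size":   util.MapStr{"docs": 5000, "store_size_in_mb": 100},
					},
				},
			},
		},
	}
	t.ID = util.GetUUID()
	fmt.Println(util.MustToJSON(t))
}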