refactoring pipelines

parent f13e159d73
commit 02adb4d121

@@ -127,58 +127,6 @@ PUT $[[INDEX_PREFIX]]metrics-00001
}
],
"properties": {
"metadata": {
"properties": {
"category": {
"type": "keyword",
"ignore_above": 256
},
"datatype": {
"type": "keyword",
"ignore_above": 256
},
"labels": {
"properties": {
"cluster_id": {
"type": "keyword",
"ignore_above": 256
},
"index_id": {
"type": "keyword",
"ignore_above": 256
},
"index_name": {
"type": "keyword",
"ignore_above": 256
},
"index_uuid": {
"type": "keyword",
"ignore_above": 256
},
"ip": {
"type": "keyword",
"ignore_above": 256
},
"node_id": {
"type": "keyword",
"ignore_above": 256
},
"node_name": {
"type": "keyword",
"ignore_above": 256
},
"transport_address": {
"type": "keyword",
"ignore_above": 256
}
}
},
"name": {
"type": "keyword",
"ignore_above": 256
}
}
},
"timestamp": {
"type": "date"
}
@@ -243,6 +191,145 @@ PUT $[[INDEX_PREFIX]]logs-00001
}
}

PUT _template/$[[INDEX_PREFIX]]requests_logging-rollover
{
"order": 100000,
"index_patterns": [
"$[[INDEX_PREFIX]]requests_logging*"
],
"settings": {
"index": {
"format": "7",
"lifecycle": {
"name" : "ilm_$[[INDEX_PREFIX]]metrics-30days-retention",
"rollover_alias" : "$[[INDEX_PREFIX]]requests_logging"
},
"codec": "best_compression",
"number_of_shards": "1",
"translog": {
"durability": "async"
}
}
},
"mappings": {
"dynamic_templates": [
{
"strings": {
"mapping": {
"ignore_above": 256,
"type": "keyword"
},
"match_mapping_type": "string"
}
}
],
"properties": {
"request": {
"properties": {
"body": {
"type": "text"
}
}
},
"response": {
"properties": {
"body": {
"type": "text"
}
}
},
"timestamp": {
"type": "date"
}
}
},
"aliases": {}
}

PUT $[[INDEX_PREFIX]]requests_logging-00001
{
"settings": {
"index.lifecycle.rollover_alias":"$[[INDEX_PREFIX]]requests_logging"
, "refresh_interval": "5s"
},
"aliases":{
"$[[INDEX_PREFIX]]requests_logging":{
"is_write_index":true
}
}
}

PUT _template/$[[INDEX_PREFIX]]async_bulk_results-rollover
{
"order": 100000,
"index_patterns": [
"$[[INDEX_PREFIX]]async_bulk_results*"
],
"settings": {
"index": {
"format": "7",
"lifecycle": {
"name" : "ilm_$[[INDEX_PREFIX]]metrics-30days-retention",
"rollover_alias" : "$[[INDEX_PREFIX]]async_bulk_results"
},
"codec": "best_compression",
"number_of_shards": "1",
"translog": {
"durability": "async"
}
}
},
"mappings": {
"dynamic_templates": [
{
"strings": {
"mapping": {
"ignore_above": 256,
"type": "keyword"
},
"match_mapping_type": "string"
}
}
],
"properties": {
"request": {
"properties": {
"body": {
"type": "text"
}
}
},
"response": {
"properties": {
"body": {
"type": "text"
}
}
},
"timestamp": {
"type": "date"
}
}
},
"aliases": {}
}

PUT $[[INDEX_PREFIX]]async_bulk_results-00001
{
"settings": {
"index.lifecycle.rollover_alias":"$[[INDEX_PREFIX]]async_bulk_results"
, "refresh_interval": "5s"
},
"aliases":{
"$[[INDEX_PREFIX]]async_bulk_results":{
"is_write_index":true
}
}
}

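The pattern above pairs each rollover template with a bootstrap "-00001" index whose alias sets "is_write_index": true, so writers always address the alias and ILM can roll the underlying index over. A minimal Go sketch of writing through such an alias, reusing only client calls that appear elsewhere in this change set (the "default" cluster id and the ".infini_" value assumed for $[[INDEX_PREFIX]] are illustrative, not taken from this commit):

// Illustrative only: index a request-log document through the write alias.
esClient := elastic.GetClient("default") // hypothetical cluster id
doc := util.MapStr{
    "timestamp": time.Now(),
    "request":   util.MapStr{"body": "GET /_cluster/health"}, // illustrative payload
}
_, err := esClient.Index(".infini_requests_logging", "", util.GetUUID(), doc, "")
if err != nil {
    log.Error(err)
}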
PUT _template/$[[INDEX_PREFIX]]alert-history-rollover
{
"order" : 100000,
@@ -450,7 +537,8 @@ PUT $[[INDEX_PREFIX]]activities-00001
],
"properties": {
"changelog": {
"type": "flattened"
"type": "object",
"enabled": false
},
"id": {
"type": "keyword"
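Note on the changelog change above: replacing the flattened type with "type": "object", "enabled": false keeps the changelog payload in _source but stops Elasticsearch from parsing or indexing it, so it can still be returned with the document but can no longer be searched or aggregated on.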
@@ -465,9 +553,6 @@ PUT $[[INDEX_PREFIX]]activities-00001
"type": "keyword",
"ignore_above": 256
},
"labels": {
"type": "flattened"
},
"name": {
"type": "keyword",
"ignore_above": 256
main.go (+7 −7)

@@ -6,8 +6,8 @@ import (
_ "expvar"
log "github.com/cihub/seelog"
"infini.sh/console/config"
"infini.sh/console/model"
"infini.sh/console/model/alerting"
"infini.sh/console/model/gateway"
_ "infini.sh/console/plugin"
alerting2 "infini.sh/console/service/alerting"
"infini.sh/framework"

@@ -23,7 +23,6 @@ import (
_ "infini.sh/framework/modules/api"
elastic2 "infini.sh/framework/modules/elastic"
"infini.sh/framework/modules/metrics"
"infini.sh/framework/modules/migration"
"infini.sh/framework/modules/pipeline"
queue2 "infini.sh/framework/modules/queue/disk_queue"
"infini.sh/framework/modules/redis"

@@ -71,7 +70,6 @@ func main() {
modules=append(modules,&agent.AgentModule{})
modules=append(modules,&metrics.MetricsModule{})
modules=append(modules,&security.Module{})
modules=append(modules,&migration.MigrationModule{})

uiModule:=&ui.UIModule{}

@@ -91,7 +89,6 @@ func main() {
module.RegisterSystemModule(&agent.AgentModule{})
module.RegisterSystemModule(&metrics.MetricsModule{})
module.RegisterSystemModule(&security.Module{})
module.RegisterSystemModule(&migration.MigrationModule{})
}else{
for _, v := range modules {
v.Setup()

@@ -133,7 +130,7 @@ func main() {
orm.RegisterSchemaWithIndexName(elastic.View{}, "view")
orm.RegisterSchemaWithIndexName(elastic.CommonCommand{}, "commands")
//orm.RegisterSchemaWithIndexName(elastic.TraceTemplate{}, "trace-template")
orm.RegisterSchemaWithIndexName(gateway.Instance{}, "gateway-instance")
orm.RegisterSchemaWithIndexName(model.Instance{}, "instance")
orm.RegisterSchemaWithIndexName(alerting.Rule{}, "alert-rule")
orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alert-history")
orm.RegisterSchemaWithIndexName(alerting.AlertMessage{}, "alert-message")
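A small usage sketch of the schema registration above (the pattern is copied from gateway_api.go later in this diff; the ID value is illustrative and imports are as in that file): once model.Instance is registered against the "instance" index, lookups go through the ORM instead of a hard-coded index name.

// Illustrative fragment: fetch a registered model.Instance by ID via the ORM.
obj := model.Instance{}
obj.ID = "c9b5a0a0-example" // hypothetical instance id
exists, err := orm.Get(&obj)
if err != nil || !exists {
    log.Error("gateway instance not found: ", err)
}
log.Debug("stored in index: ", orm.GetIndexName(model.Instance{}))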
@@ -2,7 +2,7 @@
* web: https://infinilabs.com
* mail: hello#infini.ltd */

package gateway
package model

import (
"infini.sh/framework/core/agent"
@@ -9,7 +9,7 @@ import (
"fmt"
log "github.com/cihub/seelog"
"github.com/segmentio/encoding/json"
"infini.sh/console/model/gateway"
"infini.sh/console/model"
"infini.sh/framework/core/agent"
httprouter "infini.sh/framework/core/api/router"
elastic2 "infini.sh/framework/core/elastic"

@@ -25,7 +25,7 @@ import (
)

func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
var obj = &gateway.Instance{}
var obj = &model.Instance{}
err := h.DecodeJSON(req, obj)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)

@@ -68,7 +68,7 @@ func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps
func (h *GatewayAPI) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("instance_id")

obj := gateway.Instance{}
obj := model.Instance{}
obj.ID = id

exists, err := orm.Get(&obj)

@@ -94,7 +94,7 @@ func (h *GatewayAPI) getInstance(w http.ResponseWriter, req *http.Request, ps ht

func (h *GatewayAPI) updateInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("instance_id")
obj := gateway.Instance{}
obj := model.Instance{}

obj.ID = id
exists, err := orm.Get(&obj)

@@ -108,7 +108,7 @@ func (h *GatewayAPI) updateInstance(w http.ResponseWriter, req *http.Request, ps

id = obj.ID
create := obj.Created
obj = gateway.Instance{}
obj = model.Instance{}
err = h.DecodeJSON(req, &obj)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)

@@ -135,7 +135,7 @@ func (h *GatewayAPI) updateInstance(w http.ResponseWriter, req *http.Request, ps
func (h *GatewayAPI) deleteInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("instance_id")

obj := gateway.Instance{}
obj := model.Instance{}
obj.ID = id

exists, err := orm.Get(&obj)

@@ -185,7 +185,7 @@ func (h *GatewayAPI) searchInstance(w http.ResponseWriter, req *http.Request, ps
queryDSL = fmt.Sprintf(queryDSL, mustBuilder.String(), size, from)
q.RawQuery = []byte(queryDSL)

err, res := orm.Search(&gateway.Instance{}, &q)
err, res := orm.Search(&model.Instance{}, &q)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)

@@ -216,7 +216,7 @@ func (h *GatewayAPI) getInstanceStatus(w http.ResponseWriter, req *http.Request,
}
q.RawQuery = util.MustToJSONBytes(queryDSL)

err, res := orm.Search(&gateway.Instance{}, &q)
err, res := orm.Search(&model.Instance{}, &q)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)

@@ -272,7 +272,7 @@ func (h *GatewayAPI) proxy(w http.ResponseWriter, req *http.Request, ps httprout
)
instanceID := ps.MustGetParameter("instance_id")

obj := gateway.Instance{}
obj := model.Instance{}
obj.ID = instanceID

exists, err := orm.Get(&obj)

@@ -369,7 +369,7 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request,
from = 0
}
agentIndexName := orm.GetIndexName(agent.Instance{})
gatewayIndexName := orm.GetIndexName(gateway.Instance{})
gatewayIndexName := orm.GetIndexName(model.Instance{})
agentMust := []util.MapStr{
{
"term": util.MapStr{

@@ -443,18 +443,19 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request,
}
err, result := orm.Search(nil, &q)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}

searchRes := elastic2.SearchResponse{}
err = util.FromJSONBytes(result.Raw, &searchRes)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
if err != nil||searchRes.ESError!=nil {
msg:=fmt.Sprintf("%v,%v",err,searchRes.ESError)
h.WriteError(w, msg, http.StatusInternalServerError)
return
}
var nodes = []util.MapStr{}

for _, hit := range searchRes.Hits.Hits {
var (
endpoint string

@@ -484,7 +485,7 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request,
}

if !hasErr {
available, err := isNodeAvailable(endpoint)
available, err := isNodeAvailable(endpoint) //TODO remove
if err != nil {
log.Error(err)
}
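The last hunk above folds the JSON-decode error and the error embedded in the search response into a single check. A hedged sketch of the same idea extracted into a helper (the helper name is hypothetical and not part of this change; the types are the ones already imported in this file):

// Hypothetical helper expressing the combined check used in getExecutionNodes.
func searchFailed(err error, res *elastic2.SearchResponse) (string, bool) {
    if err != nil || res.ESError != nil {
        return fmt.Sprintf("%v,%v", err, res.ESError), true
    }
    return "", false
}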
@@ -0,0 +1,332 @@
|
|||
/* Copyright © INFINI Ltd. All rights reserved.
|
||||
* web: https://infinilabs.com
|
||||
* mail: hello#infini.ltd */
|
||||
|
||||
package elastic
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/buger/jsonparser"
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/segmentio/encoding/json"
|
||||
"infini.sh/framework/core/config"
|
||||
"infini.sh/framework/core/elastic"
|
||||
"infini.sh/framework/core/event"
|
||||
"infini.sh/framework/core/global"
|
||||
"infini.sh/framework/core/orm"
|
||||
"infini.sh/framework/core/pipeline"
|
||||
"infini.sh/framework/core/queue"
|
||||
"infini.sh/framework/core/rotate"
|
||||
"infini.sh/framework/core/util"
|
||||
"runtime"
|
||||
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ActivityProcessor struct {
|
||||
config *Config
|
||||
runningConfigs map[string]*queue.QueueConfig
|
||||
bulkSizeInByte int
|
||||
wg sync.WaitGroup
|
||||
inFlightQueueConfigs sync.Map
|
||||
detectorRunning bool
|
||||
id string
|
||||
}
|
||||
|
||||
func init() {
|
||||
pipeline.RegisterProcessorPlugin("activity", NewActivityProcessor)
|
||||
}
|
||||
|
||||
func NewActivityProcessor(c *config.Config) (pipeline.Processor, error) {
|
||||
cfg := Config{
|
||||
NumOfWorkers: 1,
|
||||
MaxWorkers: 10,
|
||||
MaxConnectionPerHost: 1,
|
||||
IdleTimeoutInSecond: 5,
|
||||
BulkSizeInMb: 10,
|
||||
DetectIntervalInMs: 10000,
|
||||
Queues: map[string]interface{}{},
|
||||
|
||||
Consumer: queue.ConsumerConfig{
|
||||
Group: "activity-001",
|
||||
Name: "activity-001",
|
||||
FetchMinBytes: 1,
|
||||
FetchMaxBytes: 10 * 1024 * 1024,
|
||||
FetchMaxMessages: 500,
|
||||
EOFRetryDelayInMs: 1000,
|
||||
FetchMaxWaitMs: 10000,
|
||||
},
|
||||
|
||||
DetectActiveQueue: true,
|
||||
ValidateRequest: false,
|
||||
SkipEmptyQueue: true,
|
||||
SkipOnMissingInfo: false,
|
||||
RotateConfig: rotate.DefaultConfig,
|
||||
BulkConfig: elastic.DefaultBulkProcessorConfig,
|
||||
}
|
||||
|
||||
if err := c.Unpack(&cfg); err != nil {
|
||||
log.Error(err)
|
||||
return nil, fmt.Errorf("failed to unpack the configuration of activity processor: %s", err)
|
||||
}
|
||||
|
||||
runner := ActivityProcessor{
|
||||
id: util.GetUUID(),
|
||||
config: &cfg,
|
||||
runningConfigs: map[string]*queue.QueueConfig{},
|
||||
inFlightQueueConfigs: sync.Map{},
|
||||
}
|
||||
|
||||
runner.bulkSizeInByte = 1048576 * runner.config.BulkSizeInMb
|
||||
if runner.config.BulkSizeInKb > 0 {
|
||||
runner.bulkSizeInByte = 1024 * runner.config.BulkSizeInKb
|
||||
}
|
||||
|
||||
runner.wg = sync.WaitGroup{}
|
||||
|
||||
return &runner, nil
|
||||
}
|
||||
|
||||
func (processor *ActivityProcessor) Name() string {
|
||||
return "activity"
|
||||
}
|
||||
|
||||
func (processor *ActivityProcessor) Process(c *pipeline.Context) error {
|
||||
defer func() {
|
||||
if !global.Env().IsDebug {
|
||||
if r := recover(); r != nil {
|
||||
var v string
|
||||
switch r.(type) {
|
||||
case error:
|
||||
v = r.(error).Error()
|
||||
case runtime.Error:
|
||||
v = r.(runtime.Error).Error()
|
||||
case string:
|
||||
v = r.(string)
|
||||
}
|
||||
log.Error("error in activity processor,", v)
|
||||
}
|
||||
}
|
||||
log.Trace("exit activity processor")
|
||||
}()
|
||||
|
||||
//handle updates
|
||||
if processor.config.DetectActiveQueue {
|
||||
log.Tracef("detector running [%v]", processor.detectorRunning)
|
||||
if !processor.detectorRunning {
|
||||
processor.detectorRunning = true
|
||||
processor.wg.Add(1)
|
||||
go func(c *pipeline.Context) {
|
||||
log.Tracef("init detector for active queue [%v] ", processor.id)
|
||||
defer func() {
|
||||
if !global.Env().IsDebug {
|
||||
if r := recover(); r != nil {
|
||||
var v string
|
||||
switch r.(type) {
|
||||
case error:
|
||||
v = r.(error).Error()
|
||||
case runtime.Error:
|
||||
v = r.(runtime.Error).Error()
|
||||
case string:
|
||||
v = r.(string)
|
||||
}
|
||||
log.Error("error in activity processor,", v)
|
||||
}
|
||||
}
|
||||
processor.detectorRunning = false
|
||||
log.Debug("exit detector for active queue")
|
||||
processor.wg.Done()
|
||||
}()
|
||||
|
||||
for {
|
||||
if c.IsCanceled() {
|
||||
return
|
||||
}
|
||||
|
||||
if global.Env().IsDebug {
|
||||
log.Tracef("inflight queues: %v", util.MapLength(&processor.inFlightQueueConfigs))
|
||||
processor.inFlightQueueConfigs.Range(func(key, value interface{}) bool {
|
||||
log.Tracef("inflight queue:%v", key)
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
cfgs := queue.GetConfigByLabels(processor.config.Queues)
|
||||
for _, v := range cfgs {
|
||||
if c.IsCanceled() {
|
||||
return
|
||||
}
|
||||
//if have depth and not in in flight
|
||||
if queue.HasLag(v) {
|
||||
_, ok := processor.inFlightQueueConfigs.Load(v.Id)
|
||||
if !ok {
|
||||
log.Tracef("detecting new queue: %v", v.Name)
|
||||
processor.HandleQueueConfig(v, c)
|
||||
}
|
||||
}
|
||||
}
|
||||
if processor.config.DetectIntervalInMs > 0 {
|
||||
time.Sleep(time.Millisecond * time.Duration(processor.config.DetectIntervalInMs))
|
||||
}
|
||||
}
|
||||
}(c)
|
||||
}
|
||||
} else {
|
||||
cfgs := queue.GetConfigByLabels(processor.config.Queues)
|
||||
log.Debugf("filter queue by:%v, num of queues:%v", processor.config.Queues, len(cfgs))
|
||||
for _, v := range cfgs {
|
||||
log.Tracef("checking queue: %v", v)
|
||||
processor.HandleQueueConfig(v, c)
|
||||
}
|
||||
}
|
||||
|
||||
processor.wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (processor *ActivityProcessor) HandleQueueConfig(v *queue.QueueConfig, c *pipeline.Context) {
|
||||
|
||||
if processor.config.SkipEmptyQueue {
|
||||
if !queue.HasLag(v) {
|
||||
if global.Env().IsDebug {
|
||||
log.Tracef("skip empty queue:[%v]", v.Name)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
elasticsearch := processor.config.Elasticsearch
|
||||
if elasticsearch == "" {
|
||||
log.Error("elasticsearch config was not found in activity processor")
|
||||
return
|
||||
}
|
||||
|
||||
meta := elastic.GetMetadata(util.ToString(elasticsearch))
|
||||
if meta == nil {
|
||||
log.Debugf("metadata for [%v] is nil", elasticsearch)
|
||||
return
|
||||
}
|
||||
|
||||
host := meta.GetActiveHost()
|
||||
log.Debugf("random choose node [%v] to consume queue [%v]", host, v.Id)
|
||||
processor.wg.Add(1)
|
||||
|
||||
//go processor.NewBulkWorker("bulk_indexing_"+host,c, processor.bulkSizeInByte, v, host)
|
||||
go processor.HandleMessage(c, v)
|
||||
|
||||
}
|
||||
|
||||
func (processor *ActivityProcessor) HandleMessage(ctx *pipeline.Context, qConfig *queue.QueueConfig) {
|
||||
defer func() {
|
||||
if !global.Env().IsDebug {
|
||||
if r := recover(); r != nil {
|
||||
var v string
|
||||
switch r.(type) {
|
||||
case error:
|
||||
v = r.(error).Error()
|
||||
case runtime.Error:
|
||||
v = r.(runtime.Error).Error()
|
||||
case string:
|
||||
v = r.(string)
|
||||
}
|
||||
log.Errorf("error in %s processor: %v", processor.Name(), v)
|
||||
}
|
||||
}
|
||||
processor.wg.Done()
|
||||
log.Tracef("exit %s processor", processor.Name())
|
||||
}()
|
||||
|
||||
key := qConfig.Id
|
||||
|
||||
if processor.config.MaxWorkers > 0 && util.MapLength(&processor.inFlightQueueConfigs) > processor.config.MaxWorkers {
|
||||
log.Debugf("reached max num of workers, skip init [%v]", qConfig.Name)
|
||||
return
|
||||
}
|
||||
|
||||
var workerID = util.GetUUID()
|
||||
_, exists := processor.inFlightQueueConfigs.Load(key)
|
||||
if exists {
|
||||
log.Errorf("queue [%v] has more then one consumer", qConfig.Id)
|
||||
return
|
||||
}
|
||||
|
||||
processor.inFlightQueueConfigs.Store(key, workerID)
|
||||
log.Debugf("starting worker:[%v], queue:[%v]", workerID, qConfig.Name)
|
||||
var consumer = queue.GetOrInitConsumerConfig(qConfig.Id, processor.config.Consumer.Group, processor.config.Consumer.Name)
|
||||
initOffset, _ := queue.GetOffset(qConfig, consumer)
|
||||
offset := initOffset
|
||||
defer func() {
|
||||
log.Debugf("worker:[%v] start consume queue:[%v] offset:%v", workerID, qConfig.Id, offset)
|
||||
}()
|
||||
|
||||
for {
|
||||
if ctx.IsCanceled() {
|
||||
return
|
||||
}
|
||||
|
||||
ctx1, messages, timeout, err := queue.Consume(qConfig, consumer, offset)
|
||||
|
||||
if timeout {
|
||||
log.Tracef("timeout on queue:[%v]", qConfig.Name)
|
||||
ctx.Failed()
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Tracef("error on queue:[%v]", qConfig.Name)
|
||||
if err.Error() == "EOF" {
|
||||
if len(messages) > 0 {
|
||||
goto HANDLE_MESSAGE
|
||||
}
|
||||
return
|
||||
}
|
||||
panic(err)
|
||||
}
|
||||
|
||||
HANDLE_MESSAGE:
|
||||
|
||||
//update temp offset, not committed, continued reading
|
||||
offset = ctx1.NextOffset.String()//TODO
|
||||
|
||||
if len(messages) > 0 {
|
||||
for _, pop := range messages {
|
||||
typ, err := jsonparser.GetString(pop.Data, "metadata", "name")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
switch typ {
|
||||
case "activity":
|
||||
activity, _, _, err := jsonparser.Get(pop.Data, "payload", "activity")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = processor.HandleActivity(activity)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
if offset != "" && initOffset != offset {
|
||||
ok, err := queue.CommitOffset(qConfig, consumer, offset)
|
||||
if !ok || err != nil {
|
||||
panic(err)
|
||||
}
|
||||
initOffset=offset
|
||||
}
|
||||
} else {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (processor *ActivityProcessor) HandleActivity(activityByte []byte) error {
|
||||
// save activity
|
||||
activityInfo := &event.Activity{}
|
||||
json.Unmarshal(activityByte, activityInfo)
|
||||
esClient := elastic.GetClient(processor.config.Elasticsearch)
|
||||
_, err := esClient.Index(orm.GetIndexName(activityInfo), "", activityInfo.ID, activityInfo, "")
|
||||
return err
|
||||
}
|
|
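Condensed sketch of the consume/commit cycle implemented by HandleMessage in the new activity processor above (qConfig and the consumer group/name are the ones used in that file; EOF and error handling are trimmed for brevity):

consumer := queue.GetOrInitConsumerConfig(qConfig.Id, "activity-001", "activity-001")
initOffset, _ := queue.GetOffset(qConfig, consumer)
offset := initOffset
for {
    ctx1, messages, timeout, err := queue.Consume(qConfig, consumer, offset)
    if timeout || err != nil {
        break // the full implementation distinguishes EOF, timeouts and hard errors
    }
    offset = ctx1.NextOffset.String() // advance the local offset only
    for _, msg := range messages {
        _ = msg // each message is dispatched by its metadata.name field
    }
    if offset != "" && offset != initOffset {
        if ok, err := queue.CommitOffset(qConfig, consumer, offset); ok && err == nil {
            initOffset = offset // commit only after the batch has been handled
        }
    }
}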
@@ -0,0 +1,463 @@
|
|||
/* Copyright © INFINI Ltd. All rights reserved.
|
||||
* web: https://infinilabs.com
|
||||
* mail: hello#infini.ltd */
|
||||
|
||||
package elastic
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/buger/jsonparser"
|
||||
log "github.com/cihub/seelog"
|
||||
"infini.sh/framework/core/config"
|
||||
"infini.sh/framework/core/elastic"
|
||||
"infini.sh/framework/core/event"
|
||||
"infini.sh/framework/core/global"
|
||||
"infini.sh/framework/core/orm"
|
||||
"infini.sh/framework/core/pipeline"
|
||||
"infini.sh/framework/core/queue"
|
||||
"infini.sh/framework/core/rotate"
|
||||
"infini.sh/framework/core/util"
|
||||
"runtime"
|
||||
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type MetadataProcessor struct {
|
||||
config *Config
|
||||
runningConfigs map[string]*queue.QueueConfig
|
||||
bulkSizeInByte int
|
||||
wg sync.WaitGroup
|
||||
inFlightQueueConfigs sync.Map
|
||||
detectorRunning bool
|
||||
id string
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
NumOfWorkers int `config:"worker_size"`
|
||||
|
||||
IdleTimeoutInSecond int `config:"idle_timeout_in_seconds"`
|
||||
MaxConnectionPerHost int `config:"max_connection_per_node"`
|
||||
|
||||
BulkSizeInKb int `config:"bulk_size_in_kb,omitempty"`
|
||||
BulkSizeInMb int `config:"bulk_size_in_mb,omitempty"`
|
||||
BulkMaxDocsCount int `config:"bulk_max_docs_count,omitempty"`
|
||||
|
||||
Queues map[string]interface{} `config:"queues,omitempty"`
|
||||
|
||||
Consumer queue.ConsumerConfig `config:"consumer"`
|
||||
|
||||
MaxWorkers int `config:"max_worker_size"`
|
||||
|
||||
DetectActiveQueue bool `config:"detect_active_queue"`
|
||||
DetectIntervalInMs int `config:"detect_interval"`
|
||||
|
||||
ValidateRequest bool `config:"valid_request"`
|
||||
SkipEmptyQueue bool `config:"skip_empty_queue"`
|
||||
SkipOnMissingInfo bool `config:"skip_info_missing"`
|
||||
|
||||
RotateConfig rotate.RotateConfig `config:"rotate"`
|
||||
BulkConfig elastic.BulkProcessorConfig `config:"bulk"`
|
||||
|
||||
Elasticsearch string `config:"elasticsearch,omitempty"`
|
||||
|
||||
WaitingAfter []string `config:"waiting_after"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
pipeline.RegisterProcessorPlugin("metadata", New)
|
||||
}
|
||||
|
||||
func New(c *config.Config) (pipeline.Processor, error) {
|
||||
cfg := Config{
|
||||
NumOfWorkers: 1,
|
||||
MaxWorkers: 10,
|
||||
MaxConnectionPerHost: 1,
|
||||
IdleTimeoutInSecond: 5,
|
||||
BulkSizeInMb: 10,
|
||||
DetectIntervalInMs: 10000,
|
||||
Queues: map[string]interface{}{},
|
||||
|
||||
Consumer: queue.ConsumerConfig{
|
||||
Group: "metadata-001",
|
||||
Name: "metadata-001",
|
||||
FetchMinBytes: 1,
|
||||
FetchMaxBytes: 10 * 1024 * 1024,
|
||||
FetchMaxMessages: 500,
|
||||
EOFRetryDelayInMs: 1000,
|
||||
FetchMaxWaitMs: 10000,
|
||||
},
|
||||
|
||||
DetectActiveQueue: true,
|
||||
ValidateRequest: false,
|
||||
SkipEmptyQueue: true,
|
||||
SkipOnMissingInfo: false,
|
||||
RotateConfig: rotate.DefaultConfig,
|
||||
BulkConfig: elastic.DefaultBulkProcessorConfig,
|
||||
}
|
||||
|
||||
if err := c.Unpack(&cfg); err != nil {
|
||||
log.Error(err)
|
||||
return nil, fmt.Errorf("failed to unpack the configuration of metadata processor: %s", err)
|
||||
}
|
||||
|
||||
runner := MetadataProcessor{
|
||||
id: util.GetUUID(),
|
||||
config: &cfg,
|
||||
runningConfigs: map[string]*queue.QueueConfig{},
|
||||
inFlightQueueConfigs: sync.Map{},
|
||||
}
|
||||
|
||||
runner.bulkSizeInByte = 1048576 * runner.config.BulkSizeInMb
|
||||
if runner.config.BulkSizeInKb > 0 {
|
||||
runner.bulkSizeInByte = 1024 * runner.config.BulkSizeInKb
|
||||
}
|
||||
|
||||
runner.wg = sync.WaitGroup{}
|
||||
|
||||
return &runner, nil
|
||||
}
|
||||
|
||||
func (processor *MetadataProcessor) Name() string {
|
||||
return "metadata"
|
||||
}
|
||||
|
||||
func (processor *MetadataProcessor) Process(c *pipeline.Context) error {
|
||||
defer func() {
|
||||
if !global.Env().IsDebug {
|
||||
if r := recover(); r != nil {
|
||||
var v string
|
||||
switch r.(type) {
|
||||
case error:
|
||||
v = r.(error).Error()
|
||||
case runtime.Error:
|
||||
v = r.(runtime.Error).Error()
|
||||
case string:
|
||||
v = r.(string)
|
||||
}
|
||||
log.Error("error in metadata processor,", v)
|
||||
}
|
||||
}
|
||||
log.Trace("exit metadata processor")
|
||||
}()
|
||||
|
||||
//handle updates
|
||||
if processor.config.DetectActiveQueue {
|
||||
log.Tracef("detector running [%v]", processor.detectorRunning)
|
||||
if !processor.detectorRunning {
|
||||
processor.detectorRunning = true
|
||||
processor.wg.Add(1)
|
||||
go func(c *pipeline.Context) {
|
||||
log.Tracef("init detector for active queue [%v] ", processor.id)
|
||||
defer func() {
|
||||
if !global.Env().IsDebug {
|
||||
if r := recover(); r != nil {
|
||||
var v string
|
||||
switch r.(type) {
|
||||
case error:
|
||||
v = r.(error).Error()
|
||||
case runtime.Error:
|
||||
v = r.(runtime.Error).Error()
|
||||
case string:
|
||||
v = r.(string)
|
||||
}
|
||||
log.Error("error in metadata processor,", v)
|
||||
}
|
||||
}
|
||||
processor.detectorRunning = false
|
||||
log.Debug("exit detector for active queue")
|
||||
processor.wg.Done()
|
||||
}()
|
||||
|
||||
for {
|
||||
if c.IsCanceled() {
|
||||
return
|
||||
}
|
||||
|
||||
if global.Env().IsDebug {
|
||||
log.Tracef("inflight queues: %v", util.MapLength(&processor.inFlightQueueConfigs))
|
||||
processor.inFlightQueueConfigs.Range(func(key, value interface{}) bool {
|
||||
log.Tracef("inflight queue:%v", key)
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
cfgs := queue.GetConfigByLabels(processor.config.Queues)
|
||||
for _, v := range cfgs {
|
||||
if c.IsCanceled() {
|
||||
return
|
||||
}
|
||||
//if have depth and not in in flight
|
||||
if queue.HasLag(v) {
|
||||
_, ok := processor.inFlightQueueConfigs.Load(v.Id)
|
||||
if !ok {
|
||||
log.Tracef("detecting new queue: %v", v.Name)
|
||||
processor.HandleQueueConfig(v, c)
|
||||
}
|
||||
}
|
||||
}
|
||||
if processor.config.DetectIntervalInMs > 0 {
|
||||
time.Sleep(time.Millisecond * time.Duration(processor.config.DetectIntervalInMs))
|
||||
}
|
||||
}
|
||||
}(c)
|
||||
}
|
||||
} else {
|
||||
cfgs := queue.GetConfigByLabels(processor.config.Queues)
|
||||
log.Debugf("filter queue by:%v, num of queues:%v", processor.config.Queues, len(cfgs))
|
||||
for _, v := range cfgs {
|
||||
log.Tracef("checking queue: %v", v)
|
||||
processor.HandleQueueConfig(v, c)
|
||||
}
|
||||
}
|
||||
|
||||
processor.wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (processor *MetadataProcessor) HandleQueueConfig(v *queue.QueueConfig, c *pipeline.Context) {
|
||||
|
||||
if processor.config.SkipEmptyQueue {
|
||||
if !queue.HasLag(v) {
|
||||
if global.Env().IsDebug {
|
||||
log.Tracef("skip empty queue:[%v]", v.Name)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
elasticsearch := processor.config.Elasticsearch
|
||||
if elasticsearch == "" {
|
||||
log.Error("elasticsearch config was not found in metadata processor")
|
||||
return
|
||||
}
|
||||
|
||||
meta := elastic.GetMetadata(util.ToString(elasticsearch))
|
||||
if meta == nil {
|
||||
log.Debugf("metadata for [%v] is nil", elasticsearch)
|
||||
return
|
||||
}
|
||||
|
||||
host := meta.GetActiveHost()
|
||||
log.Debugf("random choose node [%v] to consume queue [%v]", host, v.Id)
|
||||
processor.wg.Add(1)
|
||||
|
||||
//go processor.NewBulkWorker("bulk_indexing_"+host,c, processor.bulkSizeInByte, v, host)
|
||||
go processor.HandleMessage(c, v)
|
||||
|
||||
}
|
||||
|
||||
func (processor *MetadataProcessor) HandleMessage(ctx *pipeline.Context, qConfig *queue.QueueConfig) {
|
||||
defer func() {
|
||||
if !global.Env().IsDebug {
|
||||
if r := recover(); r != nil {
|
||||
var v string
|
||||
switch r.(type) {
|
||||
case error:
|
||||
v = r.(error).Error()
|
||||
case runtime.Error:
|
||||
v = r.(runtime.Error).Error()
|
||||
case string:
|
||||
v = r.(string)
|
||||
}
|
||||
log.Errorf("error in %s processor: %v", processor.Name(), v)
|
||||
}
|
||||
}
|
||||
processor.wg.Done()
|
||||
log.Tracef("exit %s processor", processor.Name())
|
||||
}()
|
||||
|
||||
key := qConfig.Id
|
||||
|
||||
if processor.config.MaxWorkers > 0 && util.MapLength(&processor.inFlightQueueConfigs) > processor.config.MaxWorkers {
|
||||
log.Debugf("reached max num of workers, skip init [%v]", qConfig.Name)
|
||||
return
|
||||
}
|
||||
|
||||
var workerID = util.GetUUID()
|
||||
_, exists := processor.inFlightQueueConfigs.Load(key)
|
||||
if exists {
|
||||
log.Errorf("queue [%v] has more then one consumer", qConfig.Id)
|
||||
return
|
||||
}
|
||||
|
||||
processor.inFlightQueueConfigs.Store(key, workerID)
|
||||
log.Debugf("starting worker:[%v], queue:[%v]", workerID, qConfig.Name)
|
||||
var consumer = queue.GetOrInitConsumerConfig(qConfig.Id, processor.config.Consumer.Group, processor.config.Consumer.Name)
|
||||
initOffset, _ := queue.GetOffset(qConfig, consumer)
|
||||
offset := initOffset
|
||||
defer func() {
|
||||
log.Debugf("worker:[%v] start consume queue:[%v] offset:%v", workerID, qConfig.Id, offset)
|
||||
}()
|
||||
|
||||
for {
|
||||
if ctx.IsCanceled() {
|
||||
return
|
||||
}
|
||||
ctx1, messages, isTimeout, err := queue.Consume(qConfig, consumer, offset)
|
||||
//if timeout{
|
||||
// log.Tracef("timeout on queue:[%v]",qConfig.Name)
|
||||
// ctx.Failed()
|
||||
// return
|
||||
//}
|
||||
|
||||
if err != nil {
|
||||
log.Tracef("error on queue:[%v]", qConfig.Name)
|
||||
if err.Error() == "EOF" {
|
||||
if len(messages) > 0 {
|
||||
goto HANDLE_MESSAGE
|
||||
}
|
||||
return
|
||||
}
|
||||
//panic(err)
|
||||
if isTimeout {
|
||||
time.Sleep(time.Millisecond * 1000)
|
||||
}
|
||||
}
|
||||
|
||||
HANDLE_MESSAGE:
|
||||
|
||||
//update temp offset, not committed, continued reading
|
||||
offset = ctx1.NextOffset.String()//TODO
|
||||
|
||||
if len(messages) > 0 {
|
||||
for _, pop := range messages {
|
||||
|
||||
typ, err := jsonparser.GetString(pop.Data, "metadata", "name")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
switch typ {
|
||||
case "index_health_change":
|
||||
//err = processor.HandleIndexHealthChange(&ev)
|
||||
case "index_state_change":
|
||||
indexState, _, _, err := jsonparser.Get(pop.Data, "payload", "index_state")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = processor.HandleIndexStateChange(indexState)
|
||||
case "unknown_node_status":
|
||||
processor.HandleUnknownNodeStatus(pop.Data)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
if offset != "" && initOffset != offset {
|
||||
ok, err := queue.CommitOffset(qConfig, consumer, offset)
|
||||
if !ok || err != nil {
|
||||
panic(err)
|
||||
}
|
||||
initOffset=offset
|
||||
}
|
||||
} else {
|
||||
if !isTimeout {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
func (processor *MetadataProcessor) HandleIndexStateChange(indexState []byte) error {
|
||||
esClient := elastic.GetClient(processor.config.Elasticsearch)
|
||||
// save index metadata
|
||||
id, err := jsonparser.GetString(indexState, "id")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
storeIndexName := orm.GetIndexName(elastic.IndexConfig{})
|
||||
|
||||
_, err = esClient.Index(storeIndexName, "", id, indexState, "")
|
||||
return err
|
||||
}
|
||||
|
||||
func (processor *MetadataProcessor) HandleUnknownNodeStatus(ev []byte) error {
|
||||
clusterID, err := jsonparser.GetString(ev, "payload", "cluster_id")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
esClient := elastic.GetClient(processor.config.Elasticsearch)
|
||||
queryDslTpl := `{"script": {
|
||||
"source": "ctx._source.metadata.labels.status='unavailable'",
|
||||
"lang": "painless"
|
||||
},
|
||||
"query": {
|
||||
"bool": {
|
||||
"must": [
|
||||
{"term": {
|
||||
"metadata.cluster_id": {
|
||||
"value": "%s"
|
||||
}
|
||||
}},
|
||||
{"term": {
|
||||
"metadata.category": {
|
||||
"value": "elasticsearch"
|
||||
}
|
||||
}}
|
||||
]
|
||||
}
|
||||
}}`
|
||||
queryDsl := fmt.Sprintf(queryDslTpl, clusterID)
|
||||
_, err = esClient.UpdateByQuery(orm.GetIndexName(elastic.NodeConfig{}), []byte(queryDsl))
|
||||
return err
|
||||
}
|
||||
|
||||
func (processor *MetadataProcessor) HandleIndexHealthChange(ev *event.Event) error {
|
||||
// save activity
|
||||
activityInfo := &event.Activity{
|
||||
ID: util.GetUUID(),
|
||||
Timestamp: ev.Timestamp,
|
||||
Metadata: event.ActivityMetadata{
|
||||
Category: ev.Metadata.Category,
|
||||
Group: "metadata",
|
||||
Name: "index_health_change",
|
||||
Type: "update",
|
||||
Labels: ev.Metadata.Labels,
|
||||
},
|
||||
}
|
||||
esClient := elastic.GetClient(processor.config.Elasticsearch)
|
||||
_, err := esClient.Index(orm.GetIndexName(activityInfo), "", activityInfo.ID, activityInfo, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// update index health status
|
||||
queryDslTpl := `{
|
||||
"size": 1,
|
||||
"query": {
|
||||
"bool": {
|
||||
"must": [
|
||||
{"term": {
|
||||
"metadata.index_id": {
|
||||
"value": "%s"
|
||||
}
|
||||
}},
|
||||
{"term": {
|
||||
"metadata.category": {
|
||||
"value": "elasticsearch"
|
||||
}
|
||||
}}
|
||||
],
|
||||
"must_not": [
|
||||
{"term": {
|
||||
"metadata.labels.index_status": {
|
||||
"value": "deleted"
|
||||
}
|
||||
}}
|
||||
]
|
||||
}
|
||||
}
|
||||
}`
|
||||
queryDsl := fmt.Sprintf(queryDslTpl, ev.Metadata.Labels["index_id"])
|
||||
indexName := orm.GetIndexName(elastic.IndexConfig{})
|
||||
searchRes, err := esClient.SearchWithRawQueryDSL(indexName, []byte(queryDsl))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if searchRes.GetTotal() == 0 {
|
||||
return nil
|
||||
}
|
||||
source := util.MapStr(searchRes.Hits.Hits[0].Source)
|
||||
source.Put("metadata.labels.health_status", ev.Metadata.Labels["to"])
|
||||
_, err = esClient.Index(indexName, "", searchRes.Hits.Hits[0].ID, source, "")
|
||||
return err
|
||||
}
|
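The metadata processor above routes each consumed message on its metadata.name field before touching Elasticsearch. A hedged sketch of how an additional event type could be wired into that switch (the "node_health_change" case and its handler are hypothetical, not part of this commit):

switch typ {
case "index_state_change":
    indexState, _, _, err := jsonparser.Get(pop.Data, "payload", "index_state")
    if err != nil {
        panic(err)
    }
    _ = processor.HandleIndexStateChange(indexState)
case "unknown_node_status":
    processor.HandleUnknownNodeStatus(pop.Data)
case "node_health_change": // hypothetical new event type
    // a matching handler method would be added on MetadataProcessor
}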
(File diff suppressed because it is too large.)
@@ -0,0 +1,77 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */

package migration

type ElasticDataConfig struct {
Cluster struct {
Source ClusterInfo `json:"source"`
Target ClusterInfo `json:"target"`
} `json:"cluster"`
Indices []IndexConfig `json:"indices"`
Settings struct {
ParallelIndices int `json:"parallel_indices"`
ParallelTaskPerIndex int `json:"parallel_task_per_index"`
ScrollSize struct {
Docs int `json:"docs"`
Timeout string `json:"timeout"`
} `json:"scroll_size"`
BulkSize struct {
Docs int `json:"docs"`
StoreSizeInMB int `json:"store_size_in_mb"`
} `json:"bulk_size"`
Execution ExecutionConfig `json:"execution"`
} `json:"settings"`
Creator struct {
Name string `json:"name"`
Id string `json:"id"`
} `json:"creator"`
}

type ExecutionConfig struct {
TimeWindow []TimeWindowItem `json:"time_window"`
Nodes struct{
Permit []ExecutionNode `json:"permit"`
} `json:"nodes"`
}

type ExecutionNode struct {
ID string `json:"id"`
Name string `json:"name"`
}

type TimeWindowItem struct {
Start string `json:"start"`
End string `json:"end"`
}

type IndexConfig struct {
Source IndexInfo `json:"source"`
Target IndexInfo `json:"target"`
RawFilter interface{} `json:"raw_filter"`
IndexRename map[string]interface{} `json:"index_rename"`
TypeRename map[string]interface{} `json:"type_rename"`
Partition *IndexPartition `json:"partition,omitempty"`
TaskID string `json:"task_id,omitempty"`
Status string `json:"status,omitempty"`
Percent float64 `json:"percent,omitempty"`
ErrorPartitions int `json:"error_partitions,omitempty"`
}
type IndexPartition struct {
FieldType string `json:"field_type"`
FieldName string `json:"field_name"`
Step interface{} `json:"step"`
}

type IndexInfo struct {
Name string `json:"name"`
DocType string `json:"doc_type"`
Docs int64 `json:"docs"`
StoreSizeInBytes int `json:"store_size_in_bytes"`
}

type ClusterInfo struct {
Id string `json:"id"`
Name string `json:"name"`
}
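For orientation, a hypothetical ElasticDataConfig value matching the structs above (every id, name and size below is made up for illustration; only the shape comes from this file):

cfg := ElasticDataConfig{}
cfg.Cluster.Source = ClusterInfo{Id: "source-cluster-id", Name: "es-source"} // hypothetical
cfg.Cluster.Target = ClusterInfo{Id: "target-cluster-id", Name: "es-target"} // hypothetical
cfg.Indices = []IndexConfig{
    {
        Source:    IndexInfo{Name: "logs-2023", DocType: "doc", Docs: 1000000},
        Target:    IndexInfo{Name: "logs-2023-migrated"},
        Partition: &IndexPartition{FieldType: "date", FieldName: "timestamp", Step: "1d"},
    },
}
cfg.Settings.ParallelIndices = 1
cfg.Settings.ParallelTaskPerIndex = 2
cfg.Settings.ScrollSize.Docs = 5000
cfg.Settings.ScrollSize.Timeout = "5m"
cfg.Settings.BulkSize.Docs = 5000
cfg.Settings.BulkSize.StoreSizeInMB = 10
cfg.Creator.Name = "admin"  // hypothetical
cfg.Creator.Id = "user-1"   // hypothetical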
@@ -0,0 +1,31 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */

package migration

import (
"infini.sh/framework/core/module"
)

func (module *Module) Name() string {
return "migration"
}

func (module *Module) Setup() {
InitAPI()
}
func (module *Module) Start() error {
return nil
}

func (module *Module) Stop() error {
return nil
}

type Module struct {
}

func init() {
module.RegisterUserPlugin(&Module{})
}
@@ -0,0 +1,507 @@
|
|||
/* Copyright © INFINI Ltd. All rights reserved.
|
||||
* Web: https://infinilabs.com
|
||||
* Email: hello#infini.ltd */
|
||||
|
||||
package migration
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
log "github.com/cihub/seelog"
|
||||
"infini.sh/framework/core/config"
|
||||
"infini.sh/framework/core/elastic"
|
||||
"infini.sh/framework/core/env"
|
||||
"infini.sh/framework/core/global"
|
||||
"infini.sh/framework/core/pipeline"
|
||||
task2 "infini.sh/framework/core/task"
|
||||
"infini.sh/framework/core/util"
|
||||
"infini.sh/framework/modules/elastic/common"
|
||||
"runtime"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ClusterMigrationProcessor struct {
|
||||
id string
|
||||
config *ClusterMigrationConfig
|
||||
}
|
||||
|
||||
type ClusterMigrationConfig struct {
|
||||
Elasticsearch string `config:"elasticsearch,omitempty"`
|
||||
IndexName string `config:"index_name"`
|
||||
DetectIntervalInMs int `config:"detect_interval_in_ms"`
|
||||
LogIndexName string `config:"log_index_name"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
pipeline.RegisterProcessorPlugin("cluster_migration", newClusterMigrationProcessor)
|
||||
}
|
||||
|
||||
func newClusterMigrationProcessor(c *config.Config) (pipeline.Processor, error) {
|
||||
|
||||
cfg := ClusterMigrationConfig{
|
||||
DetectIntervalInMs: 5000,
|
||||
}
|
||||
if err := c.Unpack(&cfg); err != nil {
|
||||
log.Error(err)
|
||||
return nil, fmt.Errorf("failed to unpack the configuration of cluster migration processor: %s", err)
|
||||
}
|
||||
if cfg.IndexName == "" || cfg.LogIndexName == "" {
|
||||
ormConfig := common.ORMConfig{}
|
||||
ok, err := env.ParseConfig("elastic.orm", &ormConfig)
|
||||
if ok && err == nil {
|
||||
if cfg.IndexName == ""{
|
||||
cfg.IndexName = fmt.Sprintf("%stask", ormConfig.IndexPrefix)
|
||||
}
|
||||
if cfg.LogIndexName == "" {
|
||||
cfg.LogIndexName = fmt.Sprintf("%stask-log", ormConfig.IndexPrefix)
|
||||
}
|
||||
}else{
|
||||
err = fmt.Errorf("parse config elastic.orm error: %w", err)
|
||||
log.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
processor := ClusterMigrationProcessor{
|
||||
id: util.GetUUID(),
|
||||
config: &cfg,
|
||||
}
|
||||
|
||||
return &processor, nil
|
||||
}
|
||||
|
||||
func (p *ClusterMigrationProcessor) Name() string {
|
||||
return "cluster_migration"
|
||||
}
|
||||
|
||||
func (p *ClusterMigrationProcessor) Process(ctx *pipeline.Context) error {
|
||||
defer func() {
|
||||
if !global.Env().IsDebug {
|
||||
if r := recover(); r != nil {
|
||||
var v string
|
||||
switch r.(type) {
|
||||
case error:
|
||||
v = r.(error).Error()
|
||||
case runtime.Error:
|
||||
v = r.(runtime.Error).Error()
|
||||
case string:
|
||||
v = r.(string)
|
||||
}
|
||||
log.Errorf("error in %s processor: %v", p.Name(), v)
|
||||
}
|
||||
}
|
||||
log.Tracef("exit %s processor", p.Name())
|
||||
}()
|
||||
|
||||
for {
|
||||
if ctx.IsCanceled() {
|
||||
return nil
|
||||
}
|
||||
tasks, err := p.getClusterMigrationTasks(20)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if len(tasks) == 0 {
|
||||
log.Debug("got zero cluster migration task from es")
|
||||
if p.config.DetectIntervalInMs > 0 {
|
||||
time.Sleep(time.Millisecond * time.Duration(p.config.DetectIntervalInMs))
|
||||
}
|
||||
}
|
||||
for _, t := range tasks {
|
||||
if ctx.IsCanceled() {
|
||||
return nil
|
||||
}
|
||||
t.Status = task2.StatusRunning
|
||||
t.StartTimeInMillis = time.Now().UnixMilli()
|
||||
p.writeTaskLog(&t, &task2.Log{
|
||||
ID: util.GetUUID(),
|
||||
TaskId: t.ID,
|
||||
Status: task2.StatusRunning,
|
||||
Type: t.Metadata.Type,
|
||||
Action: task2.LogAction{
|
||||
Parameters: t.Parameters,
|
||||
},
|
||||
Content: fmt.Sprintf("starting to execute task [%s]", t.ID),
|
||||
Timestamp: time.Now().UTC(),
|
||||
})
|
||||
err = p.SplitMigrationTask(&t)
|
||||
taskLog := &task2.Log{
|
||||
ID: util.GetUUID(),
|
||||
TaskId: t.ID,
|
||||
Status: task2.StatusRunning,
|
||||
Type: t.Metadata.Type,
|
||||
Action: task2.LogAction{
|
||||
Parameters: t.Parameters,
|
||||
Result: &task2.LogResult{
|
||||
Success: true,
|
||||
},
|
||||
},
|
||||
Content: fmt.Sprintf("success to split task [%s]", t.ID),
|
||||
Timestamp: time.Now().UTC(),
|
||||
}
|
||||
if err != nil {
|
||||
taskLog.Status = task2.StatusError
|
||||
taskLog.Content = fmt.Sprintf("failed to split task [%s]: %v", t.ID, err)
|
||||
taskLog.Action.Result = &task2.LogResult{
|
||||
Success: false,
|
||||
Error: err.Error(),
|
||||
}
|
||||
}
|
||||
t.Status = taskLog.Status
|
||||
p.writeTaskLog(&t, taskLog)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
//es index refresh
|
||||
time.Sleep(time.Millisecond * 1200)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) error {
|
||||
if taskItem.Metadata.Labels == nil {
|
||||
return fmt.Errorf("empty metadata labels, unexpected cluster migration task: %s", util.MustToJSON(taskItem))
|
||||
}
|
||||
if taskItem.Metadata.Labels["pipeline_id"] != p.Name() {
|
||||
log.Tracef("got unexpect task type of %s with task id [%s] in cluster migration processor", taskItem.Metadata.Type, taskItem.ID)
|
||||
return nil
|
||||
}
|
||||
parameters := util.MapStr(taskItem.Parameters)
|
||||
migrationConfig, err := parameters.GetValue("pipeline.config")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
buf := util.MustToJSONBytes(migrationConfig)
|
||||
clusterMigrationTask := ElasticDataConfig{}
|
||||
err = util.FromJSONBytes(buf, &clusterMigrationTask)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
parameters.Put("pipeline.config", clusterMigrationTask)
|
||||
}()
|
||||
esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id)
|
||||
esTargetClient := elastic.GetClient(clusterMigrationTask.Cluster.Target.Id)
|
||||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||
|
||||
for i, index := range clusterMigrationTask.Indices {
|
||||
source := util.MapStr{
|
||||
"cluster_id": clusterMigrationTask.Cluster.Source.Id,
|
||||
"indices": index.Source.Name,
|
||||
//"slice_size": 10,
|
||||
"batch_size": clusterMigrationTask.Settings.ScrollSize.Docs,
|
||||
"scroll_time": clusterMigrationTask.Settings.ScrollSize.Timeout,
|
||||
}
|
||||
if index.IndexRename != nil {
|
||||
source["index_rename"] = index.IndexRename
|
||||
}
|
||||
if index.Target.Name != "" {
|
||||
source["index_rename"] = util.MapStr{
|
||||
index.Source.Name: index.Target.Name,
|
||||
}
|
||||
}
|
||||
if index.TypeRename != nil {
|
||||
source["type_rename"] = index.TypeRename
|
||||
}
|
||||
|
||||
if v, ok := index.RawFilter.(string); ok {
|
||||
source["query_string"] = v
|
||||
}else{
|
||||
source["query_dsl"] = index.RawFilter
|
||||
if index.Source.DocType != "" {
|
||||
if index.Target.DocType != "" {
|
||||
source["type_rename"] = util.MapStr{
|
||||
index.Source.DocType: index.Target.DocType,
|
||||
}
|
||||
}
|
||||
must := []interface{}{
|
||||
util.MapStr{
|
||||
"terms": util.MapStr{
|
||||
"_type": []string{index.Source.DocType},
|
||||
},
|
||||
},
|
||||
}
|
||||
if index.RawFilter != nil {
|
||||
must = append(must, index.RawFilter)
|
||||
}
|
||||
source["query_dsl"] = util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"must": must,
|
||||
},
|
||||
}
|
||||
}else{
|
||||
if esSourceClient.GetMajorVersion() >= 8 {
|
||||
source["type_rename"] = util.MapStr{
|
||||
"*": index.Target.DocType,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
var targetMust []interface{}
|
||||
if index.RawFilter != nil {
|
||||
targetMust = append(targetMust, index.RawFilter)
|
||||
}
|
||||
if index.Target.DocType != "" && esTargetClient.GetMajorVersion() < 8 {
|
||||
targetMust = append(targetMust, util.MapStr{
|
||||
"terms": util.MapStr{
|
||||
"_type": []string{index.Target.DocType},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
target := util.MapStr{
|
||||
"cluster_id": clusterMigrationTask.Cluster.Target.Id,
|
||||
//"max_worker_size": 10,
|
||||
//"detect_interval": 100,
|
||||
"bulk": util.MapStr{
|
||||
"batch_size_in_mb": clusterMigrationTask.Settings.BulkSize.StoreSizeInMB,
|
||||
"batch_size_in_docs": clusterMigrationTask.Settings.BulkSize.Docs,
|
||||
},
|
||||
}
|
||||
indexParameters := map[string]interface{}{
|
||||
"pipeline": util.MapStr{
|
||||
"id": "index_migration",
|
||||
"config": util.MapStr{
|
||||
"source": source,
|
||||
"target": target,
|
||||
"execution": clusterMigrationTask.Settings.Execution,
|
||||
},
|
||||
},
|
||||
}
|
||||
indexMigrationTask := task2.Task{
|
||||
ParentId: []string{taskItem.ID},
|
||||
Cancellable: true,
|
||||
Runnable: false,
|
||||
Status: task2.StatusRunning,
|
||||
StartTimeInMillis: time.Now().UnixMilli(),
|
||||
Metadata: task2.Metadata{
|
||||
Type: "pipeline",
|
||||
Labels: util.MapStr{
|
||||
"pipeline_id": "index_migration",
|
||||
"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
|
||||
"target_cluster_id": clusterMigrationTask.Cluster.Target.Id,
|
||||
"level": "index",
|
||||
"partition_count": 1,
|
||||
},
|
||||
},
|
||||
Parameters: indexParameters,
|
||||
}
|
||||
|
||||
indexMigrationTask.ID=util.GetUUID()
|
||||
|
||||
clusterMigrationTask.Indices[i].TaskID = indexMigrationTask.ID
|
||||
if index.Partition != nil {
|
||||
partitionQ := &elastic.PartitionQuery{
|
||||
IndexName: index.Source.Name,
|
||||
FieldName: index.Partition.FieldName,
|
||||
FieldType: index.Partition.FieldType,
|
||||
Step: index.Partition.Step,
|
||||
//Filter: index.RawFilter,
|
||||
Filter: source["query_dsl"],
|
||||
}
|
||||
partitions, err := elastic.GetPartitions(partitionQ, esSourceClient)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if partitions == nil || len(partitions) == 0{
|
||||
return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter))
|
||||
}
|
||||
var (
|
||||
partitionID int
|
||||
)
|
||||
for _, partition := range partitions {
|
||||
//skip empty partition
|
||||
if partition.Docs <= 0 {
|
||||
continue
|
||||
}
|
||||
partitionID++
|
||||
partitionSource := util.MapStr{
|
||||
"start": partition.Start,
|
||||
"end": partition.End,
|
||||
"doc_count": partition.Docs,
|
||||
"step": index.Partition.Step,
|
||||
"partition_id": partitionID,
|
||||
}
|
||||
for k, v := range source{
|
||||
if k == "query_string"{
|
||||
continue
|
||||
}
|
||||
partitionSource[k] = v
|
||||
}
|
||||
partitionSource["query_dsl"] = partition.Filter
|
||||
var must []interface{}
|
||||
|
||||
if partition.Other {
|
||||
must = append(must, partition.Filter)
|
||||
}else{
|
||||
must = append(must, util.MapStr{
|
||||
"range": util.MapStr{
|
||||
index.Partition.FieldName: util.MapStr{
|
||||
"gte": partition.Start,
|
||||
"lt": partition.End,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
if targetMust != nil {
|
||||
must = append(must, targetMust...)
|
||||
}
|
||||
if len(must) > 0 {
|
||||
target["query_dsl"] = util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"must": must,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
partitionMigrationTask := task2.Task{
|
||||
ParentId: []string{taskItem.ID, indexMigrationTask.ID},
|
||||
Cancellable: false,
|
||||
Runnable: true,
|
||||
Status: task2.StatusReady,
|
||||
Metadata: task2.Metadata{
|
||||
Type: "pipeline",
|
||||
Labels: util.MapStr{
|
||||
"pipeline_id": "index_migration",
|
||||
"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
|
||||
"target_cluster_id": clusterMigrationTask.Cluster.Target.Id,
|
||||
"level": "partition",
|
||||
"index_name": index.Source.Name,
|
||||
"execution": util.MapStr{
|
||||
"nodes": util.MapStr{
|
||||
"permit": clusterMigrationTask.Settings.Execution.Nodes.Permit,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Parameters: map[string]interface{}{
|
||||
"pipeline": util.MapStr{
|
||||
"id": "index_migration",
|
||||
"config": util.MapStr{
|
||||
"source": partitionSource,
|
||||
"target": target,
|
||||
"execution": clusterMigrationTask.Settings.Execution,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
partitionMigrationTask.ID=util.GetUUID()
|
||||
|
||||
_, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "")
|
||||
delete(target, "query_dsl")
|
||||
if err != nil {
|
||||
return fmt.Errorf("store index migration task(partition) error: %w", err)
|
||||
}
|
||||
|
||||
}
|
||||
indexMigrationTask.Metadata.Labels["partition_count"] = partitionID
|
||||
}else{
|
||||
source["doc_count"] = index.Source.Docs
|
||||
if targetMust != nil {
|
||||
target["query_dsl"] = util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"must": targetMust,
|
||||
},
|
||||
}
|
||||
}
|
||||
partitionMigrationTask := task2.Task{
|
||||
ParentId: []string{taskItem.ID, indexMigrationTask.ID},
|
||||
Cancellable: false,
|
||||
Runnable: true,
|
||||
Status: task2.StatusReady,
|
||||
Metadata: task2.Metadata{
|
||||
Type: "pipeline",
|
||||
Labels: util.MapStr{
|
||||
"pipeline_id": "index_migration",
|
||||
"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
|
||||
"target_cluster_id": clusterMigrationTask.Cluster.Target.Id,
|
||||
"level": "partition",
|
||||
"index_name": index.Source.Name,
|
||||
"execution": util.MapStr{
|
||||
"nodes": util.MapStr{
|
||||
"permit": clusterMigrationTask.Settings.Execution.Nodes.Permit,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Parameters: indexParameters,
|
||||
}
|
||||
partitionMigrationTask.ID=util.GetUUID()
|
||||
|
||||
_, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "")
|
||||
delete(target, "query_dsl")
|
||||
if err != nil {
|
||||
return fmt.Errorf("store index migration task(partition) error: %w", err)
|
||||
}
|
||||
}
|
||||
_, err = esClient.Index(p.config.IndexName, "", indexMigrationTask.ID, indexMigrationTask, "")
|
||||
if err != nil {
|
||||
return fmt.Errorf("store index migration task error: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ClusterMigrationProcessor) getClusterMigrationTasks(size int)([]task2.Task, error){
|
||||
queryDsl := util.MapStr{
|
||||
"size": size,
|
||||
"sort": []util.MapStr{
|
||||
{
|
||||
"created": util.MapStr{
|
||||
"order": "asc",
|
||||
},
|
||||
},
|
||||
},
|
||||
"query": util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"must": []util.MapStr{
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"status": task2.StatusReady,
|
||||
},
|
||||
},
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.labels.pipeline_id": p.Name(),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||
res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(queryDsl))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if res.GetTotal() == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
var migrationTasks []task2.Task
|
||||
for _, hit := range res.Hits.Hits {
|
||||
buf, err := util.ToJSONBytes(hit.Source)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tk := task2.Task{}
|
||||
err = util.FromJSONBytes(buf, &tk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
migrationTasks = append(migrationTasks, tk)
|
||||
}
|
||||
return migrationTasks, nil
|
||||
}
|
||||
|
||||
func (p *ClusterMigrationProcessor) writeTaskLog(taskItem *task2.Task, logItem *task2.Log) {
|
||||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||
_, err := esClient.Index(p.config.IndexName,"", logItem.TaskId, taskItem, "" )
|
||||
if err != nil{
|
||||
log.Error(err)
|
||||
}
|
||||
_, err = esClient.Index(p.config.LogIndexName,"", logItem.ID, logItem, "" )
|
||||
if err != nil{
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
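In short, SplitMigrationTask above expands one cluster-level task into an index-level task plus one partition-level task per non-empty partition, all linked through ParentId so progress can later be rolled up per index and per cluster. A condensed sketch of that hierarchy (field values are illustrative, clusterTaskID stands for the parent task's ID, and the types are the ones imported in this file):

indexTask := task2.Task{
    ParentId: []string{clusterTaskID}, // id of the cluster-level task
    Metadata: task2.Metadata{
        Type:   "pipeline",
        Labels: util.MapStr{"pipeline_id": "index_migration", "level": "index"},
    },
}
indexTask.ID = util.GetUUID()

partitionTask := task2.Task{
    ParentId: []string{clusterTaskID, indexTask.ID},
    Status:   task2.StatusReady,
    Metadata: task2.Metadata{
        Type:   "pipeline",
        Labels: util.MapStr{"pipeline_id": "index_migration", "level": "partition"},
    },
}
partitionTask.ID = util.GetUUID()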