migration_v2 (#38)

start bulk write only after document scrolling is complete

update default config with migration dispatcher

add bulk parameters idle_timeout_in_seconds and slice_size

clear queue before creating pipeline

check instance availability when task state is running

add check_instance_available config for migration dispatcher

get migration task progress info by index_name_unique (index_name + doctype)

get instance list from gateway instances only

get migration task progress info from ES and instance pipeline context

rewrite logic for handling a running migration major task

calculate completion time in API getDataMigrationTaskOfIndex

init

Co-authored-by: liugq <silenceqi@hotmail.com>
sunjiacheng 2023-03-28 17:54:35 +08:00 committed by medcl
parent cd56a4ff24
commit 69cf1d67a9
8 changed files with 1797 additions and 737 deletions

View File

@@ -44,6 +44,7 @@ pipeline:
fetch_max_messages: 100
queues:
type: indexing_merge
tag: "metrics"
when:
cluster_available: ["$[[CLUSTER_ID]]"]
- name: metadata_ingest
@@ -78,11 +79,46 @@ pipeline:
group: activity
when:
cluster_available: ["$[[CLUSTER_ID]]"]
- name: cluster_migration_split
- name: migration_task_dispatcher
auto_start: true
keep_running: true
processor:
- cluster_migration:
- migration_dispatcher:
elasticsearch: "$[[CLUSTER_ID]]"
check_instance_available: true
max_tasks_per_instance: 10
task_batch_size: 50
when:
cluster_available: ["$[[CLUSTER_ID]]"]
- name: logging_indexing_merge
auto_start: true
keep_running: true
processor:
- indexing_merge:
input_queue: "logging"
idle_timeout_in_seconds: 1
elasticsearch: "$[[CLUSTER_ID]]"
index_name: "$[[INDEX_PREFIX]]logs"
output_queue:
name: "pipeline-logs"
label:
tag: "request_logging"
worker_size: 1
bulk_size_in_kb: 1
- name: consume-logging_requests
auto_start: true
keep_running: true
processor:
- bulk_indexing:
bulk:
compress: true
batch_size_in_mb: 1
batch_size_in_docs: 1
consumer:
fetch_max_messages: 100
queues:
type: indexing_merge
tag: "request_logging"
when:
cluster_available: ["$[[CLUSTER_ID]]"]
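
The renamed migration_task_dispatcher entry replaces cluster_migration_split and hands task splitting and assignment to a dedicated dispatcher processor. A minimal sketch of how its options might be modeled on the Go side, assuming the YAML keys map one-to-one to config tags (struct and field names are illustrative, not taken from this commit):

// Hypothetical mirror of the migration_dispatcher options above; only the
// config keys come from the YAML, the Go names and types are assumptions.
type DispatcherConfig struct {
	Elasticsearch          string `config:"elasticsearch"`            // cluster that stores task state
	CheckInstanceAvailable bool   `config:"check_instance_available"` // probe an instance before dispatching to it
	MaxTasksPerInstance    int    `config:"max_tasks_per_instance"`   // concurrency cap per gateway instance
	TaskBatchSize          int    `config:"task_batch_size"`          // tasks claimed per dispatch cycle
}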

View File

@@ -5,8 +5,14 @@
package model
import (
"context"
"fmt"
"infini.sh/framework/core/agent"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"infini.sh/framework/modules/pipeline"
"net/http"
"time"
)
@@ -22,3 +28,101 @@ type Instance struct {
Tags []string `json:"tags,omitempty"`
Description string `json:"description,omitempty" config:"description" elastic_mapping:"description:{type:keyword}"`
}
func (inst *Instance) CreatePipeline(body []byte) error {
req := &util.Request{
Method: http.MethodPost,
Body: body,
Url: inst.Endpoint + "/pipeline/tasks/",
}
return inst.doRequest(req, nil)
}
func (inst *Instance) StopPipeline(ctx context.Context, pipelineID string) error {
req := &util.Request{
Method: http.MethodPost,
Url: fmt.Sprintf("%s/pipeline/task/%s/_stop", inst.Endpoint, pipelineID),
Context: ctx,
}
return inst.doRequest(req, nil)
}
func (inst *Instance) StopPipelineWithTimeout(pipelineID string, duration time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()
return inst.StopPipeline(ctx, pipelineID)
}
func (inst *Instance) StartPipeline(pipelineID string) error {
req := &util.Request{
Method: http.MethodPost,
Url: fmt.Sprintf("%s/pipeline/task/%s/_start", inst.Endpoint, pipelineID),
}
return inst.doRequest(req, nil)
}
func (inst *Instance) DeletePipeline(pipelineID string) error {
req := &util.Request{
Method: http.MethodDelete,
Url: fmt.Sprintf("%s/pipeline/task/%s", inst.Endpoint, pipelineID),
}
return inst.doRequest(req, nil)
}
func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipelinesResponse, error) {
body := util.MustToJSONBytes(util.MapStr{
"ids": pipelineIDs,
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
req := &util.Request{
Method: http.MethodPost,
Url: fmt.Sprintf("%s/pipeline/tasks/_search", inst.Endpoint),
Body: body,
Context: ctx,
}
res := pipeline.GetPipelinesResponse{}
err := inst.doRequest(req, &res)
return res, err
}
func (inst *Instance) DeleteQueueBySelector(selector util.MapStr) error {
req := &util.Request{
Method: http.MethodDelete,
Url: fmt.Sprintf("%s/queue/_search", inst.Endpoint),
Body: util.MustToJSONBytes(util.MapStr{
"selector": selector,
}),
}
return inst.doRequest(req, nil)
}
func (inst *Instance) TryConnect(ctx context.Context) error {
req := &util.Request{
Method: http.MethodGet,
Url: fmt.Sprintf("%s/_framework/api/_info", inst.Endpoint),
Context: ctx,
}
return inst.doRequest(req, nil)
}
func (inst *Instance) TryConnectWithTimeout(duration time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()
return inst.TryConnect(ctx)
}
func (inst *Instance) doRequest(req *util.Request, resBody interface{}) error {
req.SetBasicAuth(inst.BasicAuth.Username, inst.BasicAuth.Password)
result, err := util.ExecuteRequest(req)
if err != nil {
return err
}
if result.StatusCode != http.StatusOK {
return fmt.Errorf("%s", result.Body)
}
if resBody != nil {
return util.FromJSONBytes(result.Body, resBody)
}
return nil
}
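
Taken together, these methods form the small pipeline-management client the dispatcher drives over each instance's HTTP API. A minimal usage sketch, not part of this commit; the function, pipeline ID, and error handling are illustrative:

// Hypothetical dispatch step built on the new Instance API.
func dispatchPipeline(inst *Instance, pipelineCfg []byte, pipelineID string) error {
	// Skip instances that do not answer the info endpoint within a second.
	if err := inst.TryConnectWithTimeout(time.Second); err != nil {
		return err
	}
	// Register the pipeline task on the instance.
	if err := inst.CreatePipeline(pipelineCfg); err != nil {
		return err
	}
	// Later, poll its state ...
	if _, err := inst.GetPipelinesByIDs([]string{pipelineID}); err != nil {
		return err
	}
	// ... and stop it once the migration task is done.
	return inst.StopPipelineWithTimeout(pipelineID, time.Second)
}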

View File

@@ -5,7 +5,6 @@
package gateway
import (
"context"
"fmt"
log "github.com/cihub/seelog"
"github.com/segmentio/encoding/json"
@@ -368,60 +367,7 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request,
if from < 0 {
from = 0
}
agentIndexName := orm.GetIndexName(agent.Instance{})
gatewayIndexName := orm.GetIndexName(model.Instance{})
agentMust := []util.MapStr{
{
"term": util.MapStr{
"enrolled": util.MapStr{
"value": true,
},
},
},
{
"term": util.MapStr{
"status": util.MapStr{
"value": "online",
},
},
},
{
"term": util.MapStr{
"_index": util.MapStr{
"value": agentIndexName,
},
},
},
}
boolQ := util.MapStr{
"minimum_should_match": 1,
"should": []util.MapStr{
{
"bool": util.MapStr{
"must": agentMust,
},
},
{
"term": util.MapStr{
"_index": util.MapStr{
"value": gatewayIndexName,
},
},
},
},
}
if keyword != "" {
boolQ["must"] = []util.MapStr{
{
"prefix": util.MapStr{
"name": util.MapStr{
"value": keyword,
},
},
},
}
}
query := util.MapStr{
"size": size,
@@ -433,12 +379,24 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request,
},
},
},
"query": util.MapStr{
"bool": boolQ,
},
}
if keyword != "" {
query["query"] = util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"prefix": util.MapStr{
"name": util.MapStr{
"value": keyword,
},
},
},
},
},
}
}
q := orm.Query{
IndexName: fmt.Sprintf("%s,%s", gatewayIndexName, agentIndexName),
IndexName: gatewayIndexName,
RawQuery: util.MustToJSONBytes(query),
}
err, result := orm.Search(nil, &q)
@@ -457,63 +415,33 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request,
var nodes = []util.MapStr{}
for _, hit := range searchRes.Hits.Hits {
var (
endpoint string
ok bool
)
buf := util.MustToJSONBytes(hit.Source)
inst := model.Instance{}
err = util.FromJSONBytes(buf, &inst)
if err != nil {
log.Error(err)
continue
}
node := util.MapStr{
"id": hit.Source["id"],
"name": hit.Source["name"],
"id": inst.ID,
"name": inst.Name,
"available": false,
"type": "gateway",
}
hasErr := false
if hit.Index == gatewayIndexName {
node["type"] = "gateway"
if endpoint, ok = hit.Source["endpoint"].(string); !ok {
log.Warnf("got unexpect endpoint type of gateway instance [%s]: %s", hit.ID, hit.Source["endpoint"])
hasErr = true
}
}else if hit.Index == agentIndexName {
node["type"] = "agent"
endpoint = fmt.Sprintf("%s://%s:%v", hit.Source["schema"], hit.Source["remote_ip"], hit.Source["port"])
ul, err := url.Parse(inst.Endpoint)
if err != nil {
log.Error(err)
continue
}
ul, err := url.Parse(endpoint)
node["host"] = ul.Host
err = inst.TryConnectWithTimeout(time.Second)
if err != nil {
log.Error(err)
} else {
node["host"] = ul.Host
node["available"] = true
}
if !hasErr {
available, err := isNodeAvailable(endpoint) //TODO remove
if err != nil {
log.Error(err)
}
node["available"] = available
}
nodes = append(nodes, node)
}
h.WriteJSON(w, nodes, http.StatusOK)
}
func isNodeAvailable(endpoint string) (bool, error){
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
rq := &util.Request{
Method: http.MethodGet,
Url: fmt.Sprintf("%s%s", endpoint, "/pipeline/tasks/_dynamic"),
Context: ctx,
}
resp, err := util.ExecuteRequest(rq)
if err != nil {
return false, err
}
resBody := struct {
Success bool `json:"success"`
}{}
err = util.FromJSONBytes(resp.Body, &resBody)
if err != nil {
return false, err
}
return resBody.Success, nil
}
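
With the bespoke isNodeAvailable helper removed, availability now comes from the shared Instance client. A rough equivalent of the new per-node check (the endpoint is a placeholder):

inst := model.Instance{Endpoint: "http://127.0.0.1:2900"} // placeholder endpoint
available := inst.TryConnectWithTimeout(time.Second) == nil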

View File

@@ -263,7 +263,7 @@ func (processor *ActivityProcessor) HandleMessage(ctx *pipeline.Context, qConfig
if timeout {
log.Tracef("timeout on queue:[%v]", qConfig.Name)
ctx.Failed()
ctx.Failed(fmt.Errorf("timeout on queue:[%v]", qConfig.Name))
return
}

File diff suppressed because it is too large

View File

@@ -4,6 +4,8 @@
package migration
import "fmt"
type ElasticDataConfig struct {
Cluster struct {
Source ClusterInfo `json:"source"`
@@ -21,6 +23,9 @@ type ElasticDataConfig struct {
Bulk struct {
Docs int `json:"docs"`
StoreSizeInMB int `json:"store_size_in_mb"`
MaxWorkerSize int `json:"max_worker_size"`
IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"`
SliceSize int `json:"slice_size"`
} `json:"bulk"`
Execution ExecutionConfig `json:"execution"`
} `json:"settings"`
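
Two new bulk knobs, idle_timeout_in_seconds and slice_size, join the existing docs and store_size_in_mb limits. A hypothetical fragment populating them (values are illustrative, and the enclosing field is assumed to be named Settings, matching its json tag):

cfg := ElasticDataConfig{}
cfg.Settings.Bulk.Docs = 5000              // existing: flush after this many docs
cfg.Settings.Bulk.StoreSizeInMB = 10       // existing: flush after this much data
cfg.Settings.Bulk.IdleTimeoutInSeconds = 5 // new in this commit
cfg.Settings.Bulk.SliceSize = 5            // new in this commit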
@@ -54,8 +59,8 @@ type IndexConfig struct {
IndexRename map[string]interface{} `json:"index_rename"`
TypeRename map[string]interface{} `json:"type_rename"`
Partition *IndexPartition `json:"partition,omitempty"`
TaskID string `json:"task_id,omitempty"`
Status string `json:"status,omitempty"`
//TaskID string `json:"task_id,omitempty"`
//Status string `json:"status,omitempty"`
Percent float64 `json:"percent,omitempty"`
ErrorPartitions int `json:"error_partitions,omitempty"`
}
@@ -72,7 +77,33 @@ type IndexInfo struct {
StoreSizeInBytes int `json:"store_size_in_bytes"`
}
func (ii *IndexInfo) GetUniqueIndexName() string {
return fmt.Sprintf("%s:%s", ii.Name, ii.DocType)
}
type ClusterInfo struct {
Id string `json:"id"`
Name string `json:"name"`
}
type TaskCompleteState struct {
IsComplete bool
Error string
ClearPipeline bool
PipelineIds []string
SuccessDocs float64
ScrolledDocs float64
RunningPhase int
TotalDocs interface{}
}
type MajorTaskState struct {
ScrolledDocs float64
IndexDocs float64
Status string
}
type IndexStateInfo struct {
ErrorPartitions int
IndexDocs float64
}
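
GetUniqueIndexName provides the index_name_unique key (index_name + doctype) that progress lookups are keyed by, and TaskCompleteState carries the counters behind the reported progress. A small sketch of assumed usage, not from this commit:

ii := IndexInfo{Name: "orders", DocType: "_doc"}
key := ii.GetUniqueIndexName() // "orders:_doc"

// Progress as a percentage, assuming TotalDocs carries a numeric count.
state := TaskCompleteState{SuccessDocs: 25000, TotalDocs: 100000}
if total, ok := state.TotalDocs.(int); ok && total > 0 {
	percent := state.SuccessDocs / float64(total) * 100
	_ = percent // surfaced through IndexConfig.Percent
}
_ = key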

View File

@@ -15,12 +15,11 @@ func (module *Module) Name() string {
}
func (module *Module) Setup() {
module.BulkResultIndexName = ".infini_async_bulk_results"
exists, err := env.ParseConfig("migration", module)
if exists && err != nil {
log.Error(err)
}
InitAPI(module.BulkResultIndexName)
InitAPI()
}
func (module *Module) Start() error {
return nil
@@ -31,7 +30,6 @@ func (module *Module) Stop() error {
}
type Module struct {
BulkResultIndexName string `config:"bulk_result_index_name"`
}
func init() {

File diff suppressed because it is too large