feat: migration with bulk skip exit docs
This commit is contained in:
parent
dc562c0d74
commit
50f12de6a8
|
@ -129,11 +129,12 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error {
|
||||||
Processor: []util.MapStr{
|
Processor: []util.MapStr{
|
||||||
{
|
{
|
||||||
"es_scroll": util.MapStr{
|
"es_scroll": util.MapStr{
|
||||||
"remove_type": docType == "",
|
"remove_type": docType == "",
|
||||||
"slice_size": cfg.Source.SliceSize,
|
"slice_size": cfg.Source.SliceSize,
|
||||||
"batch_size": cfg.Source.BatchSize,
|
"batch_size": cfg.Source.BatchSize,
|
||||||
"indices": indexName,
|
"indices": indexName,
|
||||||
"elasticsearch": sourceClusterID,
|
"elasticsearch": sourceClusterID,
|
||||||
|
"skip_exist_documents": cfg.Target.Bulk.SkipExistDocuments,
|
||||||
"queue": util.MapStr{
|
"queue": util.MapStr{
|
||||||
"name": scrollID,
|
"name": scrollID,
|
||||||
"labels": util.MapStr{
|
"labels": util.MapStr{
|
||||||
|
@ -186,10 +187,11 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error {
|
||||||
"bulk_indexing": util.MapStr{
|
"bulk_indexing": util.MapStr{
|
||||||
"detect_active_queue": false,
|
"detect_active_queue": false,
|
||||||
"bulk": util.MapStr{
|
"bulk": util.MapStr{
|
||||||
"batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB,
|
"batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB,
|
||||||
"batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs,
|
"batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs,
|
||||||
"invalid_queue": "bulk_indexing_400",
|
"invalid_queue": "bulk_indexing_400",
|
||||||
"compress": cfg.Target.Bulk.Compress,
|
"compress": cfg.Target.Bulk.Compress,
|
||||||
|
"skip_exist_documents": cfg.Target.Bulk.SkipExistDocuments,
|
||||||
},
|
},
|
||||||
"max_worker_size": cfg.Target.Bulk.MaxWorkerSize,
|
"max_worker_size": cfg.Target.Bulk.MaxWorkerSize,
|
||||||
"num_of_slices": cfg.Target.Bulk.SliceSize,
|
"num_of_slices": cfg.Target.Bulk.SliceSize,
|
||||||
|
|
|
@ -9,8 +9,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClusterMigrationTaskConfig struct {
|
type ClusterMigrationTaskConfig struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Tags []string `json:"tags"`
|
Tags []string `json:"tags"`
|
||||||
Cluster struct {
|
Cluster struct {
|
||||||
Source ClusterInfo `json:"source"`
|
Source ClusterInfo `json:"source"`
|
||||||
Target ClusterInfo `json:"target"`
|
Target ClusterInfo `json:"target"`
|
||||||
|
@ -42,7 +42,7 @@ type ClusterMigrationIndexConfig struct {
|
||||||
Percent float64 `json:"percent,omitempty"`
|
Percent float64 `json:"percent,omitempty"`
|
||||||
ErrorPartitions int `json:"error_partitions,omitempty"`
|
ErrorPartitions int `json:"error_partitions,omitempty"`
|
||||||
RunningChildren int `json:"running_children,omitempty"`
|
RunningChildren int `json:"running_children,omitempty"`
|
||||||
ExportedPercent float64 `json:"exported_percent,omitempty"`
|
ExportedPercent float64 `json:"exported_percent,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClusterMigrationTaskState struct {
|
type ClusterMigrationTaskState struct {
|
||||||
|
@ -92,6 +92,7 @@ type IndexMigrationBulkConfig struct {
|
||||||
IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"`
|
IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"`
|
||||||
SliceSize int `json:"slice_size"`
|
SliceSize int `json:"slice_size"`
|
||||||
Compress bool `json:"compress"`
|
Compress bool `json:"compress"`
|
||||||
|
SkipExistDocuments bool `json:"skip_exist_documents"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type IndexMigrationTargetConfig struct {
|
type IndexMigrationTargetConfig struct {
|
||||||
|
|
Loading…
Reference in New Issue