feat: migration skip exists docs with bulk operation

This commit is contained in:
hardy 2024-02-04 17:29:12 +08:00 committed by hardy
parent 00dfa9ed5d
commit a338af5c9a
2 changed files with 18 additions and 18 deletions

View File

@ -129,12 +129,12 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error {
Processor: []util.MapStr{ Processor: []util.MapStr{
{ {
"es_scroll": util.MapStr{ "es_scroll": util.MapStr{
"remove_type": docType == "", "remove_type": docType == "",
"slice_size": cfg.Source.SliceSize, "slice_size": cfg.Source.SliceSize,
"batch_size": cfg.Source.BatchSize, "batch_size": cfg.Source.BatchSize,
"indices": indexName, "indices": indexName,
"elasticsearch": sourceClusterID, "elasticsearch": sourceClusterID,
"bulk_create_operation": cfg.Target.Bulk.CreateOperation, "bulk_operation": cfg.Target.Bulk.Operation,
"queue": util.MapStr{ "queue": util.MapStr{
"name": scrollID, "name": scrollID,
"labels": util.MapStr{ "labels": util.MapStr{
@ -187,11 +187,11 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error {
"bulk_indexing": util.MapStr{ "bulk_indexing": util.MapStr{
"detect_active_queue": false, "detect_active_queue": false,
"bulk": util.MapStr{ "bulk": util.MapStr{
"batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB, "batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB,
"batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs, "batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs,
"invalid_queue": "bulk_indexing_400", "invalid_queue": "bulk_indexing_400",
"compress": cfg.Target.Bulk.Compress, "compress": cfg.Target.Bulk.Compress,
"bulk_create_operation": cfg.Target.Bulk.CreateOperation, "bulk_operation": cfg.Target.Bulk.Operation,
}, },
"max_worker_size": cfg.Target.Bulk.MaxWorkerSize, "max_worker_size": cfg.Target.Bulk.MaxWorkerSize,
"num_of_slices": cfg.Target.Bulk.SliceSize, "num_of_slices": cfg.Target.Bulk.SliceSize,

View File

@ -86,13 +86,13 @@ type IndexMigrationSourceConfig struct {
} }
type IndexMigrationBulkConfig struct { type IndexMigrationBulkConfig struct {
BatchSizeInDocs int `json:"batch_size_in_docs"` BatchSizeInDocs int `json:"batch_size_in_docs"`
BatchSizeInMB int `json:"batch_size_in_mb"` BatchSizeInMB int `json:"batch_size_in_mb"`
MaxWorkerSize int `json:"max_worker_size"` MaxWorkerSize int `json:"max_worker_size"`
IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"`
SliceSize int `json:"slice_size"` SliceSize int `json:"slice_size"`
Compress bool `json:"compress"` Compress bool `json:"compress"`
CreateOperation bool `json:"bulk_create_operation"` Operation string `json:"operation"`
} }
type IndexMigrationTargetConfig struct { type IndexMigrationTargetConfig struct {