diff --git a/console.yml b/console.yml index 13d3c881..9b9a4a52 100644 --- a/console.yml +++ b/console.yml @@ -73,5 +73,23 @@ pipeline: queues: type: metadata category: elasticsearch + consumer: + group: metadata + when: + cluster_available: [ "default" ] + - name: activity_ingest + auto_start: true + keep_running: true + processor: + - activity: + bulk_size_in_mb: 10 + bulk_max_docs_count: 5000 + fetch_max_messages: 1000 + elasticsearch: "default" + queues: + category: elasticsearch + activity: true + consumer: + group: activity when: cluster_available: [ "default" ] \ No newline at end of file diff --git a/plugin/api/gateway/instance.go b/plugin/api/gateway/instance.go index 2b273026..8629fc07 100644 --- a/plugin/api/gateway/instance.go +++ b/plugin/api/gateway/instance.go @@ -225,7 +225,7 @@ func (h *GatewayAPI) getInstanceStatus(w http.ResponseWriter, req *http.Request, }) if err != nil { log.Error(err) - //result[gid.(string)] = util.MapStr{} + result[gid.(string)] = util.MapStr{} continue } var resMap = util.MapStr{} diff --git a/plugin/api/index_management/common_command.go b/plugin/api/index_management/common_command.go index d998e01c..c5dd19c9 100644 --- a/plugin/api/index_management/common_command.go +++ b/plugin/api/index_management/common_command.go @@ -45,7 +45,7 @@ func (h *APIHandler) HandleAddCommonCommandAction(w http.ResponseWriter, req *ht h.WriteJSON(w, resBody, http.StatusOK) return } - _, err = esClient.Index(indexName,"", reqParams.ID, reqParams) + _, err = esClient.Index(indexName,"", reqParams.ID, reqParams, "wait_for") if err != nil { log.Error(err) resBody["error"] = err.Error() @@ -90,7 +90,7 @@ func (h *APIHandler) HandleSaveCommonCommandAction(w http.ResponseWriter, req *h h.WriteJSON(w, resBody, http.StatusInternalServerError) return } - _, err = esClient.Index(indexName,"", reqParams.ID, reqParams) + _, err = esClient.Index(indexName,"", reqParams.ID, reqParams, "wait_for") if err != nil { log.Error(err) resBody["error"] = err.Error() diff --git a/plugin/api/index_management/document.go b/plugin/api/index_management/document.go index ba9b7633..1ba2a229 100644 --- a/plugin/api/index_management/document.go +++ b/plugin/api/index_management/document.go @@ -37,7 +37,7 @@ func (handler APIHandler) HandleAddDocumentAction(w http.ResponseWriter, req *ht docID = util.GetUUID() } docType := handler.GetParameter(req, "_type") - insertRes, err := client.Index(indexName, docType, docID, reqBody) + insertRes, err := client.Index(indexName, docType, docID, reqBody, "wait_for") if err != nil { resBody["error"] = err handler.WriteJSON(w, resBody, http.StatusOK) @@ -83,7 +83,7 @@ func (handler APIHandler) HandleUpdateDocumentAction(w http.ResponseWriter, req } } - insertRes, err := client.Index(indexName, typ, docID, reqBody) + insertRes, err := client.Index(indexName, typ, docID, reqBody, "wait_for") if err != nil { resBody["error"] = err.Error() handler.WriteJSON(w, resBody, http.StatusOK) diff --git a/plugin/api/index_management/rebuild.go b/plugin/api/index_management/rebuild.go index a79f428f..25799bb6 100644 --- a/plugin/api/index_management/rebuild.go +++ b/plugin/api/index_management/rebuild.go @@ -78,7 +78,7 @@ func reindex(esName string, body *model.Reindex, typ string) (string, error) { body.Status = model.ReindexStatusRunning body.CreatedAt = time.Now() - _, err = client.Index(orm.GetIndexName(body), typ, body.ID, body) + _, err = client.Index(orm.GetIndexName(body), typ, body.ID, body, "wait_for") if err != nil { return "", err } @@ -166,7 +166,7 @@ func SyncRebuildResult(esName string) error { } source["status"] = status source["task_source"] = doc.Source - _, err := client.Index(orm.GetIndexName(model.Reindex{}), "", esRes.Hits.Hits[idMap[doc.ID]].ID, source) + _, err := client.Index(orm.GetIndexName(model.Reindex{}), "", esRes.Hits.Hits[idMap[doc.ID]].ID, source, "") return err } return nil diff --git a/service/alerting/destination.go b/service/alerting/destination.go index 4b09b441..ca891f91 100644 --- a/service/alerting/destination.go +++ b/service/alerting/destination.go @@ -157,7 +157,7 @@ func CreateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P esClient := elastic.GetClient(esConfig.ID) indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", destId, IfaceMap{ DESTINATION_FIELD: toSaveDest, - }) + }, "wait_for") if err != nil { writeError(w, err) return @@ -205,7 +205,7 @@ func UpdateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P esClient := elastic.GetClient(config.ID) indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", destinationId, IfaceMap{ DESTINATION_FIELD: toSaveDest, - }) + }, "wait_for") if err != nil { writeError(w, err) return @@ -284,7 +284,7 @@ func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. esClient := elastic.GetClient(config.ID) indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", util.GetUUID(), IfaceMap{ EMAIL_ACCOUNT_FIELD: emailAccount, - }) + },"wait_for") if err != nil { writeError(w, err) return @@ -327,7 +327,7 @@ func UpdateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. esClient := elastic.GetClient(config.ID) indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}),"", emailAccountId, IfaceMap{ EMAIL_ACCOUNT_FIELD: emailAccount, - }) + }, "wait_for") if err != nil { writeError(w, err) return @@ -469,7 +469,7 @@ func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa esClient := elastic.GetClient(config.ID) indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", util.GetUUID(), IfaceMap{ EMAIL_GROUP_FIELD: emailGroup, - }) + },"wait_for") if err != nil { writeError(w, err) return @@ -501,7 +501,7 @@ func UpdateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa esClient := elastic.GetClient(config.ID) indexRes, err := esClient.Index( orm.GetIndexName(alerting.Config{}), "", emailGroupId, IfaceMap{ EMAIL_GROUP_FIELD: emailGroup, - }) + }, "wait_for") if err != nil { writeError(w, err) return diff --git a/service/alerting/monitor.go b/service/alerting/monitor.go index c2e89866..1af864d2 100644 --- a/service/alerting/monitor.go +++ b/service/alerting/monitor.go @@ -346,7 +346,7 @@ func CreateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param indexRes, err := esClient.Index(indexName,"",util.GetUUID(),IfaceMap{ "cluster_id": id, MONITOR_FIELD: monitor, - }) + },"wait_for") if err != nil { writeError(w, err) return @@ -426,7 +426,7 @@ func DeleteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param } source := util.MapStr(getRes.Source) source.Put("monitor.status", "DELETED") - indexRes, err := esClient.Index(indexName, "", monitorId, source) + indexRes, err := esClient.Index(indexName, "", monitorId, source, "wait_for") if err != nil { writeError(w, err) return @@ -490,7 +490,7 @@ func UpdateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param indexRes, err := esClient.Index(indexName, "", monitorId, IfaceMap{ "cluster_id": getRes.Source["cluster_id"], MONITOR_FIELD: monitor, - }) + }, "wait_for") if err != nil { writeError(w, err) return diff --git a/service/alerting/schedule.go b/service/alerting/schedule.go index 288112bc..fb862f7c 100644 --- a/service/alerting/schedule.go +++ b/service/alerting/schedule.go @@ -356,7 +356,7 @@ func saveAlertInfo(alertItem *alerting.Alert) error { if alertItem.State == ALERT_COMPLETED { return nil } - _, err = esClient.Index(indexName,"", util.GetUUID(), alertItem) + _, err = esClient.Index(indexName,"", util.GetUUID(), alertItem, "") return err } currentState := queryValue(resBody.Hits.Hits[0].Source, "state", "").(string) @@ -369,13 +369,13 @@ func saveAlertInfo(alertItem *alerting.Alert) error { source["state"] = ALERT_COMPLETED } } - esClient.Index( getAlertIndexName(INDEX_ALERT_HISTORY), "", alertItem.Id, source) + esClient.Index( getAlertIndexName(INDEX_ALERT_HISTORY), "", alertItem.Id, source, "") _,err = esClient.Delete(indexName, "", resBody.Hits.Hits[0].ID) return err } alertItem.StartTime = int64(queryValue(resBody.Hits.Hits[0].Source, "start_time", 0).(float64)) - _, err = esClient.Index(indexName, "", alertItem.Id, alertItem ) + _, err = esClient.Index(indexName, "", alertItem.Id, alertItem, "" ) return err }