diff --git a/config/permission.json b/config/permission.json index 87be38f0..3084d925 100644 --- a/config/permission.json +++ b/config/permission.json @@ -281,6 +281,9 @@ {"name": "indices.exists_template", "methods":["get"], "path": "/_template/:name" }, + {"name": "indices.field_usage_stats", "methods":["get"], + "path": "/:index_name/_field_usage_stats" + }, {"name": "doc.*", "methods": ["*"], "path": "/:index_name/:doctype" }, diff --git a/console.yml b/console.yml index 00e876ad..7f24d827 100644 --- a/console.yml +++ b/console.yml @@ -1,4 +1,3 @@ - # for the system cluster, please use Elasticsearch v7.3+ elasticsearch: - name: default @@ -28,28 +27,28 @@ web: enabled: true elastic: - elasticsearch: default + elasticsearch: default + enabled: true + remote_configs: true + health_check: enabled: true - remote_configs: true - health_check: - enabled: true - interval: 30s - availability_check: - enabled: true - interval: 60s - metadata_refresh: - enabled: true - interval: 30s - cluster_settings_check: - enabled: true - interval: 20s - store: - enabled: true - orm: - enabled: true - init_template: true - template_name: ".infini" - index_prefix: ".infini_" + interval: 30s + availability_check: + enabled: true + interval: 60s + metadata_refresh: + enabled: true + interval: 30s + cluster_settings_check: + enabled: true + interval: 20s + store: + enabled: false + orm: + enabled: true + init_template: true + template_name: ".infini" + index_prefix: ".infini_" metrics: enabled: true @@ -90,7 +89,7 @@ pipeline: queues: type: indexing_merge when: - cluster_available: [ "default" ] + cluster_available: ["default"] - name: metadata_ingest auto_start: true keep_running: true @@ -106,7 +105,7 @@ pipeline: consumer: group: metadata when: - cluster_available: [ "default" ] + cluster_available: ["default"] - name: activity_ingest auto_start: true keep_running: true @@ -122,4 +121,4 @@ pipeline: consumer: group: activity when: - cluster_available: [ "default" ] + cluster_available: ["default"] diff --git a/env_check.go b/env_check.go index a771cf54..395c7df1 100644 --- a/env_check.go +++ b/env_check.go @@ -67,7 +67,7 @@ func checkElasticsearchRequire() error{ versionNumber, err := jsonparser.GetString(result.Body, "version", "number") if err != nil { - return fmt.Errorf("check elasticsearch requirement error: %v", err) + return fmt.Errorf("check elasticsearch requirement error: %v, got response: %s", err, string(result.Body)) } cr, err := util.VersionCompare(versionNumber, "7.3") if err !=nil { diff --git a/main.go b/main.go index e901cdff..3145e2bf 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "errors" _ "expvar" + log "github.com/cihub/seelog" "infini.sh/console/config" "infini.sh/console/model/alerting" "infini.sh/console/model/gateway" @@ -15,6 +16,7 @@ import ( _ "infini.sh/framework/core/log" "infini.sh/framework/core/module" "infini.sh/framework/core/orm" + "infini.sh/framework/modules/agent" _ "infini.sh/framework/modules/api" elastic2 "infini.sh/framework/modules/elastic" "infini.sh/framework/modules/filter" @@ -28,7 +30,7 @@ import ( _ "infini.sh/framework/plugins" api2 "infini.sh/gateway/api" _ "infini.sh/gateway/proxy" - log "src/github.com/cihub/seelog" + _ "time/tzdata" ) var appConfig *config.AppConfig @@ -73,6 +75,7 @@ func main() { module.RegisterSystemModule(&ui.UIModule{}) module.RegisterSystemModule(&pipeline.PipeModule{}) module.RegisterSystemModule(&task.TaskModule{}) + module.RegisterSystemModule(&agent.AgentModule{}) module.RegisterUserPlugin(&metrics.MetricsModule{}) diff --git a/model/gateway/instance.go b/model/gateway/instance.go index f8b6aba5..73977adb 100644 --- a/model/gateway/instance.go +++ b/model/gateway/instance.go @@ -5,6 +5,7 @@ package gateway import ( + "infini.sh/framework/core/agent" "infini.sh/framework/core/orm" ) @@ -16,13 +17,8 @@ type Instance struct { Name string `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"` Endpoint string `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"` Version map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"` - BasicAuth BasicAuth `config:"basic_auth" json:"basic_auth,omitempty" elastic_mapping:"basic_auth:{type:object}"` + BasicAuth agent.BasicAuth `config:"basic_auth" json:"basic_auth,omitempty" elastic_mapping:"basic_auth:{type:object}"` Owner string `json:"owner,omitempty" config:"owner" elastic_mapping:"owner:{type:keyword}"` Tags [] string `json:"tags,omitempty"` Description string `json:"description,omitempty" config:"description" elastic_mapping:"description:{type:keyword}"` } - -type BasicAuth struct { - Username string `json:"username,omitempty" config:"username" elastic_mapping:"username:{type:keyword}"` - Password string `json:"password,omitempty" config:"password" elastic_mapping:"password:{type:keyword}"` -} \ No newline at end of file diff --git a/plugin/api/gateway/helper.go b/plugin/api/gateway/helper.go deleted file mode 100644 index cf0ee734..00000000 --- a/plugin/api/gateway/helper.go +++ /dev/null @@ -1,162 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * web: https://infinilabs.com - * mail: hello#infini.ltd */ - -package gateway - -import ( - "bytes" - "crypto/tls" - "github.com/segmentio/encoding/json" - "infini.sh/console/model/gateway" - "infini.sh/framework/lib/fasthttp" - "io" - "time" -) - -//import ( -// "fmt" -// "infini.sh/console/model/gateway" -// "infini.sh/framework/core/orm" -// "infini.sh/framework/core/util" -//) -// -//func fetchInstanceGroup(instanceID string) (string, error){ -// // fetch gateway instance group -// q := orm.Query{} -// q.RawQuery = []byte(fmt.Sprintf(`{"size": 1, "query":{"term":{"instance_id":{"value":"%s"}}}}`, instanceID)) -// err, res := orm.Search(&gateway.InstanceGroup{}, &q) -// if err != nil { -// return "", err -// } -// if len(res.Result) > 0 { -// if rowMap, ok := res.Result[0].(map[string]interface{}); ok { -// return rowMap["group_id"].(string), nil -// } -// } -// return "", nil -//} -// -//func fetchInstanceGroupByID(instanceIDs []interface{})([]interface{}, error){ -// if len(instanceIDs) == 0 { -// return nil, nil -// } -// // fetch gateway instance groups -// esQuery := util.MapStr{ -// "query": util.MapStr{ -// "terms": util.MapStr{ -// "instance_id": instanceIDs, -// }, -// }, -// } -// q := orm.Query{} -// q.RawQuery = util.MustToJSONBytes(esQuery) -// err, res := orm.Search(&gateway.InstanceGroup{}, &q) -// return res.Result, err -//} -//func fetchGroupByID(groupIDs []interface{})([]interface{}, error){ -// if len(groupIDs) == 0 { -// return nil, nil -// } -// // fetch gateway groups -// esQuery := util.MapStr{ -// "query": util.MapStr{ -// "terms": util.MapStr{ -// "_id": groupIDs, -// }, -// }, -// } -// q := orm.Query{} -// q.RawQuery = util.MustToJSONBytes(esQuery) -// err, res := orm.Search(&gateway.Group{}, &q) -// return res.Result, err -//} -// -//func pickElasticsearchColumnValues(result []interface{}, columnName string) []interface{}{ -// if len(result) == 0 { -// return nil -// } -// columnValues := make([]interface{}, 0, len(result)) -// for _, row := range result { -// if rowMap, ok := row.(map[string]interface{}); ok { -// columnValues = append(columnValues, rowMap[columnName]) -// } -// } -// return columnValues -//} -// -//func getRelationshipMap(result []interface{}, key string, value string) map[string]interface{}{ -// if len(result) == 0 { -// return nil -// } -// resultMap := map[string]interface{}{} -// for _, row := range result { -// if rowMap, ok := row.(map[string]interface{}); ok { -// resultMap[rowMap[key].(string)] = rowMap[value] -// } -// } -// return resultMap -//} - -type ProxyRequest struct { - Endpoint string - Path string - Method string - BasicAuth gateway.BasicAuth - Body interface{} -} - -type ProxyResponse struct { - Body []byte - StatusCode int -} - -func doGatewayRequest(req *ProxyRequest) (*ProxyResponse, error){ - var ( - freq = fasthttp.AcquireRequest() - fres = fasthttp.AcquireResponse() - ) - defer func() { - fasthttp.ReleaseRequest(freq) - fasthttp.ReleaseResponse(fres) - }() - freq.SetRequestURI(req.Endpoint+ req.Path) - freq.Header.SetMethod(req.Method) - if req.BasicAuth.Username != ""{ - freq.SetBasicAuth(req.BasicAuth.Username, req.BasicAuth.Password) - } - if req.Body != nil { - switch req.Body.(type) { - case []byte: - freq.SetBody(req.Body.([]byte)) - case string: - freq.SetBody([]byte(req.Body.(string))) - case io.Reader: - freq.SetBodyStream(req.Body.(io.Reader), -1) - default: - rw := &bytes.Buffer{} - enc := json.NewEncoder(rw) - err := enc.Encode(req.Body) - if err != nil { - return nil, err - } - freq.SetBody(rw.Bytes()) - } - } - - client := &fasthttp.Client{ - MaxConnsPerHost: 1000, - TLSConfig: &tls.Config{InsecureSkipVerify: true}, - ReadTimeout: time.Second * 5, - } - err := client.Do(freq, fres) - if err != nil { - return nil, err - } - - return &ProxyResponse{ - Body: fres.Body(), - StatusCode: fres.StatusCode(), - }, nil - -} diff --git a/plugin/api/gateway/instance.go b/plugin/api/gateway/instance.go index 0b0af9f1..9e09f7f1 100644 --- a/plugin/api/gateway/instance.go +++ b/plugin/api/gateway/instance.go @@ -9,8 +9,10 @@ import ( log "github.com/cihub/seelog" "github.com/segmentio/encoding/json" "infini.sh/console/model/gateway" + "infini.sh/framework/core/agent" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/orm" + "infini.sh/framework/core/proxy" "infini.sh/framework/core/util" "net/http" "strconv" @@ -214,11 +216,11 @@ func (h *GatewayAPI) getInstanceStatus(w http.ResponseWriter, req *http.Request, password = "" } gid, _ := instance.GetValue("id") - res, err := doGatewayRequest(&ProxyRequest{ + res, err := proxy.DoProxyRequest(&proxy.Request{ Endpoint: endpoint.(string), Method: http.MethodGet, Path: "/stats", - BasicAuth: gateway.BasicAuth{ + BasicAuth: agent.BasicAuth{ Username: username.(string), Password: password.(string), }, @@ -262,12 +264,13 @@ func (h *GatewayAPI) proxy(w http.ResponseWriter, req *http.Request, ps httprout }, http.StatusNotFound) return } - res, err := doGatewayRequest(&ProxyRequest{ + res, err := proxy.DoProxyRequest(&proxy.Request{ Method: method, Endpoint: obj.Endpoint, Path: path, Body: req.Body, BasicAuth: obj.BasicAuth, + ContentLength: int(req.ContentLength), }) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -290,8 +293,8 @@ type GatewayConnectResponse struct { } `json:"version"` } -func (h *GatewayAPI) doConnect(endpoint string, basicAuth gateway.BasicAuth) (*GatewayConnectResponse, error) { - res, err := doGatewayRequest(&ProxyRequest{ +func (h *GatewayAPI) doConnect(endpoint string, basicAuth agent.BasicAuth) (*GatewayConnectResponse, error) { + res, err := proxy.DoProxyRequest(&proxy.Request{ Method: http.MethodGet, Endpoint: endpoint, Path: "/_framework/api/_info", @@ -313,7 +316,7 @@ func (h *GatewayAPI) doConnect(endpoint string, basicAuth gateway.BasicAuth) (*G func (h *GatewayAPI) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { var reqBody = struct { Endpoint string `json:"endpoint"` - BasicAuth gateway.BasicAuth + BasicAuth agent.BasicAuth }{} err := h.DecodeJSON(req, &reqBody) if err != nil { diff --git a/plugin/api/insight/api.go b/plugin/api/insight/api.go index 01f00960..cba24a23 100644 --- a/plugin/api/insight/api.go +++ b/plugin/api/insight/api.go @@ -14,6 +14,7 @@ func init() { insight := InsightAPI{} api.HandleAPIMethod(api.POST, "/elasticsearch/:id/visualization/metadata", insight.HandleGetMetadata) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/visualization/data", insight.HandleGetMetricData) + api.HandleAPIMethod(api.POST, "/elasticsearch/:id/visualization/preview", insight.HandleGetPreview) api.HandleAPIMethod(api.GET, "/insight/visualization/:visualization_id", insight.getVisualization) api.HandleAPIMethod(api.POST, "/insight/visualization", insight.createVisualization) diff --git a/plugin/api/insight/metadata.go b/plugin/api/insight/metadata.go index 248e06f7..0554e1a6 100644 --- a/plugin/api/insight/metadata.go +++ b/plugin/api/insight/metadata.go @@ -14,8 +14,113 @@ import ( "infini.sh/framework/core/util" "math" "net/http" + "strings" ) +func (h *InsightAPI) HandleGetPreview(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + clusterID := ps.MustGetParameter("id") + reqBody := struct { + IndexPattern string `json:"index_pattern"` + ViewID string `json:"view_id"` + TimeField string `json:"time_field"` + Filter interface{} `json:"filter"` + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + if reqBody.ViewID != "" { + view := elastic.View{ + ID: reqBody.ViewID, + } + exists, err := orm.Get(&view) + if err != nil || !exists { + h.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusNotFound) + return + } + reqBody.IndexPattern = view.Title + reqBody.TimeField = view.TimeFieldName + + } + var timeFields []string + if reqBody.TimeField == "" { + fieldsMeta, err := getFieldsMetadata(reqBody.IndexPattern, clusterID) + if err != nil { + log.Error(err) + h.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + for fieldName := range fieldsMeta.Dates { + timeFields = append(timeFields, fieldName) + } + }else{ + timeFields = []string{reqBody.TimeField} + } + + aggs := util.MapStr{ + "doc_count": util.MapStr{ + "value_count": util.MapStr{"field": "_id"}, + }, + } + + for _, tfield := range timeFields { + aggs["maxTime_"+tfield] = util.MapStr{ + "max": util.MapStr{ "field": tfield }, + } + aggs["minTime_"+tfield] = util.MapStr{ + "min": util.MapStr{ "field": tfield }, + } + } + query := util.MapStr{ + "aggs": aggs, + } + if reqBody.Filter != nil { + query["query"] = reqBody.Filter + } + + esClient := elastic.GetClient(clusterID) + searchRes, err := esClient.SearchWithRawQueryDSL(reqBody.IndexPattern, util.MustToJSONBytes(query)) + if err != nil { + log.Error(err) + h.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + result := util.MapStr{ + "doc_count": searchRes.Aggregations["doc_count"].Value, + } + tfieldsM := map[string]util.MapStr{} + for ak, av := range searchRes.Aggregations { + if strings.HasPrefix(ak,"maxTime_") { + tfield := ak[8:] + if _, ok := tfieldsM[tfield]; !ok { + tfieldsM[tfield] = util.MapStr{} + } + tfieldsM[tfield]["max"] = av.Value + continue + } + if strings.HasPrefix(ak,"minTime_") { + tfield := ak[8:] + if _, ok := tfieldsM[tfield]; !ok { + tfieldsM[tfield] = util.MapStr{} + } + tfieldsM[tfield]["min"] = av.Value + continue + } + } + result["time_fields"] = tfieldsM + h.WriteJSON(w, result, http.StatusOK) + +} func (h *InsightAPI) HandleGetMetadata(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ clusterID := ps.MustGetParameter("id") reqBody := struct { @@ -193,17 +298,9 @@ func getMetadataByIndexPattern(clusterID, indexPattern, timeField string, filter var ( metas []insight.Visualization seriesType string - options = map[string]interface{}{ - "yField": "value", - } + aggTypes []string ) - if timeField != "" { - options["xAxis"] = util.MapStr{ - "type": "time", - } - options["xField"] = "timestamp" - } var fieldNames []string for fieldName := range fieldsMeta.Aggregatable { fieldNames = append(fieldNames, fieldName) @@ -220,7 +317,15 @@ func getMetadataByIndexPattern(clusterID, indexPattern, timeField string, filter return nil, err } for fieldName, count := range counts { - delete(options, "seriesField") + options := map[string]interface{}{ + "yField": "value", + } + if timeField != "" { + options["xAxis"] = util.MapStr{ + "type": "time", + } + options["xField"] = "timestamp" + } if count <= 1 { continue } @@ -230,10 +335,10 @@ func getMetadataByIndexPattern(clusterID, indexPattern, timeField string, filter if timeField == "" { seriesType = "pie" }else { - //if aggField.Type == "string"{ + if aggField.Type == "string"{ seriesType = "column" options["seriesField"] = "group" - //} + } } } var defaultAggType string @@ -309,6 +414,7 @@ func countFieldValue(fields []string, clusterID, indexPattern string, filter int } if filter != nil { queryDsl["query"] = filter + queryDsl["aggs"] = aggs } esClient := elastic.GetClient(clusterID) searchRes, err := esClient.SearchWithRawQueryDSL(indexPattern, util.MustToJSONBytes(queryDsl)) @@ -328,6 +434,12 @@ func countFieldValue(fields []string, clusterID, indexPattern string, filter int fieldsCount[key] = mAgg["value"].(float64) } } + }else{ + for key, agg := range aggsM { + if mAgg, ok := agg.(map[string]interface{});ok{ + fieldsCount[key] = mAgg["value"].(float64) + } + } } }