Merge branch 'master' of ssh://git.infini.ltd:64221/infini/console

This commit is contained in:
medcl 2022-08-25 15:47:40 +08:00
commit 0fd7b457be
9 changed files with 168 additions and 213 deletions

View File

@ -281,6 +281,9 @@
{"name": "indices.exists_template", "methods":["get"], {"name": "indices.exists_template", "methods":["get"],
"path": "/_template/:name" "path": "/_template/:name"
}, },
{"name": "indices.field_usage_stats", "methods":["get"],
"path": "/:index_name/_field_usage_stats"
},
{"name": "doc.*", "methods": ["*"], {"name": "doc.*", "methods": ["*"],
"path": "/:index_name/:doctype" "path": "/:index_name/:doctype"
}, },

View File

@ -1,4 +1,3 @@
# for the system cluster, please use Elasticsearch v7.3+ # for the system cluster, please use Elasticsearch v7.3+
elasticsearch: elasticsearch:
- name: default - name: default
@ -28,28 +27,28 @@ web:
enabled: true enabled: true
elastic: elastic:
elasticsearch: default elasticsearch: default
enabled: true
remote_configs: true
health_check:
enabled: true enabled: true
remote_configs: true interval: 30s
health_check: availability_check:
enabled: true enabled: true
interval: 30s interval: 60s
availability_check: metadata_refresh:
enabled: true enabled: true
interval: 60s interval: 30s
metadata_refresh: cluster_settings_check:
enabled: true enabled: true
interval: 30s interval: 20s
cluster_settings_check: store:
enabled: true enabled: false
interval: 20s orm:
store: enabled: true
enabled: true init_template: true
orm: template_name: ".infini"
enabled: true index_prefix: ".infini_"
init_template: true
template_name: ".infini"
index_prefix: ".infini_"
metrics: metrics:
enabled: true enabled: true
@ -90,7 +89,7 @@ pipeline:
queues: queues:
type: indexing_merge type: indexing_merge
when: when:
cluster_available: [ "default" ] cluster_available: ["default"]
- name: metadata_ingest - name: metadata_ingest
auto_start: true auto_start: true
keep_running: true keep_running: true
@ -106,7 +105,7 @@ pipeline:
consumer: consumer:
group: metadata group: metadata
when: when:
cluster_available: [ "default" ] cluster_available: ["default"]
- name: activity_ingest - name: activity_ingest
auto_start: true auto_start: true
keep_running: true keep_running: true
@ -122,4 +121,4 @@ pipeline:
consumer: consumer:
group: activity group: activity
when: when:
cluster_available: [ "default" ] cluster_available: ["default"]

View File

@ -67,7 +67,7 @@ func checkElasticsearchRequire() error{
versionNumber, err := jsonparser.GetString(result.Body, "version", "number") versionNumber, err := jsonparser.GetString(result.Body, "version", "number")
if err != nil { if err != nil {
return fmt.Errorf("check elasticsearch requirement error: %v", err) return fmt.Errorf("check elasticsearch requirement error: %v, got response: %s", err, string(result.Body))
} }
cr, err := util.VersionCompare(versionNumber, "7.3") cr, err := util.VersionCompare(versionNumber, "7.3")
if err !=nil { if err !=nil {

View File

@ -3,6 +3,7 @@ package main
import ( import (
"errors" "errors"
_ "expvar" _ "expvar"
log "github.com/cihub/seelog"
"infini.sh/console/config" "infini.sh/console/config"
"infini.sh/console/model/alerting" "infini.sh/console/model/alerting"
"infini.sh/console/model/gateway" "infini.sh/console/model/gateway"
@ -15,6 +16,7 @@ import (
_ "infini.sh/framework/core/log" _ "infini.sh/framework/core/log"
"infini.sh/framework/core/module" "infini.sh/framework/core/module"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/modules/agent"
_ "infini.sh/framework/modules/api" _ "infini.sh/framework/modules/api"
elastic2 "infini.sh/framework/modules/elastic" elastic2 "infini.sh/framework/modules/elastic"
"infini.sh/framework/modules/filter" "infini.sh/framework/modules/filter"
@ -28,7 +30,7 @@ import (
_ "infini.sh/framework/plugins" _ "infini.sh/framework/plugins"
api2 "infini.sh/gateway/api" api2 "infini.sh/gateway/api"
_ "infini.sh/gateway/proxy" _ "infini.sh/gateway/proxy"
log "src/github.com/cihub/seelog" _ "time/tzdata"
) )
var appConfig *config.AppConfig var appConfig *config.AppConfig
@ -73,6 +75,7 @@ func main() {
module.RegisterSystemModule(&ui.UIModule{}) module.RegisterSystemModule(&ui.UIModule{})
module.RegisterSystemModule(&pipeline.PipeModule{}) module.RegisterSystemModule(&pipeline.PipeModule{})
module.RegisterSystemModule(&task.TaskModule{}) module.RegisterSystemModule(&task.TaskModule{})
module.RegisterSystemModule(&agent.AgentModule{})
module.RegisterUserPlugin(&metrics.MetricsModule{}) module.RegisterUserPlugin(&metrics.MetricsModule{})

View File

@ -5,6 +5,7 @@
package gateway package gateway
import ( import (
"infini.sh/framework/core/agent"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
) )
@ -16,13 +17,8 @@ type Instance struct {
Name string `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"` Name string `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"`
Endpoint string `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"` Endpoint string `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"`
Version map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"` Version map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"`
BasicAuth BasicAuth `config:"basic_auth" json:"basic_auth,omitempty" elastic_mapping:"basic_auth:{type:object}"` BasicAuth agent.BasicAuth `config:"basic_auth" json:"basic_auth,omitempty" elastic_mapping:"basic_auth:{type:object}"`
Owner string `json:"owner,omitempty" config:"owner" elastic_mapping:"owner:{type:keyword}"` Owner string `json:"owner,omitempty" config:"owner" elastic_mapping:"owner:{type:keyword}"`
Tags [] string `json:"tags,omitempty"` Tags [] string `json:"tags,omitempty"`
Description string `json:"description,omitempty" config:"description" elastic_mapping:"description:{type:keyword}"` Description string `json:"description,omitempty" config:"description" elastic_mapping:"description:{type:keyword}"`
} }
type BasicAuth struct {
Username string `json:"username,omitempty" config:"username" elastic_mapping:"username:{type:keyword}"`
Password string `json:"password,omitempty" config:"password" elastic_mapping:"password:{type:keyword}"`
}

View File

@ -1,162 +0,0 @@
/* Copyright © INFINI Ltd. All rights reserved.
* web: https://infinilabs.com
* mail: hello#infini.ltd */
package gateway
import (
"bytes"
"crypto/tls"
"github.com/segmentio/encoding/json"
"infini.sh/console/model/gateway"
"infini.sh/framework/lib/fasthttp"
"io"
"time"
)
//import (
// "fmt"
// "infini.sh/console/model/gateway"
// "infini.sh/framework/core/orm"
// "infini.sh/framework/core/util"
//)
//
//func fetchInstanceGroup(instanceID string) (string, error){
// // fetch gateway instance group
// q := orm.Query{}
// q.RawQuery = []byte(fmt.Sprintf(`{"size": 1, "query":{"term":{"instance_id":{"value":"%s"}}}}`, instanceID))
// err, res := orm.Search(&gateway.InstanceGroup{}, &q)
// if err != nil {
// return "", err
// }
// if len(res.Result) > 0 {
// if rowMap, ok := res.Result[0].(map[string]interface{}); ok {
// return rowMap["group_id"].(string), nil
// }
// }
// return "", nil
//}
//
//func fetchInstanceGroupByID(instanceIDs []interface{})([]interface{}, error){
// if len(instanceIDs) == 0 {
// return nil, nil
// }
// // fetch gateway instance groups
// esQuery := util.MapStr{
// "query": util.MapStr{
// "terms": util.MapStr{
// "instance_id": instanceIDs,
// },
// },
// }
// q := orm.Query{}
// q.RawQuery = util.MustToJSONBytes(esQuery)
// err, res := orm.Search(&gateway.InstanceGroup{}, &q)
// return res.Result, err
//}
//func fetchGroupByID(groupIDs []interface{})([]interface{}, error){
// if len(groupIDs) == 0 {
// return nil, nil
// }
// // fetch gateway groups
// esQuery := util.MapStr{
// "query": util.MapStr{
// "terms": util.MapStr{
// "_id": groupIDs,
// },
// },
// }
// q := orm.Query{}
// q.RawQuery = util.MustToJSONBytes(esQuery)
// err, res := orm.Search(&gateway.Group{}, &q)
// return res.Result, err
//}
//
//func pickElasticsearchColumnValues(result []interface{}, columnName string) []interface{}{
// if len(result) == 0 {
// return nil
// }
// columnValues := make([]interface{}, 0, len(result))
// for _, row := range result {
// if rowMap, ok := row.(map[string]interface{}); ok {
// columnValues = append(columnValues, rowMap[columnName])
// }
// }
// return columnValues
//}
//
//func getRelationshipMap(result []interface{}, key string, value string) map[string]interface{}{
// if len(result) == 0 {
// return nil
// }
// resultMap := map[string]interface{}{}
// for _, row := range result {
// if rowMap, ok := row.(map[string]interface{}); ok {
// resultMap[rowMap[key].(string)] = rowMap[value]
// }
// }
// return resultMap
//}
type ProxyRequest struct {
Endpoint string
Path string
Method string
BasicAuth gateway.BasicAuth
Body interface{}
}
type ProxyResponse struct {
Body []byte
StatusCode int
}
func doGatewayRequest(req *ProxyRequest) (*ProxyResponse, error){
var (
freq = fasthttp.AcquireRequest()
fres = fasthttp.AcquireResponse()
)
defer func() {
fasthttp.ReleaseRequest(freq)
fasthttp.ReleaseResponse(fres)
}()
freq.SetRequestURI(req.Endpoint+ req.Path)
freq.Header.SetMethod(req.Method)
if req.BasicAuth.Username != ""{
freq.SetBasicAuth(req.BasicAuth.Username, req.BasicAuth.Password)
}
if req.Body != nil {
switch req.Body.(type) {
case []byte:
freq.SetBody(req.Body.([]byte))
case string:
freq.SetBody([]byte(req.Body.(string)))
case io.Reader:
freq.SetBodyStream(req.Body.(io.Reader), -1)
default:
rw := &bytes.Buffer{}
enc := json.NewEncoder(rw)
err := enc.Encode(req.Body)
if err != nil {
return nil, err
}
freq.SetBody(rw.Bytes())
}
}
client := &fasthttp.Client{
MaxConnsPerHost: 1000,
TLSConfig: &tls.Config{InsecureSkipVerify: true},
ReadTimeout: time.Second * 5,
}
err := client.Do(freq, fres)
if err != nil {
return nil, err
}
return &ProxyResponse{
Body: fres.Body(),
StatusCode: fres.StatusCode(),
}, nil
}

View File

@ -9,8 +9,10 @@ import (
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
"github.com/segmentio/encoding/json" "github.com/segmentio/encoding/json"
"infini.sh/console/model/gateway" "infini.sh/console/model/gateway"
"infini.sh/framework/core/agent"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/proxy"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
"net/http" "net/http"
"strconv" "strconv"
@ -214,11 +216,11 @@ func (h *GatewayAPI) getInstanceStatus(w http.ResponseWriter, req *http.Request,
password = "" password = ""
} }
gid, _ := instance.GetValue("id") gid, _ := instance.GetValue("id")
res, err := doGatewayRequest(&ProxyRequest{ res, err := proxy.DoProxyRequest(&proxy.Request{
Endpoint: endpoint.(string), Endpoint: endpoint.(string),
Method: http.MethodGet, Method: http.MethodGet,
Path: "/stats", Path: "/stats",
BasicAuth: gateway.BasicAuth{ BasicAuth: agent.BasicAuth{
Username: username.(string), Username: username.(string),
Password: password.(string), Password: password.(string),
}, },
@ -262,12 +264,13 @@ func (h *GatewayAPI) proxy(w http.ResponseWriter, req *http.Request, ps httprout
}, http.StatusNotFound) }, http.StatusNotFound)
return return
} }
res, err := doGatewayRequest(&ProxyRequest{ res, err := proxy.DoProxyRequest(&proxy.Request{
Method: method, Method: method,
Endpoint: obj.Endpoint, Endpoint: obj.Endpoint,
Path: path, Path: path,
Body: req.Body, Body: req.Body,
BasicAuth: obj.BasicAuth, BasicAuth: obj.BasicAuth,
ContentLength: int(req.ContentLength),
}) })
if err != nil { if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
@ -290,8 +293,8 @@ type GatewayConnectResponse struct {
} `json:"version"` } `json:"version"`
} }
func (h *GatewayAPI) doConnect(endpoint string, basicAuth gateway.BasicAuth) (*GatewayConnectResponse, error) { func (h *GatewayAPI) doConnect(endpoint string, basicAuth agent.BasicAuth) (*GatewayConnectResponse, error) {
res, err := doGatewayRequest(&ProxyRequest{ res, err := proxy.DoProxyRequest(&proxy.Request{
Method: http.MethodGet, Method: http.MethodGet,
Endpoint: endpoint, Endpoint: endpoint,
Path: "/_framework/api/_info", Path: "/_framework/api/_info",
@ -313,7 +316,7 @@ func (h *GatewayAPI) doConnect(endpoint string, basicAuth gateway.BasicAuth) (*G
func (h *GatewayAPI) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *GatewayAPI) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
var reqBody = struct { var reqBody = struct {
Endpoint string `json:"endpoint"` Endpoint string `json:"endpoint"`
BasicAuth gateway.BasicAuth BasicAuth agent.BasicAuth
}{} }{}
err := h.DecodeJSON(req, &reqBody) err := h.DecodeJSON(req, &reqBody)
if err != nil { if err != nil {

View File

@ -14,6 +14,7 @@ func init() {
insight := InsightAPI{} insight := InsightAPI{}
api.HandleAPIMethod(api.POST, "/elasticsearch/:id/visualization/metadata", insight.HandleGetMetadata) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/visualization/metadata", insight.HandleGetMetadata)
api.HandleAPIMethod(api.POST, "/elasticsearch/:id/visualization/data", insight.HandleGetMetricData) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/visualization/data", insight.HandleGetMetricData)
api.HandleAPIMethod(api.POST, "/elasticsearch/:id/visualization/preview", insight.HandleGetPreview)
api.HandleAPIMethod(api.GET, "/insight/visualization/:visualization_id", insight.getVisualization) api.HandleAPIMethod(api.GET, "/insight/visualization/:visualization_id", insight.getVisualization)
api.HandleAPIMethod(api.POST, "/insight/visualization", insight.createVisualization) api.HandleAPIMethod(api.POST, "/insight/visualization", insight.createVisualization)

View File

@ -14,8 +14,113 @@ import (
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
"math" "math"
"net/http" "net/http"
"strings"
) )
func (h *InsightAPI) HandleGetPreview(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
clusterID := ps.MustGetParameter("id")
reqBody := struct {
IndexPattern string `json:"index_pattern"`
ViewID string `json:"view_id"`
TimeField string `json:"time_field"`
Filter interface{} `json:"filter"`
}{}
err := h.DecodeJSON(req, &reqBody)
if err != nil {
log.Error(err)
h.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
return
}
if reqBody.ViewID != "" {
view := elastic.View{
ID: reqBody.ViewID,
}
exists, err := orm.Get(&view)
if err != nil || !exists {
h.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusNotFound)
return
}
reqBody.IndexPattern = view.Title
reqBody.TimeField = view.TimeFieldName
}
var timeFields []string
if reqBody.TimeField == "" {
fieldsMeta, err := getFieldsMetadata(reqBody.IndexPattern, clusterID)
if err != nil {
log.Error(err)
h.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
return
}
for fieldName := range fieldsMeta.Dates {
timeFields = append(timeFields, fieldName)
}
}else{
timeFields = []string{reqBody.TimeField}
}
aggs := util.MapStr{
"doc_count": util.MapStr{
"value_count": util.MapStr{"field": "_id"},
},
}
for _, tfield := range timeFields {
aggs["maxTime_"+tfield] = util.MapStr{
"max": util.MapStr{ "field": tfield },
}
aggs["minTime_"+tfield] = util.MapStr{
"min": util.MapStr{ "field": tfield },
}
}
query := util.MapStr{
"aggs": aggs,
}
if reqBody.Filter != nil {
query["query"] = reqBody.Filter
}
esClient := elastic.GetClient(clusterID)
searchRes, err := esClient.SearchWithRawQueryDSL(reqBody.IndexPattern, util.MustToJSONBytes(query))
if err != nil {
log.Error(err)
h.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
return
}
result := util.MapStr{
"doc_count": searchRes.Aggregations["doc_count"].Value,
}
tfieldsM := map[string]util.MapStr{}
for ak, av := range searchRes.Aggregations {
if strings.HasPrefix(ak,"maxTime_") {
tfield := ak[8:]
if _, ok := tfieldsM[tfield]; !ok {
tfieldsM[tfield] = util.MapStr{}
}
tfieldsM[tfield]["max"] = av.Value
continue
}
if strings.HasPrefix(ak,"minTime_") {
tfield := ak[8:]
if _, ok := tfieldsM[tfield]; !ok {
tfieldsM[tfield] = util.MapStr{}
}
tfieldsM[tfield]["min"] = av.Value
continue
}
}
result["time_fields"] = tfieldsM
h.WriteJSON(w, result, http.StatusOK)
}
func (h *InsightAPI) HandleGetMetadata(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ func (h *InsightAPI) HandleGetMetadata(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
clusterID := ps.MustGetParameter("id") clusterID := ps.MustGetParameter("id")
reqBody := struct { reqBody := struct {
@ -193,17 +298,9 @@ func getMetadataByIndexPattern(clusterID, indexPattern, timeField string, filter
var ( var (
metas []insight.Visualization metas []insight.Visualization
seriesType string seriesType string
options = map[string]interface{}{
"yField": "value",
}
aggTypes []string aggTypes []string
) )
if timeField != "" {
options["xAxis"] = util.MapStr{
"type": "time",
}
options["xField"] = "timestamp"
}
var fieldNames []string var fieldNames []string
for fieldName := range fieldsMeta.Aggregatable { for fieldName := range fieldsMeta.Aggregatable {
fieldNames = append(fieldNames, fieldName) fieldNames = append(fieldNames, fieldName)
@ -220,7 +317,15 @@ func getMetadataByIndexPattern(clusterID, indexPattern, timeField string, filter
return nil, err return nil, err
} }
for fieldName, count := range counts { for fieldName, count := range counts {
delete(options, "seriesField") options := map[string]interface{}{
"yField": "value",
}
if timeField != "" {
options["xAxis"] = util.MapStr{
"type": "time",
}
options["xField"] = "timestamp"
}
if count <= 1 { if count <= 1 {
continue continue
} }
@ -230,10 +335,10 @@ func getMetadataByIndexPattern(clusterID, indexPattern, timeField string, filter
if timeField == "" { if timeField == "" {
seriesType = "pie" seriesType = "pie"
}else { }else {
//if aggField.Type == "string"{ if aggField.Type == "string"{
seriesType = "column" seriesType = "column"
options["seriesField"] = "group" options["seriesField"] = "group"
//} }
} }
} }
var defaultAggType string var defaultAggType string
@ -309,6 +414,7 @@ func countFieldValue(fields []string, clusterID, indexPattern string, filter int
} }
if filter != nil { if filter != nil {
queryDsl["query"] = filter queryDsl["query"] = filter
queryDsl["aggs"] = aggs
} }
esClient := elastic.GetClient(clusterID) esClient := elastic.GetClient(clusterID)
searchRes, err := esClient.SearchWithRawQueryDSL(indexPattern, util.MustToJSONBytes(queryDsl)) searchRes, err := esClient.SearchWithRawQueryDSL(indexPattern, util.MustToJSONBytes(queryDsl))
@ -328,6 +434,12 @@ func countFieldValue(fields []string, clusterID, indexPattern string, filter int
fieldsCount[key] = mAgg["value"].(float64) fieldsCount[key] = mAgg["value"].(float64)
} }
} }
}else{
for key, agg := range aggsM {
if mAgg, ok := agg.(map[string]interface{});ok{
fieldsCount[key] = mAgg["value"].(float64)
}
}
} }
} }