refactoring agent proxy
This commit is contained in:
parent
add547e78f
commit
7b84271ab3
|
@ -6,20 +6,16 @@ package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
log "github.com/cihub/seelog"
|
log "github.com/cihub/seelog"
|
||||||
httprouter "infini.sh/framework/core/api/router"
|
httprouter "infini.sh/framework/core/api/router"
|
||||||
"infini.sh/framework/core/elastic"
|
"infini.sh/framework/core/elastic"
|
||||||
"infini.sh/framework/core/global"
|
|
||||||
"infini.sh/framework/core/model"
|
"infini.sh/framework/core/model"
|
||||||
"infini.sh/framework/core/orm"
|
"infini.sh/framework/core/orm"
|
||||||
"infini.sh/framework/core/util"
|
"infini.sh/framework/core/util"
|
||||||
"infini.sh/framework/modules/elastic/common"
|
"infini.sh/framework/modules/elastic/common"
|
||||||
"infini.sh/framework/plugins/managed/server"
|
"infini.sh/framework/plugins/managed/server"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -113,7 +109,7 @@ func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance
|
||||||
}
|
}
|
||||||
|
|
||||||
obj := elastic.DiscoveryResult{}
|
obj := elastic.DiscoveryResult{}
|
||||||
err := doRequest(instance, req, &obj)
|
_,err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -149,7 +145,7 @@ func GetElasticLogFiles(ctx context.Context, instance *model.Instance, logsPath
|
||||||
}
|
}
|
||||||
|
|
||||||
resBody := map[string]interface{}{}
|
resBody := map[string]interface{}{}
|
||||||
err := doRequest(instance, req, &resBody)
|
_,err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &resBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -168,7 +164,7 @@ func GetElasticLogFileContent(ctx context.Context, instance *model.Instance, bod
|
||||||
Body: util.MustToJSONBytes(body),
|
Body: util.MustToJSONBytes(body),
|
||||||
}
|
}
|
||||||
resBody := map[string]interface{}{}
|
resBody := map[string]interface{}{}
|
||||||
err := doRequest(instance, req, &resBody)
|
_,err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &resBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -397,17 +393,23 @@ func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Reque
|
||||||
func (h *APIHandler) getESNodeInfoViaProxy(host string, schema string, auth *model.BasicAuth, instance model.Instance) (success, tryAgain bool, info *elastic.LocalNodeInfo) {
|
func (h *APIHandler) getESNodeInfoViaProxy(host string, schema string, auth *model.BasicAuth, instance model.Instance) (success, tryAgain bool, info *elastic.LocalNodeInfo) {
|
||||||
esConfig := elastic.ElasticsearchConfig{Host: host, Schema: schema, BasicAuth: auth}
|
esConfig := elastic.ElasticsearchConfig{Host: host, Schema: schema, BasicAuth: auth}
|
||||||
body := util.MustToJSONBytes(esConfig)
|
body := util.MustToJSONBytes(esConfig)
|
||||||
res, err := server.ProxyRequestToRuntimeInstance(instance.GetEndpoint(), "POST", "/elasticsearch/node/_info",
|
|
||||||
body, int64(len(body)), auth)
|
|
||||||
|
|
||||||
if err != nil {
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
panic(err)
|
defer cancel()
|
||||||
|
req := &util.Request{
|
||||||
|
Method: http.MethodPost,
|
||||||
|
Path: "/elasticsearch/node/_info",
|
||||||
|
Context: ctx,
|
||||||
|
Body: body,
|
||||||
|
}
|
||||||
|
if auth != nil {
|
||||||
|
req.SetBasicAuth(auth.Username, auth.Password)
|
||||||
}
|
}
|
||||||
|
|
||||||
if global.Env().IsDebug {
|
obj := elastic.LocalNodeInfo{}
|
||||||
if res != nil { // && res.StatusCode==http.StatusOK
|
res,err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj)
|
||||||
log.Debug(string(res.Body))
|
if err != nil {
|
||||||
}
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if res != nil && res.StatusCode == http.StatusForbidden {
|
if res != nil && res.StatusCode == http.StatusForbidden {
|
||||||
|
@ -539,44 +541,3 @@ func (h *APIHandler) enrollESNode(w http.ResponseWriter, req *http.Request, ps h
|
||||||
|
|
||||||
h.WriteAckOKJSON(w)
|
h.WriteAckOKJSON(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
var mTLSClient *http.Client //TODO get mTLSClient
|
|
||||||
var initOnce = sync.Once{}
|
|
||||||
|
|
||||||
func doRequest(instance *model.Instance, req *util.Request, obj interface{}) error {
|
|
||||||
var err error
|
|
||||||
var res *util.Result
|
|
||||||
|
|
||||||
initOnce.Do(func() {
|
|
||||||
if global.Env().SystemConfig.Configs.TLSConfig.TLSEnabled && global.Env().SystemConfig.Configs.TLSConfig.TLSCAFile != "" {
|
|
||||||
|
|
||||||
//init client
|
|
||||||
hClient, err := util.NewMTLSClient(
|
|
||||||
global.Env().SystemConfig.Configs.TLSConfig.TLSCAFile,
|
|
||||||
global.Env().SystemConfig.Configs.TLSConfig.TLSCertFile,
|
|
||||||
global.Env().SystemConfig.Configs.TLSConfig.TLSKeyFile)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
mTLSClient = hClient
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
req.Url, err = url.JoinPath(instance.GetEndpoint(), req.Path)
|
|
||||||
res, err = util.ExecuteRequestWithCatchFlag(mTLSClient, req, true)
|
|
||||||
if err != nil || res.StatusCode != 200 {
|
|
||||||
body := ""
|
|
||||||
if res != nil {
|
|
||||||
body = string(res.Body)
|
|
||||||
}
|
|
||||||
return errors.New(fmt.Sprintf("request error: %v, %v", err, body))
|
|
||||||
}
|
|
||||||
|
|
||||||
if res != nil {
|
|
||||||
if res.Body != nil {
|
|
||||||
return util.FromJSONBytes(res.Body, obj)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue