Merge branch 'master' of ssh://git.infini.ltd:64221/infini/console
This commit is contained in:
commit
083e481bc6
|
@ -26,6 +26,7 @@ pipeline {
|
|||
sh 'cd /home/jenkins/go/src/infini.sh/console && git pull origin master && make config build-linux-amd64'
|
||||
sh label: 'copy-license', script: 'cd /home/jenkins/go/src/infini.sh/console && cp ../framework/LICENSE bin && cat ../framework/NOTICE NOTICE > bin/NOTICE'
|
||||
sh label: 'copy-configs', script: 'cd /home/jenkins/go/src/infini.sh/console && mkdir -p bin/config && cp config/*.json bin/config && cp config/*.tpl bin/config'
|
||||
sh label: 'copy-certs', script: 'cd /home/jenkins/go/src/infini.sh/console && cp -rf config/certs bin/config'
|
||||
|
||||
sh label: 'package-linux-amd64', script: 'cd /home/jenkins/go/src/infini.sh/console/bin && tar cfz ${WORKSPACE}/console-$VERSION-$BUILD_NUMBER-linux-amd64.tar.gz console-linux-amd64 console.yml LICENSE NOTICE config'
|
||||
archiveArtifacts artifacts: 'console-$VERSION-$BUILD_NUMBER-*.*', fingerprint: true, followSymlinks: true, onlyIfSuccessful: false
|
||||
|
|
6
build.sh
6
build.sh
|
@ -27,7 +27,10 @@ GOROOT="/infini/go-pkgs/go-loongarch" PATH=$GOROOT/bin:$PATH make build-linux-lo
|
|||
|
||||
#copy-configs
|
||||
cp -rf $WORKBASE/framework/LICENSE $WORKDIR/bin && cat $WORKBASE/framework/NOTICE $WORKDIR/NOTICE > $WORKDIR/bin/NOTICE
|
||||
mkdir -p $WORKDIR/bin/config && cp $WORKDIR/config/*.json $WORKDIR/bin/config && cp -rf $WORKDIR/config/*.tpl $WORKDIR/bin/config
|
||||
mkdir -p $WORKDIR/bin/config
|
||||
cp $WORKDIR/config/*.json $WORKDIR/bin/config
|
||||
cp -rf $WORKDIR/config/*.tpl $WORKDIR/bin/config
|
||||
cp -rf $WORKDIR/config/certs $WORKDIR/bin/config
|
||||
|
||||
cd $WORKDIR/bin
|
||||
for t in 386 amd64 arm64 armv5 armv6 armv7 loong64 mips mips64 mips64le mipsle riscv64 ; do
|
||||
|
@ -50,6 +53,7 @@ WORKDIR \${APP_HOME}
|
|||
|
||||
COPY ["$PNAME-linux-$t", "$PNAME.yml", "\${APP_HOME}/"]
|
||||
COPY ["config", "\${APP_HOME}/config"]
|
||||
COPY ["config/certs", "\${APP_HOME}/config/certs"]
|
||||
|
||||
CMD ["/opt/$PNAME/${PNAME}-linux-$t"]
|
||||
EOF
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIID+zCCAuOgAwIBAgIUEctl/ds6wIoJGTW3PQc5L/0VlQ4wDQYJKoZIhvcNAQEL
|
||||
BQAwgYwxCzAJBgNVBAYTAkNOMQ4wDAYDVQQIDAVodW5hbjERMA8GA1UEBwwIY2hh
|
||||
bmdzaGExEzARBgNVBAoMCmluZmluaWxhYnMxCzAJBgNVBAsMAml0MRcwFQYDVQQD
|
||||
DA5pbmZpbmlsYWJzLmNvbTEfMB0GA1UEAwwWcmVsZWFzZS5pbmZpbmlsYWJzLmNv
|
||||
bTAeFw0yMzA2MDgwNTI1MzVaFw0zMzA2MDUwNTI1MzVaMIGMMQswCQYDVQQGEwJD
|
||||
TjEOMAwGA1UECAwFaHVuYW4xETAPBgNVBAcMCGNoYW5nc2hhMRMwEQYDVQQKDApp
|
||||
bmZpbmlsYWJzMQswCQYDVQQLDAJpdDEXMBUGA1UEAwwOaW5maW5pbGFicy5jb20x
|
||||
HzAdBgNVBAMMFnJlbGVhc2UuaW5maW5pbGFicy5jb20wggEiMA0GCSqGSIb3DQEB
|
||||
AQUAA4IBDwAwggEKAoIBAQD43Co0NROYEeeZWWH4O3m+V7U+1/4DVuAm+9u1bxqi
|
||||
OnliE24wm9+gk3HEwdr6pMGTfMWS8BMmqUpjjgFVK4Tcur87Cqjq7XDe8j7h5Ipi
|
||||
8yVUAgqF4wesIZpGtxXRZwwGWwRu38zX5CAa9n9Xbp0Y7tDdINRk0vLCp7VQbd2N
|
||||
VbMXgqygJAaAImdNfrddAmojWJ92LCT5HKcDNq8Z62VwtLqOUePiEJxm1sUts9tT
|
||||
sX8XlyLljz2aoWcX+Gzin8HkOftnpYeHptDL26Q2FyW7TYZR4oFuhU6FQ/YPfCsE
|
||||
m/sxVC7BIBWal4DF29ZiivWvWk+wBNq03LxR1/TvJtOdAgMBAAGjUzBRMB0GA1Ud
|
||||
DgQWBBSsTKk1fbAbRxPIydDSatzKh+YaGzAfBgNVHSMEGDAWgBSsTKk1fbAbRxPI
|
||||
ydDSatzKh+YaGzAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAl
|
||||
0p1/QMgdahq22SY+shvvxH9AbtyQo3XFZjWXd0+rMbt0uci640NDhaZxBeMgDWaJ
|
||||
jRe+K7sw+AhnIWzH9RbaYQfVAXxjFb7kHSb93bezXjA3m21O5KpwiQyaXCbsIVSf
|
||||
n59pd3+EV/Q96EclNMoixpDUVtqI7i046/3imZ4XyBDpQPWCajaKpp8rkypCvykK
|
||||
KQ7BGF8lr3WyAgfsoHi9UrWcN1n3ynyy9T9qr1CFmopQiwSQo+036a8F/3Y9KIFM
|
||||
nhQWTBVcXkbmZtxpRRaD9rC6p+2aqfVNmjCDuYxjuGmTqL/0eayRefk0QwT5S51L
|
||||
ea5WlmQtUbCpewnNiNrz
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,28 @@
|
|||
-----BEGIN PRIVATE KEY-----
|
||||
MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQD43Co0NROYEeeZ
|
||||
WWH4O3m+V7U+1/4DVuAm+9u1bxqiOnliE24wm9+gk3HEwdr6pMGTfMWS8BMmqUpj
|
||||
jgFVK4Tcur87Cqjq7XDe8j7h5Ipi8yVUAgqF4wesIZpGtxXRZwwGWwRu38zX5CAa
|
||||
9n9Xbp0Y7tDdINRk0vLCp7VQbd2NVbMXgqygJAaAImdNfrddAmojWJ92LCT5HKcD
|
||||
Nq8Z62VwtLqOUePiEJxm1sUts9tTsX8XlyLljz2aoWcX+Gzin8HkOftnpYeHptDL
|
||||
26Q2FyW7TYZR4oFuhU6FQ/YPfCsEm/sxVC7BIBWal4DF29ZiivWvWk+wBNq03LxR
|
||||
1/TvJtOdAgMBAAECggEAHfFKQq3VhbPhzIRXUqlqpImjt8P/6XbyfBOhroWHttfs
|
||||
8u1TkhWvJSrtwrbFxOfol/j97LK7cH4TV+x7dzvykycFC0g6ZXSG4sxS30Btm3+V
|
||||
5jMOARNSllYUsfLQH+STgHrttZ7H5CdWlZKYZR9crOX5y3a/whcqOQbkvKCmH522
|
||||
CUWeoabELLFGh6sGkQ6dXMFMS/fzT9MK3gxoLYYoTum5vJN2HYxMBerTbc3wXPtN
|
||||
/BY8gStkv9BtbovPqsHUWeWK1fXphqysfKE79k9zS9PPz8ErEPhhT44blIo0BKCp
|
||||
akw4DjHXbHhFq8Onh+3NemTLIhx++8z0FgL+6jYdIQKBgQD7BnNhHSxRm/smvyVm
|
||||
L3fOX22fL1ZruY9sV4h3pK7KEqMLoUfrOpBN1AiQd/Xb/XNGMrHSPdZGL+tSS7vJ
|
||||
2ibcSJWGuvuhA8HXpSPRUE9ao5cdRtEOTpy0FncMYaa7XYFoi1NOH/xl0uhd2u2i
|
||||
uqL122tafZ9cP8fzvrOICa9ydwKBgQD9yrq3D3m5oIj596jDUsX2jxCKRzdnGLu3
|
||||
/qRnnfsN07524Ydqo8lpQMU+u2ljJhNFjcBqTOjSsRnnd1qQHqxZ18ABgOteAws5
|
||||
FlfLNQ7nupZu0IMd3b2WoCqoUOzLBsofUnMUPGRZLI2QCxfS1gzgp57IpY1egHe9
|
||||
1HL2Ht/7iwKBgCCeYffIlq20GxqZ0/5HRMYoWFJLEGvHHP/zD+ScHapcaZtlRbcn
|
||||
UQEMAGDldak0cfo5NCohpupP58A13x0Hn+0X8XYkbfYqStH+v3y/dtBMWaKQRTIa
|
||||
vPoZwTZ2qffG2r3+/MA2H9ILae5oOGDg57QS4wxLLp4KG61spl+TAp1ZAoGAbtG8
|
||||
UD0gsO6pgUUkWw3kxXHZDhhk187UAVbe4SP5wSLpsy1tnSIhy6TtvCPHs/SnS6LH
|
||||
F2O38nBE2G6ki/Po3F46SC3MGMQJfYcFFQV5GbS/BWplJoBxzbjoh5C2pTy0u5Kc
|
||||
D4UDaHDs6638XCL9goeO7RxlK5O7NZf0DwaSVVsCgYB6JQoVjob9kRIkko1ob63W
|
||||
idsWNZtG3Py1gx61lBPk/Lz/bxJKlsnWgvInjGgFQ0o0g98hnd4H3O89BPwyAgKa
|
||||
dIKfW/2k7SG0B+wB1xhnaBtHvJ3tWdqcH9wcTbOQ4LbT1OlvEQYc2B7cYicS22oZ
|
||||
1lUUI2e9WKSTzwHk+nZQEw==
|
||||
-----END PRIVATE KEY-----
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -115,8 +115,8 @@ agent_exc="${install_path}/agent/agent-${os}-${arch}" #agent可执行文件
|
|||
|
||||
agent_exsit="true"
|
||||
if [ ! -d "${install_path}/agent" ]; then
|
||||
printf "\n* mkdir ${install_path}/agent"
|
||||
$sudo_cmd mkdir "${install_path}/agent"
|
||||
printf "\n* mkdir -p ${install_path}/agent"
|
||||
$sudo_cmd mkdir -p "${install_path}/agent"
|
||||
agent_exsit="false"
|
||||
fi
|
||||
|
||||
|
@ -161,7 +161,7 @@ rm -f ${agent}
|
|||
##################
|
||||
# save cert
|
||||
##################
|
||||
$sudo_cmd mkdir config
|
||||
$sudo_cmd mkdir -p config
|
||||
$sudo_cmd sh -c "echo '${ca_crt}' > ./config/ca.crt"
|
||||
$sudo_cmd sh -c "echo '${client_crt}' > ./config/client.crt"
|
||||
$sudo_cmd sh -c "echo '${client_key}' > ./config/client.key"
|
||||
|
@ -184,9 +184,9 @@ api:
|
|||
enabled: true
|
||||
tls:
|
||||
enabled: true
|
||||
cert_file: "${install_path}/agent/config/client.crt"
|
||||
key_file: "${install_path}/agent/config/client.key"
|
||||
ca_file: "${install_path}/agent/config/ca.crt"
|
||||
cert_file: "config/client.crt"
|
||||
key_file: "config/client.key"
|
||||
ca_file: "config/ca.crt"
|
||||
skip_insecure_verify: false
|
||||
network:
|
||||
binding: \$[[env.API_BINDING]]
|
||||
|
@ -213,10 +213,10 @@ fi
|
|||
$sudo_cmd chmod +x $agent_exc
|
||||
|
||||
#try to stop and uninstall service
|
||||
if [[ "$agent_exsit" == "true" ]]; then
|
||||
if [[ -e /etc/systemd/system/agent.service ]]; then
|
||||
printf "\n* stop && uninstall service\n"
|
||||
$sudo_cmd $agent_exc -service stop
|
||||
$sudo_cmd $agent_exc -service uninstall
|
||||
$sudo_cmd $agent_exc -service stop &>/dev/null
|
||||
$sudo_cmd $agent_exc -service uninstall &>/dev/null
|
||||
fi
|
||||
|
||||
printf "\n* start install service\n"
|
||||
|
|
12
console.yml
12
console.yml
|
@ -2,8 +2,8 @@ path.configs: "config"
|
|||
configs.auto_reload: true
|
||||
|
||||
#env:
|
||||
# INFINI_CONSOLE_ENDPOINT: "http://192.168.3.9:9000"
|
||||
# INGEST_CLUSTER_ENDPOINT: "http://192.168.3.9:9210"
|
||||
# INFINI_CONSOLE_ENDPOINT: "http://127.0.0.1:9000"
|
||||
# INGEST_CLUSTER_ENDPOINT: "https://127.0.0.1:9200"
|
||||
# INGEST_CLUSTER_CREDENTIAL_ID: chjkp9dath21f1ae9tq0
|
||||
|
||||
web:
|
||||
|
@ -73,10 +73,10 @@ badger:
|
|||
|
||||
#agent:
|
||||
# setup:
|
||||
# download_url: "https://release.infinilabs.com/agent/snapshot"
|
||||
# version: 0.5.0_NIGHTLY-157
|
||||
# ca_cert: "/opt/config/certs/ca.crt"
|
||||
# ca_key: "/opt/config/certs/ca.key"
|
||||
# download_url: "https://release.infinilabs.com/agent/stable"
|
||||
# version: 0.5.0-214
|
||||
# ca_cert: "config/certs/ca.crt"
|
||||
# ca_key: "config/certs/ca.key"
|
||||
# console_endpoint: $[[env.INFINI_CONSOLE_ENDPOINT]]
|
||||
# ingest_cluster_endpoint: $[[env.INGEST_CLUSTER_ENDPOINT]]
|
||||
# ingest_cluster_credential_id: $[[env.INGEST_CLUSTER_CREDENTIAL_ID]]
|
|
@ -49,15 +49,23 @@ func (module *AgentModule) Start() error {
|
|||
var (
|
||||
executor client.Executor
|
||||
err error
|
||||
caFile string
|
||||
caKey string
|
||||
)
|
||||
if module.AgentConfig.Setup == nil {
|
||||
executor = &client.HttpExecutor{}
|
||||
}else{
|
||||
executor, err = client.NewMTLSExecutor(module.AgentConfig.Setup.CACertFile, module.AgentConfig.Setup.CAKeyFile)
|
||||
if module.AgentConfig.Setup != nil {
|
||||
caFile = module.AgentConfig.Setup.CACertFile
|
||||
caKey = module.AgentConfig.Setup.CAKeyFile
|
||||
}
|
||||
if caFile == "" && caKey == "" {
|
||||
caFile, caKey, err = common.GetOrInitDefaultCaCerts()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
executor, err = client.NewMTLSExecutor(caFile, caKey)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
agClient := &client.Client{
|
||||
Executor: executor,
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"infini.sh/framework/core/util"
|
||||
elastic2 "infini.sh/framework/modules/elastic"
|
||||
"infini.sh/framework/modules/elastic/common"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
@ -491,6 +492,16 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt
|
|||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
host, _, err := net.SplitHostPort(nodeInfo.PublishAddress)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if !util.StringInArray(inst.IPS, host) {
|
||||
h.WriteError(w, fmt.Sprintf("got node host %s not match any ip of %v", host, inst.IPS), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
nodeInfo.ID = oldNodeInfo.ID
|
||||
nodeInfo.AgentID = inst.ID
|
||||
err = orm.Save(nil, nodeInfo)
|
||||
|
|
|
@ -74,11 +74,7 @@ func (h *APIHandler) generateInstallCommand(w http.ResponseWriter, req *http.Req
|
|||
tokens.Store(tokenStr, t)
|
||||
consoleEndpoint := agCfg.Setup.ConsoleEndpoint
|
||||
if consoleEndpoint == "" {
|
||||
scheme := "http"
|
||||
if req.TLS != nil {
|
||||
scheme = "https"
|
||||
}
|
||||
consoleEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host)
|
||||
consoleEndpoint = getDefaultConsoleEndpoint(req)
|
||||
}
|
||||
h.WriteJSON(w, util.MapStr{
|
||||
"script": fmt.Sprintf(`sudo BASE_URL="%s" AGENT_VER="%s" INSTALL_PATH="/opt" bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, agCfg.Setup.DownloadURL, agCfg.Setup.Version, consoleEndpoint, tokenStr),
|
||||
|
@ -87,6 +83,14 @@ func (h *APIHandler) generateInstallCommand(w http.ResponseWriter, req *http.Req
|
|||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
func getDefaultConsoleEndpoint(req *http.Request) string{
|
||||
scheme := "http"
|
||||
if req.TLS != nil {
|
||||
scheme = "https"
|
||||
}
|
||||
return fmt.Sprintf("%s://%s", scheme, req.Host)
|
||||
}
|
||||
|
||||
func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
tokenStr := h.GetParameter(req, "token")
|
||||
if strings.TrimSpace(tokenStr) == "" {
|
||||
|
@ -127,10 +131,14 @@ func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request,
|
|||
if port == "" {
|
||||
port = "8080"
|
||||
}
|
||||
consoleEndpoint := agCfg.Setup.ConsoleEndpoint
|
||||
if consoleEndpoint == "" {
|
||||
consoleEndpoint = getDefaultConsoleEndpoint(req)
|
||||
}
|
||||
_, err = tpl.Execute(w, map[string]interface{}{
|
||||
"base_url": agCfg.Setup.DownloadURL,
|
||||
"agent_version": agCfg.Setup.Version,
|
||||
"console_endpoint": agCfg.Setup.ConsoleEndpoint,
|
||||
"console_endpoint": consoleEndpoint,
|
||||
"client_crt": clientCertPEM,
|
||||
"client_key": clientKeyPEM,
|
||||
"ca_crt": caCert,
|
||||
|
|
|
@ -24,9 +24,6 @@ func GenerateServerCert(caFile, caKey string) (caCert, serverCertPEM, serverKeyP
|
|||
|
||||
func generateCert(caFile, caKey string, isServer bool)(caCert, instanceCertPEM, instanceKeyPEM []byte, err error){
|
||||
pool := x509.NewCertPool()
|
||||
if caFile == "" {
|
||||
caFile = path.Join(global.Env().GetConfigDir(), "certs", "ca.crt")
|
||||
}
|
||||
caCert, err = os.ReadFile(caFile)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -39,9 +36,6 @@ func generateCert(caFile, caKey string, isServer bool)(caCert, instanceCertPEM,
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
if caKey == "" {
|
||||
caKey = path.Join(global.Env().GetConfigDir(), "certs", "ca.key")
|
||||
}
|
||||
var keyBytes []byte
|
||||
keyBytes, err = os.ReadFile(caKey)
|
||||
if err != nil {
|
||||
|
|
|
@ -5,16 +5,62 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
log "github.com/cihub/seelog"
|
||||
"infini.sh/console/modules/agent/model"
|
||||
"infini.sh/framework/core/env"
|
||||
"infini.sh/framework/core/global"
|
||||
"infini.sh/framework/core/util"
|
||||
"os"
|
||||
"path"
|
||||
)
|
||||
|
||||
|
||||
func GetAgentConfig() *model.AgentConfig {
|
||||
agentCfg := &model.AgentConfig{}
|
||||
agentCfg := &model.AgentConfig{
|
||||
Enabled: true,
|
||||
Setup: &model.SetupConfig{
|
||||
DownloadURL: "https://release.infinilabs.com/agent/stable",
|
||||
Version: "0.5.0-214",
|
||||
},
|
||||
}
|
||||
_, err := env.ParseConfig("agent", agentCfg )
|
||||
if err != nil {
|
||||
panic(err)
|
||||
log.Debug("agent config not found: %v", err)
|
||||
}
|
||||
if agentCfg.Setup.CACertFile == "" && agentCfg.Setup.CAKeyFile == "" {
|
||||
agentCfg.Setup.CACertFile, agentCfg.Setup.CAKeyFile, err = GetOrInitDefaultCaCerts()
|
||||
if err != nil {
|
||||
log.Errorf("generate default ca certs error: %v", err)
|
||||
}
|
||||
}
|
||||
return agentCfg
|
||||
}
|
||||
|
||||
func GetOrInitDefaultCaCerts()(string, string, error){
|
||||
dataDir := global.Env().GetDataDir()
|
||||
caFile := path.Join(dataDir, "certs/ca.crt")
|
||||
caKey := path.Join(dataDir, "certs/ca.key")
|
||||
if !(util.FileExists(caFile) && util.FileExists(caKey) ) {
|
||||
err := os.MkdirAll(path.Join(dataDir, "certs"), 0775)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
log.Info("auto generating cert files")
|
||||
_, rootKey, rootCertPEM := util.GetRootCert()
|
||||
|
||||
caKeyPEM := pem.EncodeToMemory(&pem.Block{
|
||||
Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(rootKey),
|
||||
})
|
||||
_, err = util.FilePutContentWithByte(caKey, caKeyPEM)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
_, err = util.FilePutContentWithByte(caFile, rootCertPEM)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
}
|
||||
return caFile, caKey, nil
|
||||
}
|
|
@ -11,6 +11,7 @@ import (
|
|||
"infini.sh/framework/core/credential"
|
||||
"infini.sh/framework/core/elastic"
|
||||
"infini.sh/framework/core/event"
|
||||
"infini.sh/framework/core/global"
|
||||
"infini.sh/framework/core/orm"
|
||||
"infini.sh/framework/core/util"
|
||||
log "src/github.com/cihub/seelog"
|
||||
|
@ -385,11 +386,20 @@ func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) {
|
|||
endpoint string
|
||||
ok bool
|
||||
)
|
||||
emptyIngestClusterEndpoint := false
|
||||
if agCfg.Setup.IngestClusterEndpoint == nil {
|
||||
emptyIngestClusterEndpoint = true
|
||||
}
|
||||
if endpoint, ok = agCfg.Setup.IngestClusterEndpoint.(string);ok {
|
||||
if endpoint = strings.TrimSpace(endpoint); endpoint == "" {
|
||||
return "", nil, fmt.Errorf("config ingest_cluster_endpoint must not be empty")
|
||||
emptyIngestClusterEndpoint = true
|
||||
}
|
||||
}
|
||||
if emptyIngestClusterEndpoint {
|
||||
cfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID))
|
||||
endpoint = cfg.Endpoint
|
||||
}
|
||||
|
||||
var (
|
||||
basicAuth elastic.BasicAuth
|
||||
)
|
||||
|
@ -407,6 +417,9 @@ func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) {
|
|||
if basicAuth, ok = info.(elastic.BasicAuth); !ok {
|
||||
log.Debug("invalid credential: ", cred)
|
||||
}
|
||||
}else{
|
||||
cfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID))
|
||||
basicAuth = *cfg.BasicAuth
|
||||
}
|
||||
tpl := `elasticsearch:
|
||||
- name: default
|
||||
|
|
|
@ -461,15 +461,18 @@ func (p *processor) checkBulkPipelineTaskStatus(bulkTask *task.Task, cfg *migrat
|
|||
)
|
||||
successDocs = migration_util.GetMapIntValue(bulkLabels, "success_docs")
|
||||
|
||||
if !cfg.Target.SkipCountCheck && successDocs != totalDocs {
|
||||
return true, successDocs, fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons)
|
||||
}
|
||||
|
||||
// successDocs matched but has errors
|
||||
if bulkTask.Status == task.StatusError {
|
||||
return true, successDocs, nil
|
||||
if successDocs != totalDocs {
|
||||
// check count
|
||||
if !cfg.Target.SkipCountCheck {
|
||||
return true, successDocs, fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons)
|
||||
}
|
||||
// has errors
|
||||
if bulkTask.Status == task.StatusError {
|
||||
return true, successDocs, fmt.Errorf("bulk pipeline failed")
|
||||
}
|
||||
}
|
||||
|
||||
// successDocs matched, return ok
|
||||
return true, successDocs, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue