adjust telemetry code
This commit is contained in:
parent
12d1bf03ac
commit
28dd4f7eaf
|
@ -52,7 +52,7 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
SClusterObj *pCluster = sdbAcquire(pSdb, SDB_CLUSTER, &pMnode->clusterId);
|
SClusterObj *pCluster = sdbAcquire(pSdb, SDB_CLUSTER, &pMnode->clusterId);
|
||||||
if (pCluster = NULL) {
|
if (pCluster == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,9 +20,10 @@
|
||||||
#include "tbuffer.h"
|
#include "tbuffer.h"
|
||||||
#include "tversion.h"
|
#include "tversion.h"
|
||||||
|
|
||||||
#define TELEMETRY_SERVER "telemetry.taosdata.com"
|
// #define TELEMETRY_SERVER "telemetry.taosdata.com"
|
||||||
#define TELEMETRY_PORT 80
|
#define TELEMETRY_SERVER "localhost"
|
||||||
#define REPORT_INTERVAL 86400
|
#define TELEMETRY_PORT 80
|
||||||
|
#define REPORT_INTERVAL 86400
|
||||||
|
|
||||||
static void mndBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); }
|
static void mndBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); }
|
||||||
|
|
||||||
|
@ -33,25 +34,8 @@ static void mndCloseObject(SBufferWriter* bw) {
|
||||||
} else {
|
} else {
|
||||||
tbufWriteChar(bw, '}');
|
tbufWriteChar(bw, '}');
|
||||||
}
|
}
|
||||||
tbufWriteChar(bw, ',');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
static void beginArray(SBufferWriter* bw) {
|
|
||||||
tbufWriteChar(bw, '[');
|
|
||||||
}
|
|
||||||
|
|
||||||
static void closeArray(SBufferWriter* bw) {
|
|
||||||
size_t len = tbufTell(bw);
|
|
||||||
if (tbufGetData(bw, false)[len - 1] == ',') {
|
|
||||||
tbufWriteCharAt(bw, len - 1, ']');
|
|
||||||
} else {
|
|
||||||
tbufWriteChar(bw, ']');
|
|
||||||
}
|
|
||||||
tbufWriteChar(bw, ',');
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
static void mndWriteString(SBufferWriter* bw, const char* str) {
|
static void mndWriteString(SBufferWriter* bw, const char* str) {
|
||||||
tbufWriteChar(bw, '"');
|
tbufWriteChar(bw, '"');
|
||||||
tbufWrite(bw, str, strlen(str));
|
tbufWrite(bw, str, strlen(str));
|
||||||
|
@ -61,7 +45,7 @@ static void mndWriteString(SBufferWriter* bw, const char* str) {
|
||||||
static void mndAddIntField(SBufferWriter* bw, const char* k, int64_t v) {
|
static void mndAddIntField(SBufferWriter* bw, const char* k, int64_t v) {
|
||||||
mndWriteString(bw, k);
|
mndWriteString(bw, k);
|
||||||
tbufWriteChar(bw, ':');
|
tbufWriteChar(bw, ':');
|
||||||
char buf[32];
|
char buf[32] = {0};
|
||||||
sprintf(buf, "%" PRId64, v);
|
sprintf(buf, "%" PRId64, v);
|
||||||
tbufWrite(bw, buf, strlen(buf));
|
tbufWrite(bw, buf, strlen(buf));
|
||||||
tbufWriteChar(bw, ',');
|
tbufWriteChar(bw, ',');
|
||||||
|
@ -184,24 +168,17 @@ static void mndAddRuntimeInfo(SMnode* pMnode, SBufferWriter* bw) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndSendTelemetryReport(SMnode* pMnode) {
|
static void mndSendTelemetryReport(SMnode* pMnode) {
|
||||||
STelemMgmt* pMgmt = &pMnode->telemMgmt;
|
STelemMgmt* pMgmt = &pMnode->telemMgmt;
|
||||||
|
SBufferWriter bw = tbufInitWriter(NULL, false);
|
||||||
char buf[128] = {0};
|
int32_t code = -1;
|
||||||
uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER);
|
char buf[128] = {0};
|
||||||
if (ip == 0xffffffff) {
|
SOCKET fd = 0;
|
||||||
mDebug("failed to get IP address of " TELEMETRY_SERVER " since :%s", strerror(errno));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0);
|
|
||||||
if (fd < 0) {
|
|
||||||
mDebug("failed to create socket for telemetry, reason:%s", strerror(errno));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
char clusterName[64] = {0};
|
char clusterName[64] = {0};
|
||||||
mndGetClusterName(pMnode, clusterName, sizeof(clusterName));
|
if (mndGetClusterName(pMnode, clusterName, sizeof(clusterName)) != 0) {
|
||||||
|
goto SEND_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
SBufferWriter bw = tbufInitWriter(NULL, false);
|
|
||||||
mndBeginObject(&bw);
|
mndBeginObject(&bw);
|
||||||
mndAddStringField(&bw, "instanceId", clusterName);
|
mndAddStringField(&bw, "instanceId", clusterName);
|
||||||
mndAddIntField(&bw, "reportVersion", 1);
|
mndAddIntField(&bw, "reportVersion", 1);
|
||||||
|
@ -212,32 +189,64 @@ static void mndSendTelemetryReport(SMnode* pMnode) {
|
||||||
mndAddRuntimeInfo(pMnode, &bw);
|
mndAddRuntimeInfo(pMnode, &bw);
|
||||||
mndCloseObject(&bw);
|
mndCloseObject(&bw);
|
||||||
|
|
||||||
|
uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER);
|
||||||
|
if (ip == 0xffffffff) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
mError("failed to get ip of %s since :%s", TELEMETRY_SERVER, terrstr());
|
||||||
|
goto SEND_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0);
|
||||||
|
if (fd < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
mError("failed to create socket to %s:%d since:%s", TELEMETRY_SERVER, TELEMETRY_PORT, terrstr());
|
||||||
|
goto SEND_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
const char* header =
|
const char* header =
|
||||||
"POST /report HTTP/1.1\n"
|
"POST /report HTTP/1.1\n"
|
||||||
"Host: " TELEMETRY_SERVER
|
"Host: " TELEMETRY_SERVER
|
||||||
"\n"
|
"\n"
|
||||||
"Content-Type: application/json\n"
|
"Content-Type: application/json\n"
|
||||||
"Content-Length: ";
|
"Content-Length: ";
|
||||||
|
if (taosWriteSocket(fd, (void*)header, (int32_t)strlen(header)) < 0) {
|
||||||
|
goto SEND_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
taosWriteSocket(fd, (void*)header, (int32_t)strlen(header));
|
int32_t contLen = (int32_t)(tbufTell(&bw));
|
||||||
int32_t contLen = (int32_t)(tbufTell(&bw) - 1);
|
|
||||||
sprintf(buf, "%d\n\n", contLen);
|
sprintf(buf, "%d\n\n", contLen);
|
||||||
taosWriteSocket(fd, buf, (int32_t)strlen(buf));
|
if (taosWriteSocket(fd, buf, (int32_t)strlen(buf)) < 0) {
|
||||||
taosWriteSocket(fd, tbufGetData(&bw, false), contLen);
|
goto SEND_OVER;
|
||||||
tbufCloseWriter(&bw);
|
}
|
||||||
|
|
||||||
|
const char* pCont = tbufGetData(&bw, false);
|
||||||
|
if (taosWriteSocket(fd, (void*)pCont, contLen) < 0) {
|
||||||
|
goto SEND_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
// read something to avoid nginx error 499
|
// read something to avoid nginx error 499
|
||||||
if (taosReadSocket(fd, buf, 10) < 0) {
|
if (taosReadSocket(fd, buf, 10) < 0) {
|
||||||
mDebug("failed to receive response since %s", strerror(errno));
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
mError("failed to receive response since %s", terrstr());
|
||||||
|
goto SEND_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mInfo("send telemetry to %s:%d, len:%d content: %s", TELEMETRY_SERVER, TELEMETRY_PORT, contLen, pCont);
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
SEND_OVER:
|
||||||
|
tbufCloseWriter(&bw);
|
||||||
taosCloseSocket(fd);
|
taosCloseSocket(fd);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
mError("failed to send telemetry to %s:%d since %s", TELEMETRY_SERVER, TELEMETRY_PORT, terrstr());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessTelemTimer(SMnodeMsg* pReq) {
|
static int32_t mndProcessTelemTimer(SMnodeMsg* pReq) {
|
||||||
SMnode* pMnode = pReq->pMnode;
|
SMnode* pMnode = pReq->pMnode;
|
||||||
STelemMgmt* pMgmt = &pMnode->telemMgmt;
|
STelemMgmt* pMgmt = &pMnode->telemMgmt;
|
||||||
if (!pMgmt->enable) return 0;
|
// if (!pMgmt->enable) return 0;
|
||||||
|
|
||||||
taosWLockLatch(&pMgmt->lock);
|
taosWLockLatch(&pMgmt->lock);
|
||||||
mndSendTelemetryReport(pMnode);
|
mndSendTelemetryReport(pMnode);
|
||||||
|
|
|
@ -144,6 +144,8 @@ static void mndCleanupTimer(SMnode *pMnode) {
|
||||||
pMnode->transTimer = NULL;
|
pMnode->transTimer = NULL;
|
||||||
taosTmrStop(pMnode->mqTimer);
|
taosTmrStop(pMnode->mqTimer);
|
||||||
pMnode->mqTimer = NULL;
|
pMnode->mqTimer = NULL;
|
||||||
|
taosTmrStop(pMnode->telemTimer);
|
||||||
|
pMnode->telemTimer = NULL;
|
||||||
taosTmrCleanUp(pMnode->timer);
|
taosTmrCleanUp(pMnode->timer);
|
||||||
pMnode->timer = NULL;
|
pMnode->timer = NULL;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue