Merge pull request #29142 from taosdata/enh/TD-32951

telemetry refactor
This commit is contained in:
Shengliang Guan 2024-12-17 11:25:55 +08:00 committed by GitHub
commit 1b5f5cfd60
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 424 additions and 21 deletions

View File

@ -62,7 +62,7 @@ After modifying configuration file parameters, you need to restart the *taosd* s
|telemetryReporting | |Supported, effective immediately |Whether to upload telemetry, 0: do not upload, 1: upload, default value 1|
|telemetryServer | |Not supported |Telemetry server address|
|telemetryPort | |Not supported |Telemetry server port number|
|telemetryInterval | |Supported, effective immediately |Telemetry upload interval, in seconds, default 43200|
|telemetryInterval | |Supported, effective immediately |Telemetry upload interval, in seconds, default 86400|
|crashReporting | |Supported, effective immediately |Whether to upload crash information; 0: do not upload, 1: upload; default value 1|
### Query Related

View File

@ -59,7 +59,7 @@ taosd 命令行参数如下
|telemetryReporting | |支持动态修改 立即生效 |是否上传 telemetry0不上传1上传默认值 1|
|telemetryServer | |不支持动态修改 |telemetry 服务器地址|
|telemetryPort | |不支持动态修改 |telemetry 服务器端口编号|
|telemetryInterval | |支持动态修改 立即生效 |telemetry 上传时间间隔,单位为秒,默认 43200|
|telemetryInterval | |支持动态修改 立即生效 |telemetry 上传时间间隔,单位为秒,默认 86400|
|crashReporting | |支持动态修改 立即生效 |是否上传 crash 信息0不上传1上传默认值 1|
### 查询相关

View File

@ -17,6 +17,7 @@
#define _TD_UTIL_HTTP_H_
#include "os.h"
#include "tdef.h"
#include "tref.h"
#ifdef __cplusplus
@ -35,6 +36,21 @@ int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t p
EHttpCompFlag flag, int64_t chanId, const char* qid);
void taosDestroyHttpChan(int64_t chanId);
int32_t taosSendRecvHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, const char* qid, int64_t recvBufId);
typedef struct {
char defaultAddr[256 + 1];
char cachedAddr[256 + 1];
int64_t recvBufRid;
} STelemAddrMgmt;
int32_t taosTelemetryMgtInit(STelemAddrMgmt* mgt, char* defaultAddr);
void taosTelemetryDestroy(STelemAddrMgmt* mgt);
// not safe for multi-thread, should be called in the same thread
int32_t taosSendTelemReport(STelemAddrMgmt* mgt, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag);
#ifdef __cplusplus
}
#endif

View File

@ -172,7 +172,8 @@ static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)));
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(
json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)));
if (pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
if (pRequest->sqlstr != NULL &&
strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen];
pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0';
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
@ -802,6 +803,7 @@ void stopAllQueries(SRequestObj *pRequest) {
void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); }
static void *tscCrashReportThreadFp(void *param) {
int32_t code = 0;
setThreadName("client-crashReport");
char filepath[PATH_MAX] = {0};
(void)snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
@ -822,6 +824,12 @@ static void *tscCrashReportThreadFp(void *param) {
if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
return NULL;
}
STelemAddrMgmt mgt;
code = taosTelemetryMgtInit(&mgt, tsTelemServer);
if (code) {
tscError("failed to init telemetry management, code:%s", tstrerror(code));
return NULL;
}
while (1) {
if (clientStop > 0) break;
@ -832,7 +840,7 @@ static void *tscCrashReportThreadFp(void *param) {
taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
if (pMsg && msgLen > 0) {
if (taosSendHttpReport(tsTelemServer, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
if (taosSendTelemReport(&mgt, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
tscError("failed to send crash report");
if (pFile) {
taosReleaseCrashLogFile(pFile, false);
@ -866,6 +874,7 @@ static void *tscCrashReportThreadFp(void *param) {
taosMsleep(sleepTime);
loopTimes = 0;
}
taosTelemetryDestroy(&mgt);
clientStop = -2;
return NULL;
@ -1107,7 +1116,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
*/
uint64_t generateRequestId() {
static uint32_t hashId = 0;
static int32_t requestSerialId = 0;
static int32_t requestSerialId = 0;
if (hashId == 0) {
int32_t code = taosGetSystemUUIDU32(&hashId);

View File

@ -155,7 +155,7 @@ bool tsEnableTelem = false;
#else
bool tsEnableTelem = true;
#endif
int32_t tsTelemInterval = 43200;
int32_t tsTelemInterval = 86400;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com";
uint16_t tsTelemPort = 80;
char *tsTelemUri = "/report";

View File

@ -254,6 +254,7 @@ static void *dmAuditThreadFp(void *param) {
}
static void *dmCrashReportThreadFp(void *param) {
int32_t code = 0;
SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-crashReport");
@ -265,8 +266,14 @@ static void *dmCrashReportThreadFp(void *param) {
bool truncateFile = false;
int32_t sleepTime = 200;
int32_t reportPeriodNum = 3600 * 1000 / sleepTime;
;
int32_t loopTimes = reportPeriodNum;
int32_t loopTimes = reportPeriodNum;
STelemAddrMgmt mgt = {0};
code = taosTelemetryMgtInit(&mgt, tsTelemServer);
if (code != 0) {
dError("failed to init telemetry since %s", tstrerror(code));
return NULL;
}
while (1) {
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
@ -277,7 +284,7 @@ static void *dmCrashReportThreadFp(void *param) {
taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
if (pMsg && msgLen > 0) {
if (taosSendHttpReport(tsTelemServer, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
dError("failed to send crash report");
if (pFile) {
taosReleaseCrashLogFile(pFile, false);
@ -311,6 +318,7 @@ static void *dmCrashReportThreadFp(void *param) {
taosMsleep(sleepTime);
loopTimes = 0;
}
taosTelemetryDestroy(&mgt);
return NULL;
}

View File

@ -24,6 +24,7 @@
#include "tdatablock.h"
#include "tglobal.h"
#include "tgrant.h"
#include "thttp.h"
#include "tqueue.h"
#include "ttime.h"
#include "version.h"
@ -80,8 +81,9 @@ typedef struct {
} SProfileMgmt;
typedef struct {
TdThreadMutex lock;
char email[TSDB_FQDN_LEN];
TdThreadMutex lock;
char email[TSDB_FQDN_LEN];
STelemAddrMgmt addrMgt;
} STelemMgmt;
typedef struct {

View File

@ -411,7 +411,7 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
mndStreamConsensusChkpt(pMnode);
}
if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
if (sec % tsTelemInterval == (TMIN(86400, (tsTelemInterval - 1)))) {
mndPullupTelem(pMnode);
}

View File

@ -132,6 +132,8 @@ _OVER:
}
static int32_t mndProcessTelemTimer(SRpcMsg* pReq) {
int32_t code = 0;
int32_t line = 0;
SMnode* pMnode = pReq->info.node;
STelemMgmt* pMgmt = &pMnode->telemMgmt;
if (!tsEnableTelem) return 0;
@ -140,15 +142,18 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) {
char* pCont = mndBuildTelemetryReport(pMnode);
(void)taosThreadMutexUnlock(&pMgmt->lock);
if (pCont != NULL) {
if (taosSendHttpReport(tsTelemServer, tsTelemUri, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) {
mError("failed to send telemetry report");
} else {
mInfo("succeed to send telemetry report");
}
taosMemoryFree(pCont);
if (pCont == NULL) {
return 0;
}
return 0;
code = taosSendTelemReport(&pMgmt->addrMgt, tsTelemUri, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT);
taosMemoryFree(pCont);
return code;
_end:
if (code != 0) {
mError("%s failed to send at line %d since %s", __func__, line, tstrerror(code));
}
taosMemoryFree(pCont);
return code;
}
int32_t mndInitTelem(SMnode* pMnode) {
@ -158,6 +163,11 @@ int32_t mndInitTelem(SMnode* pMnode) {
(void)taosThreadMutexInit(&pMgmt->lock, NULL);
if ((code = taosGetEmail(pMgmt->email, sizeof(pMgmt->email))) != 0)
mWarn("failed to get email since %s", tstrerror(code));
code = taosTelemetryMgtInit(&pMgmt->addrMgt, tsTelemServer);
if (code != 0) {
mError("failed to init telemetry management since %s", tstrerror(code));
return code;
}
mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer);
return 0;
@ -165,5 +175,6 @@ int32_t mndInitTelem(SMnode* pMnode) {
void mndCleanupTelem(SMnode* pMnode) {
STelemMgmt* pMgmt = &pMnode->telemMgmt;
taosTelemetryDestroy(&pMgmt->addrMgt);
(void)taosThreadMutexDestroy(&pMgmt->lock);
}

View File

@ -31,6 +31,7 @@ static int32_t FAST_FAILURE_LIMIT = 1;
static int64_t httpDefaultChanId = -1;
static int64_t httpSeqNum = 0;
static int32_t httpRecvRefMgt = 0;
typedef struct SHttpModule {
uv_loop_t* loop;
@ -54,6 +55,8 @@ typedef struct SHttpMsg {
int64_t chanId;
int64_t seq;
char* qid;
int64_t recvBufRid;
} SHttpMsg;
typedef struct SHttpClient {
@ -67,6 +70,8 @@ typedef struct SHttpClient {
struct sockaddr_in dest;
int64_t chanId;
int64_t seq;
int64_t recvBufRid;
} SHttpClient;
typedef struct SHttpConnList {
@ -74,6 +79,13 @@ typedef struct SHttpConnList {
} SHttpConnList;
typedef struct {
char* pBuf;
int32_t nBuf;
SRWLatch latch;
int8_t inited;
} SHttpRecvBuf;
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
static void transHttpEnvInit();
@ -94,6 +106,9 @@ static void httpModuleDestroy(SHttpModule* http);
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid);
static int32_t taosSendHttpReportImplByChan2(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid,
int64_t rid);
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, const char* qid, char* pHead,
int32_t headLen,
@ -394,6 +409,8 @@ static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
taosMemoryFree(cli->wbuf);
taosMemoryFree(cli->rbuf);
taosMemoryFree(cli->addr);
// taosFreeHttpRecvHandle(cli->recvBufRid);
taosMemoryFree(cli);
}
@ -423,6 +440,28 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const
tError("http-report recv error:%s, seq:%" PRId64 "", uv_strerror(nread), cli->seq);
} else {
tTrace("http-report succ to recv %d bytes, seq:%" PRId64 "", (int32_t)nread, cli->seq);
if (cli->recvBufRid > 0) {
SHttpRecvBuf* p = taosAcquireRef(httpRecvRefMgt, cli->recvBufRid);
if (p != NULL) {
taosWLockLatch(&p->latch);
if (p->inited != 0) {
tDebug("http-report already recv valid data");
} else {
p->pBuf = taosMemoryCalloc(1, nread + 1);
if (p->pBuf != NULL) {
memcpy(p->pBuf, buf->base, nread);
p->nBuf = nread;
p->inited = 1;
} else {
tError("http-report failed to dump recv since %s", tstrerror(terrno));
}
}
taosWUnLockLatch(&p->latch);
TAOS_UNUSED(taosReleaseRef(httpRecvRefMgt, cli->recvBufRid));
} else {
tWarn("http-report failed to acquire recv buf since %s", tstrerror(terrno));
}
}
}
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
@ -629,6 +668,8 @@ static void httpHandleReq(SHttpMsg* msg) {
cli->chanId = chanId;
cli->addr = msg->server;
cli->port = msg->port;
cli->recvBufRid = msg->recvBufRid;
if (msg->qid != NULL) taosMemoryFree(msg->qid);
taosMemoryFree(msg->uri);
taosMemoryFree(msg);
@ -714,6 +755,47 @@ void httpModuleDestroy2(SHttpModule* http) {
taosMemoryFree(http);
}
static int32_t taosSendHttpReportImplByChan2(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid,
int64_t recvBufRid) {
SHttpModule* load = NULL;
SHttpMsg* msg = NULL;
int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, qid, &msg);
if (code != 0) {
goto _ERROR;
}
msg->recvBufRid = recvBufRid;
load = taosAcquireRef(httpRefMgt, chanId);
if (load == NULL) {
code = terrno;
goto _ERROR;
}
if (atomic_load_8(&load->quit)) {
code = TSDB_CODE_HTTP_MODULE_QUIT;
goto _ERROR;
}
tDebug("http-report start to report, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, msg->seq);
code = transAsyncSend(load->asyncPool, &(msg->q));
if (code != 0) {
code = TSDB_CODE_HTTP_MODULE_QUIT;
goto _ERROR;
}
msg = NULL;
_ERROR:
if (code != 0) {
tError("http-report failed to report reason:%s, chanId:%" PRId64 ", seq:%" PRId64 "", tstrerror(code), chanId,
msg->seq);
}
httpDestroyMsg(msg);
if (load != NULL) taosReleaseRef(httpRefMgt, chanId);
return code;
}
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid) {
SHttpModule* load = NULL;
@ -769,20 +851,34 @@ int32_t taosSendHttpReportWithQID(const char* server, const char* uri, uint16_t
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId, qid);
}
int32_t taosSendRecvHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, const char* qid, int64_t recvBufId) {
TAOS_UNUSED(taosThreadOnce(&transHttpInit, transHttpEnvInit));
return taosSendHttpReportImplByChan2(server, uri, port, pCont, contLen, flag, httpDefaultChanId, qid, recvBufId);
}
static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }
static void transHttpDestroyRecvHandle(void* handle) {
SHttpRecvBuf* p = handle;
taosMemoryFree(p->pBuf);
taosMemoryFree(p);
}
int64_t transInitHttpChanImpl();
static void transHttpEnvInit() {
httpRefMgt = taosOpenRef(64, transHttpDestroyHandle);
httpDefaultChanId = transInitHttpChanImpl();
httpSeqNum = 0;
httpRecvRefMgt = taosOpenRef(8, transHttpDestroyRecvHandle);
}
void transHttpEnvDestroy() {
// remove default chanId
taosDestroyHttpChan(httpDefaultChanId);
httpDefaultChanId = -1;
taosCloseRef(httpRecvRefMgt);
}
int64_t transInitHttpChanImpl() {
@ -868,3 +964,158 @@ void taosDestroyHttpChan(int64_t chanId) {
TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
TAOS_UNUSED(taosRemoveRef(httpRefMgt, chanId));
}
static int32_t taosAllocHttpRecvHandle(int64_t* rid) {
TAOS_UNUSED(taosThreadOnce(&transHttpInit, transHttpEnvInit));
SHttpRecvBuf* p = taosMemoryCalloc(1, sizeof(SHttpRecvBuf));
if (p == NULL) {
return terrno;
}
taosInitRWLatch(&p->latch);
int64_t id = taosAddRef(httpRecvRefMgt, p);
if (taosAcquireRef(httpRecvRefMgt, id) == NULL) {
taosMemoryFree(p);
return terrno;
}
*rid = id;
return 0;
}
static void taosFreeHttpRecvHandle(int64_t rid) {
if (rid <= 0) {
return;
}
TAOS_UNUSED(taosReleaseRef(httpRecvRefMgt, rid));
TAOS_UNUSED(taosRemoveRef(httpRecvRefMgt, rid));
}
static int32_t taosGetHttpRecvById(int64_t rid, char** pRecv, int32_t* len) {
int32_t code = 0;
SHttpRecvBuf* p = taosAcquireRef(httpRecvRefMgt, rid);
if (p == NULL) {
return TSDB_CODE_INVALID_PARA;
}
taosWLockLatch(&p->latch);
if (p->inited) {
*pRecv = p->pBuf;
*len = p->nBuf;
p->pBuf = 0;
p->nBuf = 0;
} else {
code = TSDB_CODE_INVALID_PARA;
}
taosWUnLockLatch(&p->latch);
TAOS_UNUSED(taosReleaseRef(httpRecvRefMgt, rid));
return code;
}
int32_t taosTelemetryMgtInit(STelemAddrMgmt* mgt, char* defaultAddr) {
tstrncpy(mgt->defaultAddr, defaultAddr, sizeof(mgt->defaultAddr));
mgt->cachedAddr[0] = 0;
mgt->recvBufRid = 0;
return 0;
}
void taosTelemetryDestroy(STelemAddrMgmt* mgt) {
if (mgt == NULL || mgt->recvBufRid <= 0) {
return;
}
taosFreeHttpRecvHandle(mgt->recvBufRid);
mgt->recvBufRid = 0;
}
// TODO: parse http response head By LIB
static int32_t taosTelemetryGetValueFromHttpResp(const char* response, const char* key, char* val) {
if (key == NULL || val == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0, line = 0;
int32_t len = strlen(key);
int32_t cap = len + 16;
// const char* reportUrlKey = "\"report_url\":\"";
char* buf = taosMemoryCalloc(1, cap);
if (buf == NULL) {
return terrno;
}
int32_t nBytes = snprintf(buf, cap, "\"%s\":\"", key);
if ((uint32_t)nBytes >= cap) {
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &line, _end);
}
char* start = strstr(response, buf);
if (start) {
start += strlen(buf);
char* end = strchr(start, '"');
if (end && end - start > 0) {
tstrncpy(val, start, end - start + 1);
} else {
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &line, _end);
}
} else {
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &line, _end);
}
_end:
if (code != 0) {
tError("failed to get value from http response since %s", tstrerror(code));
}
taosMemoryFree(buf);
return code;
}
static int32_t taosGetAddrFromHttpResp(int64_t recvBufRid, char* telemAddr) {
int32_t code = 0;
char* pRecv = NULL;
int32_t nRecv = 0;
code = taosGetHttpRecvById(recvBufRid, &pRecv, &nRecv);
if (code != 0) {
tError("failed to get http recv buf since %s", tstrerror(code));
return code;
}
code = taosTelemetryGetValueFromHttpResp(pRecv, "report_url", telemAddr);
taosMemoryFree(pRecv);
return code;
}
int32_t taosSendTelemReport(STelemAddrMgmt* mgt, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag) {
int32_t code = 0;
int32_t line = 0;
int32_t sendMsg = 0;
char* addr = mgt->defaultAddr;
if (mgt->cachedAddr[0] == 0) {
if (mgt->recvBufRid == 0) {
code = taosAllocHttpRecvHandle(&mgt->recvBufRid);
TAOS_CHECK_GOTO(code, &line, _end);
code = taosSendRecvHttpReportWithQID(mgt->defaultAddr, uri, port, pCont, contLen, flag, NULL, mgt->recvBufRid);
TAOS_CHECK_GOTO(code, &line, _end);
} else {
code = taosGetAddrFromHttpResp(mgt->recvBufRid, mgt->cachedAddr);
if (code != 0) {
tError("failed to get cache addr from http response since %s", tstrerror(code));
addr = mgt->defaultAddr;
} else {
addr = mgt->cachedAddr;
}
if (mgt->recvBufRid > 0) {
taosFreeHttpRecvHandle(mgt->recvBufRid);
mgt->recvBufRid = 0;
}
sendMsg = 1;
}
} else {
addr = mgt->cachedAddr;
sendMsg = 1;
}
if (sendMsg == 1) {
code = taosSendHttpReport(addr, uri, port, pCont, contLen, flag);
}
tDebug("send telemetry port oldAddr:[%s], newAddr:[%s]", mgt->defaultAddr, mgt->cachedAddr);
return code;
_end:
if (code != 0) {
tError("failed to send telemetry since %s, default addr:%s, cachedAddr:%s", tstrerror(code), mgt->defaultAddr,
mgt->cachedAddr);
}
return code;
}

View File

@ -22,6 +22,8 @@
#include "transLog.h"
#include "trpc.h"
#include "tversion.h"
#include "thttp.h"
#include "tjson.h"
using namespace std;
const char *label = "APP";
@ -515,7 +517,7 @@ TEST_F(TransEnv, idTest) {
TEST_F(TransEnv, noResp) {
SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 500000; i++) {
for (int i = 0; i < 50000; i++) {
memset(&req, 0, sizeof(req));
req.info.noResp = 0;
req.msgType = 3;
@ -527,3 +529,107 @@ TEST_F(TransEnv, noResp) {
taosMsleep(10000);
// no resp
}
TEST_F(TransEnv, http) {
int32_t code = 0;
char tmp[4096] = {0};
int32_t lino = 0;
SJson* pJson = tjsonCreateObject();
char clusterName[64] = {0};
tjsonAddStringToObject(pJson, "instanceId", clusterName);
tjsonAddDoubleToObject(pJson, "reportVersion", 1);
if (taosGetOsReleaseName(tmp, NULL, NULL, sizeof(tmp)) == 0) {
tjsonAddStringToObject(pJson, "os", tmp);
}
char *pCont = NULL;
int32_t len = 0;
float numOfCores = 0;
if (taosGetCpuInfo(tmp, sizeof(tmp), &numOfCores) == 0) {
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "cpuModel", tmp), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfCpu", numOfCores), &lino, _OVER);
} else {
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfCpu", tsNumOfCores), &lino, _OVER);
}
snprintf(tmp, sizeof(tmp), "%" PRId64 " kB", tsTotalMemoryKB);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "memory", tmp), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "version", td_version), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "buildInfo", td_buildinfo), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "gitInfo", td_gitinfo), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "email", "test126.com"), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfDnode",1 ), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfMnode", 1), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfVgroup", 1), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfDatabase", 1), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfSuperTable", 1), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfChildTable", 1), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfColumn", 1), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfPoint", 1), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "totalStorage", 1), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "compStorage", 1), &lino, _OVER);
pCont = tjsonToString(pJson);
len = strlen(pCont);
tjsonDelete(pJson);
{
#if 0
STelemAddrMgmt mgt;
taosTelemetryMgtInit(&mgt, "telemetry.tdengine.com");
int32_t code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT);
taosMsleep(2000);
code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT);
printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr);
for (int32_t i = 0; i < 10; i++) {
code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT);
printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr);
taosMsleep(2000);
}
taosTelemetryDestroy(&mgt);
#endif
}
{
#if 0
STelemAddrMgmt mgt;
taosTelemetryMgtInit(&mgt, "telemetry.taosdata.com");
int32_t code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT);
printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr);
taosMsleep(2000);
code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT);
for (int32_t i = 0; i < 10; i++) {
code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT);
printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr);
taosMsleep(2000);
}
taosTelemetryDestroy(&mgt);
#endif
}
{
STelemAddrMgmt mgt;
taosTelemetryMgtInit(&mgt, "error");
int32_t code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT);
taosMsleep(2000);
code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT);
for (int32_t i = 0; i < 10; i++) {
code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT);
taosMsleep(2000);
}
taosTelemetryDestroy(&mgt);
}
_OVER:
return;
}