diff --git a/docs/en/14-reference/01-components/01-taosd.md b/docs/en/14-reference/01-components/01-taosd.md index 81bfd2ecad..1b7f63510b 100644 --- a/docs/en/14-reference/01-components/01-taosd.md +++ b/docs/en/14-reference/01-components/01-taosd.md @@ -62,7 +62,7 @@ After modifying configuration file parameters, you need to restart the *taosd* s |telemetryReporting | |Supported, effective immediately |Whether to upload telemetry, 0: do not upload, 1: upload, default value 1| |telemetryServer | |Not supported |Telemetry server address| |telemetryPort | |Not supported |Telemetry server port number| -|telemetryInterval | |Supported, effective immediately |Telemetry upload interval, in seconds, default 43200| +|telemetryInterval | |Supported, effective immediately |Telemetry upload interval, in seconds, default 86400| |crashReporting | |Supported, effective immediately |Whether to upload crash information; 0: do not upload, 1: upload; default value 1| ### Query Related diff --git a/docs/zh/14-reference/01-components/01-taosd.md b/docs/zh/14-reference/01-components/01-taosd.md index da86fadf2b..7a0a306a41 100644 --- a/docs/zh/14-reference/01-components/01-taosd.md +++ b/docs/zh/14-reference/01-components/01-taosd.md @@ -59,7 +59,7 @@ taosd 命令行参数如下 |telemetryReporting | |支持动态修改 立即生效 |是否上传 telemetry,0:不上传,1:上传,默认值 1| |telemetryServer | |不支持动态修改 |telemetry 服务器地址| |telemetryPort | |不支持动态修改 |telemetry 服务器端口编号| -|telemetryInterval | |支持动态修改 立即生效 |telemetry 上传时间间隔,单位为秒,默认 43200| +|telemetryInterval | |支持动态修改 立即生效 |telemetry 上传时间间隔,单位为秒,默认 86400| |crashReporting | |支持动态修改 立即生效 |是否上传 crash 信息;0:不上传,1:上传;默认值 1| ### 查询相关 diff --git a/include/libs/transport/thttp.h b/include/libs/transport/thttp.h index 9f635b8523..f81895c416 100644 --- a/include/libs/transport/thttp.h +++ b/include/libs/transport/thttp.h @@ -17,6 +17,7 @@ #define _TD_UTIL_HTTP_H_ #include "os.h" +#include "tdef.h" #include "tref.h" #ifdef __cplusplus @@ -35,6 +36,21 @@ int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t p EHttpCompFlag flag, int64_t chanId, const char* qid); void taosDestroyHttpChan(int64_t chanId); +int32_t taosSendRecvHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, const char* qid, int64_t recvBufId); + +typedef struct { + char defaultAddr[256 + 1]; + char cachedAddr[256 + 1]; + int64_t recvBufRid; +} STelemAddrMgmt; +int32_t taosTelemetryMgtInit(STelemAddrMgmt* mgt, char* defaultAddr); +void taosTelemetryDestroy(STelemAddrMgmt* mgt); + +// not safe for multi-thread, should be called in the same thread +int32_t taosSendTelemReport(STelemAddrMgmt* mgt, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index c58d3bd4df..df93920303 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -172,7 +172,8 @@ static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType))); ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject( json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows))); - if (pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) { + if (pRequest->sqlstr != NULL && + strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) { char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen]; pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0'; ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr))); @@ -802,6 +803,7 @@ void stopAllQueries(SRequestObj *pRequest) { void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); } static void *tscCrashReportThreadFp(void *param) { + int32_t code = 0; setThreadName("client-crashReport"); char filepath[PATH_MAX] = {0}; (void)snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP); @@ -822,6 +824,12 @@ static void *tscCrashReportThreadFp(void *param) { if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) { return NULL; } + STelemAddrMgmt mgt; + code = taosTelemetryMgtInit(&mgt, tsTelemServer); + if (code) { + tscError("failed to init telemetry management, code:%s", tstrerror(code)); + return NULL; + } while (1) { if (clientStop > 0) break; @@ -832,7 +840,7 @@ static void *tscCrashReportThreadFp(void *param) { taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile); if (pMsg && msgLen > 0) { - if (taosSendHttpReport(tsTelemServer, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) { + if (taosSendTelemReport(&mgt, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) { tscError("failed to send crash report"); if (pFile) { taosReleaseCrashLogFile(pFile, false); @@ -866,6 +874,7 @@ static void *tscCrashReportThreadFp(void *param) { taosMsleep(sleepTime); loopTimes = 0; } + taosTelemetryDestroy(&mgt); clientStop = -2; return NULL; @@ -1107,7 +1116,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { */ uint64_t generateRequestId() { static uint32_t hashId = 0; - static int32_t requestSerialId = 0; + static int32_t requestSerialId = 0; if (hashId == 0) { int32_t code = taosGetSystemUUIDU32(&hashId); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 59ed0e386d..905dcb4fda 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -155,7 +155,7 @@ bool tsEnableTelem = false; #else bool tsEnableTelem = true; #endif -int32_t tsTelemInterval = 43200; +int32_t tsTelemInterval = 86400; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com"; uint16_t tsTelemPort = 80; char *tsTelemUri = "/report"; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index e55a78075a..8f890f6805 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -254,6 +254,7 @@ static void *dmAuditThreadFp(void *param) { } static void *dmCrashReportThreadFp(void *param) { + int32_t code = 0; SDnodeMgmt *pMgmt = param; int64_t lastTime = taosGetTimestampMs(); setThreadName("dnode-crashReport"); @@ -265,8 +266,14 @@ static void *dmCrashReportThreadFp(void *param) { bool truncateFile = false; int32_t sleepTime = 200; int32_t reportPeriodNum = 3600 * 1000 / sleepTime; - ; - int32_t loopTimes = reportPeriodNum; + int32_t loopTimes = reportPeriodNum; + + STelemAddrMgmt mgt = {0}; + code = taosTelemetryMgtInit(&mgt, tsTelemServer); + if (code != 0) { + dError("failed to init telemetry since %s", tstrerror(code)); + return NULL; + } while (1) { if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; @@ -277,7 +284,7 @@ static void *dmCrashReportThreadFp(void *param) { taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile); if (pMsg && msgLen > 0) { - if (taosSendHttpReport(tsTelemServer, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) { + if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) { dError("failed to send crash report"); if (pFile) { taosReleaseCrashLogFile(pFile, false); @@ -311,6 +318,7 @@ static void *dmCrashReportThreadFp(void *param) { taosMsleep(sleepTime); loopTimes = 0; } + taosTelemetryDestroy(&mgt); return NULL; } diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index fff88e547c..982a541bb3 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -24,6 +24,7 @@ #include "tdatablock.h" #include "tglobal.h" #include "tgrant.h" +#include "thttp.h" #include "tqueue.h" #include "ttime.h" #include "version.h" @@ -80,8 +81,9 @@ typedef struct { } SProfileMgmt; typedef struct { - TdThreadMutex lock; - char email[TSDB_FQDN_LEN]; + TdThreadMutex lock; + char email[TSDB_FQDN_LEN]; + STelemAddrMgmt addrMgt; } STelemMgmt; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 2621f3761f..aa3ffc58e8 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -411,7 +411,7 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) { mndStreamConsensusChkpt(pMnode); } - if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) { + if (sec % tsTelemInterval == (TMIN(86400, (tsTelemInterval - 1)))) { mndPullupTelem(pMnode); } diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index 810c71b7c5..bd613d7e69 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -132,6 +132,8 @@ _OVER: } static int32_t mndProcessTelemTimer(SRpcMsg* pReq) { + int32_t code = 0; + int32_t line = 0; SMnode* pMnode = pReq->info.node; STelemMgmt* pMgmt = &pMnode->telemMgmt; if (!tsEnableTelem) return 0; @@ -140,15 +142,18 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) { char* pCont = mndBuildTelemetryReport(pMnode); (void)taosThreadMutexUnlock(&pMgmt->lock); - if (pCont != NULL) { - if (taosSendHttpReport(tsTelemServer, tsTelemUri, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) { - mError("failed to send telemetry report"); - } else { - mInfo("succeed to send telemetry report"); - } - taosMemoryFree(pCont); + if (pCont == NULL) { + return 0; } - return 0; + code = taosSendTelemReport(&pMgmt->addrMgt, tsTelemUri, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT); + taosMemoryFree(pCont); + return code; +_end: + if (code != 0) { + mError("%s failed to send at line %d since %s", __func__, line, tstrerror(code)); + } + taosMemoryFree(pCont); + return code; } int32_t mndInitTelem(SMnode* pMnode) { @@ -158,6 +163,11 @@ int32_t mndInitTelem(SMnode* pMnode) { (void)taosThreadMutexInit(&pMgmt->lock, NULL); if ((code = taosGetEmail(pMgmt->email, sizeof(pMgmt->email))) != 0) mWarn("failed to get email since %s", tstrerror(code)); + code = taosTelemetryMgtInit(&pMgmt->addrMgt, tsTelemServer); + if (code != 0) { + mError("failed to init telemetry management since %s", tstrerror(code)); + return code; + } mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer); return 0; @@ -165,5 +175,6 @@ int32_t mndInitTelem(SMnode* pMnode) { void mndCleanupTelem(SMnode* pMnode) { STelemMgmt* pMgmt = &pMnode->telemMgmt; + taosTelemetryDestroy(&pMgmt->addrMgt); (void)taosThreadMutexDestroy(&pMgmt->lock); } diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 0c0f723b8b..deb5f07436 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -31,6 +31,7 @@ static int32_t FAST_FAILURE_LIMIT = 1; static int64_t httpDefaultChanId = -1; static int64_t httpSeqNum = 0; +static int32_t httpRecvRefMgt = 0; typedef struct SHttpModule { uv_loop_t* loop; @@ -54,6 +55,8 @@ typedef struct SHttpMsg { int64_t chanId; int64_t seq; char* qid; + + int64_t recvBufRid; } SHttpMsg; typedef struct SHttpClient { @@ -67,6 +70,8 @@ typedef struct SHttpClient { struct sockaddr_in dest; int64_t chanId; int64_t seq; + + int64_t recvBufRid; } SHttpClient; typedef struct SHttpConnList { @@ -74,6 +79,13 @@ typedef struct SHttpConnList { } SHttpConnList; +typedef struct { + char* pBuf; + int32_t nBuf; + SRWLatch latch; + int8_t inited; +} SHttpRecvBuf; + static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; static void transHttpEnvInit(); @@ -94,6 +106,9 @@ static void httpModuleDestroy(SHttpModule* http); static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid); +static int32_t taosSendHttpReportImplByChan2(const char* server, const char* uri, uint16_t port, char* pCont, + int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid, + int64_t rid); static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, const char* qid, char* pHead, int32_t headLen, @@ -394,6 +409,8 @@ static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) { taosMemoryFree(cli->wbuf); taosMemoryFree(cli->rbuf); taosMemoryFree(cli->addr); + // taosFreeHttpRecvHandle(cli->recvBufRid); + taosMemoryFree(cli); } @@ -423,6 +440,28 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const tError("http-report recv error:%s, seq:%" PRId64 "", uv_strerror(nread), cli->seq); } else { tTrace("http-report succ to recv %d bytes, seq:%" PRId64 "", (int32_t)nread, cli->seq); + if (cli->recvBufRid > 0) { + SHttpRecvBuf* p = taosAcquireRef(httpRecvRefMgt, cli->recvBufRid); + if (p != NULL) { + taosWLockLatch(&p->latch); + if (p->inited != 0) { + tDebug("http-report already recv valid data"); + } else { + p->pBuf = taosMemoryCalloc(1, nread + 1); + if (p->pBuf != NULL) { + memcpy(p->pBuf, buf->base, nread); + p->nBuf = nread; + p->inited = 1; + } else { + tError("http-report failed to dump recv since %s", tstrerror(terrno)); + } + } + taosWUnLockLatch(&p->latch); + TAOS_UNUSED(taosReleaseRef(httpRecvRefMgt, cli->recvBufRid)); + } else { + tWarn("http-report failed to acquire recv buf since %s", tstrerror(terrno)); + } + } } if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); @@ -629,6 +668,8 @@ static void httpHandleReq(SHttpMsg* msg) { cli->chanId = chanId; cli->addr = msg->server; cli->port = msg->port; + cli->recvBufRid = msg->recvBufRid; + if (msg->qid != NULL) taosMemoryFree(msg->qid); taosMemoryFree(msg->uri); taosMemoryFree(msg); @@ -714,6 +755,47 @@ void httpModuleDestroy2(SHttpModule* http) { taosMemoryFree(http); } +static int32_t taosSendHttpReportImplByChan2(const char* server, const char* uri, uint16_t port, char* pCont, + int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid, + int64_t recvBufRid) { + SHttpModule* load = NULL; + SHttpMsg* msg = NULL; + int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, qid, &msg); + if (code != 0) { + goto _ERROR; + } + + msg->recvBufRid = recvBufRid; + + load = taosAcquireRef(httpRefMgt, chanId); + if (load == NULL) { + code = terrno; + goto _ERROR; + } + + if (atomic_load_8(&load->quit)) { + code = TSDB_CODE_HTTP_MODULE_QUIT; + goto _ERROR; + } + tDebug("http-report start to report, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, msg->seq); + + code = transAsyncSend(load->asyncPool, &(msg->q)); + if (code != 0) { + code = TSDB_CODE_HTTP_MODULE_QUIT; + goto _ERROR; + } + msg = NULL; + +_ERROR: + + if (code != 0) { + tError("http-report failed to report reason:%s, chanId:%" PRId64 ", seq:%" PRId64 "", tstrerror(code), chanId, + msg->seq); + } + httpDestroyMsg(msg); + if (load != NULL) taosReleaseRef(httpRefMgt, chanId); + return code; +} static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid) { SHttpModule* load = NULL; @@ -769,20 +851,34 @@ int32_t taosSendHttpReportWithQID(const char* server, const char* uri, uint16_t return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId, qid); } +int32_t taosSendRecvHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, const char* qid, int64_t recvBufId) { + TAOS_UNUSED(taosThreadOnce(&transHttpInit, transHttpEnvInit)); + return taosSendHttpReportImplByChan2(server, uri, port, pCont, contLen, flag, httpDefaultChanId, qid, recvBufId); +} + static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); } +static void transHttpDestroyRecvHandle(void* handle) { + SHttpRecvBuf* p = handle; + taosMemoryFree(p->pBuf); + taosMemoryFree(p); +} + int64_t transInitHttpChanImpl(); static void transHttpEnvInit() { httpRefMgt = taosOpenRef(64, transHttpDestroyHandle); httpDefaultChanId = transInitHttpChanImpl(); httpSeqNum = 0; + httpRecvRefMgt = taosOpenRef(8, transHttpDestroyRecvHandle); } void transHttpEnvDestroy() { // remove default chanId taosDestroyHttpChan(httpDefaultChanId); httpDefaultChanId = -1; + taosCloseRef(httpRecvRefMgt); } int64_t transInitHttpChanImpl() { @@ -868,3 +964,158 @@ void taosDestroyHttpChan(int64_t chanId) { TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId)); TAOS_UNUSED(taosRemoveRef(httpRefMgt, chanId)); } +static int32_t taosAllocHttpRecvHandle(int64_t* rid) { + TAOS_UNUSED(taosThreadOnce(&transHttpInit, transHttpEnvInit)); + SHttpRecvBuf* p = taosMemoryCalloc(1, sizeof(SHttpRecvBuf)); + if (p == NULL) { + return terrno; + } + taosInitRWLatch(&p->latch); + + int64_t id = taosAddRef(httpRecvRefMgt, p); + if (taosAcquireRef(httpRecvRefMgt, id) == NULL) { + taosMemoryFree(p); + return terrno; + } + *rid = id; + return 0; +} +static void taosFreeHttpRecvHandle(int64_t rid) { + if (rid <= 0) { + return; + } + TAOS_UNUSED(taosReleaseRef(httpRecvRefMgt, rid)); + TAOS_UNUSED(taosRemoveRef(httpRecvRefMgt, rid)); +} +static int32_t taosGetHttpRecvById(int64_t rid, char** pRecv, int32_t* len) { + int32_t code = 0; + SHttpRecvBuf* p = taosAcquireRef(httpRecvRefMgt, rid); + if (p == NULL) { + return TSDB_CODE_INVALID_PARA; + } + taosWLockLatch(&p->latch); + if (p->inited) { + *pRecv = p->pBuf; + *len = p->nBuf; + p->pBuf = 0; + p->nBuf = 0; + } else { + code = TSDB_CODE_INVALID_PARA; + } + taosWUnLockLatch(&p->latch); + TAOS_UNUSED(taosReleaseRef(httpRecvRefMgt, rid)); + + return code; +} + +int32_t taosTelemetryMgtInit(STelemAddrMgmt* mgt, char* defaultAddr) { + tstrncpy(mgt->defaultAddr, defaultAddr, sizeof(mgt->defaultAddr)); + mgt->cachedAddr[0] = 0; + mgt->recvBufRid = 0; + return 0; +} +void taosTelemetryDestroy(STelemAddrMgmt* mgt) { + if (mgt == NULL || mgt->recvBufRid <= 0) { + return; + } + taosFreeHttpRecvHandle(mgt->recvBufRid); + mgt->recvBufRid = 0; +} +// TODO: parse http response head By LIB +static int32_t taosTelemetryGetValueFromHttpResp(const char* response, const char* key, char* val) { + if (key == NULL || val == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + int32_t code = 0, line = 0; + int32_t len = strlen(key); + int32_t cap = len + 16; + // const char* reportUrlKey = "\"report_url\":\""; + + char* buf = taosMemoryCalloc(1, cap); + if (buf == NULL) { + return terrno; + } + int32_t nBytes = snprintf(buf, cap, "\"%s\":\"", key); + if ((uint32_t)nBytes >= cap) { + TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &line, _end); + } + + char* start = strstr(response, buf); + if (start) { + start += strlen(buf); + char* end = strchr(start, '"'); + if (end && end - start > 0) { + tstrncpy(val, start, end - start + 1); + } else { + TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &line, _end); + } + } else { + TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &line, _end); + } +_end: + if (code != 0) { + tError("failed to get value from http response since %s", tstrerror(code)); + } + taosMemoryFree(buf); + return code; +} +static int32_t taosGetAddrFromHttpResp(int64_t recvBufRid, char* telemAddr) { + int32_t code = 0; + char* pRecv = NULL; + int32_t nRecv = 0; + + code = taosGetHttpRecvById(recvBufRid, &pRecv, &nRecv); + if (code != 0) { + tError("failed to get http recv buf since %s", tstrerror(code)); + return code; + } + + code = taosTelemetryGetValueFromHttpResp(pRecv, "report_url", telemAddr); + taosMemoryFree(pRecv); + return code; +} +int32_t taosSendTelemReport(STelemAddrMgmt* mgt, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag) { + int32_t code = 0; + int32_t line = 0; + int32_t sendMsg = 0; + char* addr = mgt->defaultAddr; + if (mgt->cachedAddr[0] == 0) { + if (mgt->recvBufRid == 0) { + code = taosAllocHttpRecvHandle(&mgt->recvBufRid); + TAOS_CHECK_GOTO(code, &line, _end); + code = taosSendRecvHttpReportWithQID(mgt->defaultAddr, uri, port, pCont, contLen, flag, NULL, mgt->recvBufRid); + TAOS_CHECK_GOTO(code, &line, _end); + } else { + code = taosGetAddrFromHttpResp(mgt->recvBufRid, mgt->cachedAddr); + if (code != 0) { + tError("failed to get cache addr from http response since %s", tstrerror(code)); + addr = mgt->defaultAddr; + } else { + addr = mgt->cachedAddr; + } + + if (mgt->recvBufRid > 0) { + taosFreeHttpRecvHandle(mgt->recvBufRid); + mgt->recvBufRid = 0; + } + sendMsg = 1; + } + } else { + addr = mgt->cachedAddr; + sendMsg = 1; + } + if (sendMsg == 1) { + code = taosSendHttpReport(addr, uri, port, pCont, contLen, flag); + } + tDebug("send telemetry port oldAddr:[%s], newAddr:[%s]", mgt->defaultAddr, mgt->cachedAddr); + return code; + +_end: + if (code != 0) { + tError("failed to send telemetry since %s, default addr:%s, cachedAddr:%s", tstrerror(code), mgt->defaultAddr, + mgt->cachedAddr); + } + return code; +} diff --git a/source/libs/transport/test/transUT2.cpp b/source/libs/transport/test/transUT2.cpp index d6b336b014..6dfb5e503a 100644 --- a/source/libs/transport/test/transUT2.cpp +++ b/source/libs/transport/test/transUT2.cpp @@ -22,6 +22,8 @@ #include "transLog.h" #include "trpc.h" #include "tversion.h" +#include "thttp.h" +#include "tjson.h" using namespace std; const char *label = "APP"; @@ -515,7 +517,7 @@ TEST_F(TransEnv, idTest) { TEST_F(TransEnv, noResp) { SRpcMsg resp = {0}; SRpcMsg req = {0}; - for (int i = 0; i < 500000; i++) { + for (int i = 0; i < 50000; i++) { memset(&req, 0, sizeof(req)); req.info.noResp = 0; req.msgType = 3; @@ -527,3 +529,107 @@ TEST_F(TransEnv, noResp) { taosMsleep(10000); // no resp } + +TEST_F(TransEnv, http) { + int32_t code = 0; + char tmp[4096] = {0}; + int32_t lino = 0; + SJson* pJson = tjsonCreateObject(); + + char clusterName[64] = {0}; + tjsonAddStringToObject(pJson, "instanceId", clusterName); + tjsonAddDoubleToObject(pJson, "reportVersion", 1); + + if (taosGetOsReleaseName(tmp, NULL, NULL, sizeof(tmp)) == 0) { + tjsonAddStringToObject(pJson, "os", tmp); + } + char *pCont = NULL; + int32_t len = 0; + + float numOfCores = 0; + if (taosGetCpuInfo(tmp, sizeof(tmp), &numOfCores) == 0) { + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "cpuModel", tmp), &lino, _OVER); + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfCpu", numOfCores), &lino, _OVER); + } else { + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfCpu", tsNumOfCores), &lino, _OVER); + } + + snprintf(tmp, sizeof(tmp), "%" PRId64 " kB", tsTotalMemoryKB); + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "memory", tmp), &lino, _OVER); + + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "version", td_version), &lino, _OVER); + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "buildInfo", td_buildinfo), &lino, _OVER); + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "gitInfo", td_gitinfo), &lino, _OVER); + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "email", "test126.com"), &lino, _OVER); + + + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfDnode",1 ), &lino, _OVER); + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfMnode", 1), &lino, _OVER); + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfVgroup", 1), &lino, _OVER); + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfDatabase", 1), &lino, _OVER); + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfSuperTable", 1), &lino, _OVER); + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfChildTable", 1), &lino, _OVER); + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfColumn", 1), &lino, _OVER); + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfPoint", 1), &lino, _OVER); + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "totalStorage", 1), &lino, _OVER); + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "compStorage", 1), &lino, _OVER); + + pCont = tjsonToString(pJson); + len = strlen(pCont); + tjsonDelete(pJson); + + { + #if 0 + STelemAddrMgmt mgt; + taosTelemetryMgtInit(&mgt, "telemetry.tdengine.com"); + int32_t code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT); + + taosMsleep(2000); + code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT); + printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr); + for (int32_t i = 0; i < 10; i++) { + code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT); + + printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr); + taosMsleep(2000); + } + taosTelemetryDestroy(&mgt); +#endif + } + + { +#if 0 + STelemAddrMgmt mgt; + taosTelemetryMgtInit(&mgt, "telemetry.taosdata.com"); + int32_t code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT); + printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr); + + taosMsleep(2000); + code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT); + for (int32_t i = 0; i < 10; i++) { + code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT); + printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr); + taosMsleep(2000); + } + taosTelemetryDestroy(&mgt); +#endif + + } + { + STelemAddrMgmt mgt; + taosTelemetryMgtInit(&mgt, "error"); + int32_t code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT); + + taosMsleep(2000); + code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT); + for (int32_t i = 0; i < 10; i++) { + code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT); + taosMsleep(2000); + + } + taosTelemetryDestroy(&mgt); + + } +_OVER: + return; +}