telemetry refactor

This commit is contained in:
yihaoDeng 2024-12-14 21:27:37 +08:00
parent a7062349d2
commit cd4f92ebf2
9 changed files with 338 additions and 29 deletions

View File

@ -17,6 +17,7 @@
#define _TD_UTIL_HTTP_H_ #define _TD_UTIL_HTTP_H_
#include "os.h" #include "os.h"
#include "tdef.h"
#include "tref.h" #include "tref.h"
#ifdef __cplusplus #ifdef __cplusplus
@ -35,6 +36,25 @@ int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t p
EHttpCompFlag flag, int64_t chanId, const char* qid); EHttpCompFlag flag, int64_t chanId, const char* qid);
void taosDestroyHttpChan(int64_t chanId); void taosDestroyHttpChan(int64_t chanId);
int32_t taosAllocHttpRecvHandle(int64_t* rid);
void taosFreeHttpRecvHandle(int64_t rid);
int32_t taosGetHttpRecvBufByHandle(int64_t rid, char** pRecv, int32_t* recvLen);
int32_t taosSendRecvHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, const char* qid, int64_t recvBufId);
typedef struct {
char defaultAddr[256 + 1];
char cachedAddr[256 + 1];
int64_t recvBufRid;
} STelemAddrMgmt;
int32_t taosTelemetryMgtInit(STelemAddrMgmt* mgt, char* defaultAddr);
void taosTelemetryDestroy(STelemAddrMgmt* mgt);
// not safe for multi-thread, should be called in the same thread
int32_t taosSendTelemReport(STelemAddrMgmt* mgt, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -166,7 +166,8 @@ static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType))); ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)));
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject( ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(
json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows))); json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)));
if (pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) { if (pRequest->sqlstr != NULL &&
strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen]; char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen];
pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0'; pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0';
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr))); ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
@ -816,6 +817,8 @@ static void *tscCrashReportThreadFp(void *param) {
if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) { if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
return NULL; return NULL;
} }
STelemAddrMgmt mgt;
taosTelemetryMgtInit(&mgt, tsTelemServer);
while (1) { while (1) {
if (clientStop > 0) break; if (clientStop > 0) break;
@ -826,7 +829,7 @@ static void *tscCrashReportThreadFp(void *param) {
taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile); taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
if (pMsg && msgLen > 0) { if (pMsg && msgLen > 0) {
if (taosSendHttpReport(tsTelemServer, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) { if (taosSendTelemReport(&mgt, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
tscError("failed to send crash report"); tscError("failed to send crash report");
if (pFile) { if (pFile) {
taosReleaseCrashLogFile(pFile, false); taosReleaseCrashLogFile(pFile, false);
@ -860,6 +863,7 @@ static void *tscCrashReportThreadFp(void *param) {
taosMsleep(sleepTime); taosMsleep(sleepTime);
loopTimes = 0; loopTimes = 0;
} }
taosTelemetryDestroy(&mgt);
clientStop = -2; clientStop = -2;
return NULL; return NULL;
@ -1099,7 +1103,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
*/ */
uint64_t generateRequestId() { uint64_t generateRequestId() {
static uint32_t hashId = 0; static uint32_t hashId = 0;
static int32_t requestSerialId = 0; static int32_t requestSerialId = 0;
if (hashId == 0) { if (hashId == 0) {
int32_t code = taosGetSystemUUIDU32(&hashId); int32_t code = taosGetSystemUUIDU32(&hashId);

View File

@ -127,7 +127,7 @@ bool tsEnableTelem = false;
#else #else
bool tsEnableTelem = true; bool tsEnableTelem = true;
#endif #endif
int32_t tsTelemInterval = 43200; int32_t tsTelemInterval = 60;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com"; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com";
uint16_t tsTelemPort = 80; uint16_t tsTelemPort = 80;
char *tsTelemUri = "/report"; char *tsTelemUri = "/report";
@ -537,7 +537,8 @@ int32_t taosAddClientLogCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "qDebugFlag", qDebugFlag, 0, 255, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "qDebugFlag", qDebugFlag, 0, 255, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "cDebugFlag", cDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "cDebugFlag", cDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqClientDebugFlag", tqClientDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(
cfgAddInt32(pCfg, "tqClientDebugFlag", tqClientDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_SERVER));
TAOS_RETURN(TSDB_CODE_SUCCESS); TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
@ -590,8 +591,10 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN( TAOS_CHECK_RETURN(
cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); TAOS_CHECK_RETURN(
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(
cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddString(pCfg, "smlChildTableName", tsSmlChildTableName, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); TAOS_CHECK_RETURN(cfgAddString(pCfg, "smlChildTableName", tsSmlChildTableName, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter, TAOS_CHECK_RETURN(cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter,
CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
@ -781,7 +784,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "telemetryInterval", tsTelemInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "telemetryInterval", tsTelemInterval, 59, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_SERVER, CFG_DYN_BOTH)); TAOS_CHECK_RETURN(cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_SERVER, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, CFG_SCOPE_SERVER, CFG_DYN_NONE));
@ -1999,14 +2002,17 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{ // 'bool/int32_t/int64_t/float/double' variables with general modification function { // 'bool/int32_t/int64_t/float/double' variables with general modification function
static OptionNameAndVar debugOptions[] = { static OptionNameAndVar debugOptions[] = {
{"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag}, {"mDebugFlag", &mDebugFlag}, {"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag},
{"wDebugFlag", &wDebugFlag}, {"azDebugFlag", &azDebugFlag}, {"sDebugFlag", &sDebugFlag}, {"mDebugFlag", &mDebugFlag}, {"wDebugFlag", &wDebugFlag},
{"tsdbDebugFlag", &tsdbDebugFlag}, {"tqDebugFlag", &tqDebugFlag}, {"fsDebugFlag", &fsDebugFlag}, {"azDebugFlag", &azDebugFlag}, {"sDebugFlag", &sDebugFlag},
{"udfDebugFlag", &udfDebugFlag}, {"smaDebugFlag", &smaDebugFlag}, {"idxDebugFlag", &idxDebugFlag}, {"tsdbDebugFlag", &tsdbDebugFlag}, {"tqDebugFlag", &tqDebugFlag},
{"tdbDebugFlag", &tdbDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag}, {"uDebugFlag", &uDebugFlag}, {"fsDebugFlag", &fsDebugFlag}, {"udfDebugFlag", &udfDebugFlag},
{"smaDebugFlag", &smaDebugFlag}, {"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag}, {"smaDebugFlag", &smaDebugFlag}, {"idxDebugFlag", &idxDebugFlag},
{"metaDebugFlag", &metaDebugFlag}, {"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, {"tdbDebugFlag", &tdbDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag},
{"tqClientDebugFlag", &tqClientDebugFlag}, {"uDebugFlag", &uDebugFlag}, {"smaDebugFlag", &smaDebugFlag},
{"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag},
{"metaDebugFlag", &metaDebugFlag}, {"stDebugFlag", &stDebugFlag},
{"sndDebugFlag", &sndDebugFlag}, {"tqClientDebugFlag", &tqClientDebugFlag},
}; };
static OptionNameAndVar options[] = {{"audit", &tsEnableAudit}, static OptionNameAndVar options[] = {{"audit", &tsEnableAudit},

View File

@ -243,8 +243,10 @@ static void *dmCrashReportThreadFp(void *param) {
bool truncateFile = false; bool truncateFile = false;
int32_t sleepTime = 200; int32_t sleepTime = 200;
int32_t reportPeriodNum = 3600 * 1000 / sleepTime; int32_t reportPeriodNum = 3600 * 1000 / sleepTime;
; int32_t loopTimes = reportPeriodNum;
int32_t loopTimes = reportPeriodNum;
STelemAddrMgmt mgt = {0};
taosTelemetryMgtInit(&mgt, tsTelemServer);
while (1) { while (1) {
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
@ -255,7 +257,7 @@ static void *dmCrashReportThreadFp(void *param) {
taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile); taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
if (pMsg && msgLen > 0) { if (pMsg && msgLen > 0) {
if (taosSendHttpReport(tsTelemServer, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) { if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
dError("failed to send crash report"); dError("failed to send crash report");
if (pFile) { if (pFile) {
taosReleaseCrashLogFile(pFile, false); taosReleaseCrashLogFile(pFile, false);
@ -289,6 +291,7 @@ static void *dmCrashReportThreadFp(void *param) {
taosMsleep(sleepTime); taosMsleep(sleepTime);
loopTimes = 0; loopTimes = 0;
} }
taosTelemetryDestroy(&mgt);
return NULL; return NULL;
} }

View File

@ -24,6 +24,7 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
#include "tgrant.h" #include "tgrant.h"
#include "thttp.h"
#include "tqueue.h" #include "tqueue.h"
#include "ttime.h" #include "ttime.h"
#include "version.h" #include "version.h"
@ -80,8 +81,9 @@ typedef struct {
} SProfileMgmt; } SProfileMgmt;
typedef struct { typedef struct {
TdThreadMutex lock; TdThreadMutex lock;
char email[TSDB_FQDN_LEN]; char email[TSDB_FQDN_LEN];
STelemAddrMgmt addrMgt;
} STelemMgmt; } STelemMgmt;
typedef struct { typedef struct {

View File

@ -410,9 +410,11 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
mndStreamConsensusChkpt(pMnode); mndStreamConsensusChkpt(pMnode);
} }
#ifndef TD_ENTERPRISE
if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) { if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
mndPullupTelem(pMnode); mndPullupTelem(pMnode);
} }
#endif
if (sec % tsGrantHBInterval == 0) { if (sec % tsGrantHBInterval == 0) {
mndPullupGrant(pMnode); mndPullupGrant(pMnode);

View File

@ -132,6 +132,8 @@ _OVER:
} }
static int32_t mndProcessTelemTimer(SRpcMsg* pReq) { static int32_t mndProcessTelemTimer(SRpcMsg* pReq) {
int32_t code = 0;
int32_t line = 0;
SMnode* pMnode = pReq->info.node; SMnode* pMnode = pReq->info.node;
STelemMgmt* pMgmt = &pMnode->telemMgmt; STelemMgmt* pMgmt = &pMnode->telemMgmt;
if (!tsEnableTelem) return 0; if (!tsEnableTelem) return 0;
@ -140,15 +142,18 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) {
char* pCont = mndBuildTelemetryReport(pMnode); char* pCont = mndBuildTelemetryReport(pMnode);
(void)taosThreadMutexUnlock(&pMgmt->lock); (void)taosThreadMutexUnlock(&pMgmt->lock);
if (pCont != NULL) { if (pCont == NULL) {
if (taosSendHttpReport(tsTelemServer, tsTelemUri, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) { return 0;
mError("failed to send telemetry report");
} else {
mInfo("succeed to send telemetry report");
}
taosMemoryFree(pCont);
} }
return 0; code = taosSendTelemReport(&pMgmt->addrMgt, tsTelemUri, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT);
taosMemoryFree(pCont);
return code;
_end:
if (code != 0) {
mError("%s failed to send at line %d since %s", __func__, line, tstrerror(code));
}
taosMemoryFree(pCont);
return code;
} }
int32_t mndInitTelem(SMnode* pMnode) { int32_t mndInitTelem(SMnode* pMnode) {
@ -158,6 +163,7 @@ int32_t mndInitTelem(SMnode* pMnode) {
(void)taosThreadMutexInit(&pMgmt->lock, NULL); (void)taosThreadMutexInit(&pMgmt->lock, NULL);
if ((code = taosGetEmail(pMgmt->email, sizeof(pMgmt->email))) != 0) if ((code = taosGetEmail(pMgmt->email, sizeof(pMgmt->email))) != 0)
mWarn("failed to get email since %s", tstrerror(code)); mWarn("failed to get email since %s", tstrerror(code));
taosTelemetryMgtInit(&pMgmt->addrMgt, tsTelemServer);
mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer); mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer);
return 0; return 0;
@ -165,5 +171,6 @@ int32_t mndInitTelem(SMnode* pMnode) {
void mndCleanupTelem(SMnode* pMnode) { void mndCleanupTelem(SMnode* pMnode) {
STelemMgmt* pMgmt = &pMnode->telemMgmt; STelemMgmt* pMgmt = &pMnode->telemMgmt;
taosTelemetryDestroy(&pMgmt->addrMgt);
(void)taosThreadMutexDestroy(&pMgmt->lock); (void)taosThreadMutexDestroy(&pMgmt->lock);
} }

View File

@ -31,6 +31,7 @@ static int32_t FAST_FAILURE_LIMIT = 1;
static int64_t httpDefaultChanId = -1; static int64_t httpDefaultChanId = -1;
static int64_t httpSeqNum = 0; static int64_t httpSeqNum = 0;
static int32_t httpRecvRefMgt = 0;
typedef struct SHttpModule { typedef struct SHttpModule {
uv_loop_t* loop; uv_loop_t* loop;
@ -54,6 +55,8 @@ typedef struct SHttpMsg {
int64_t chanId; int64_t chanId;
int64_t seq; int64_t seq;
char* qid; char* qid;
int64_t recvBufRid;
} SHttpMsg; } SHttpMsg;
typedef struct SHttpClient { typedef struct SHttpClient {
@ -67,6 +70,8 @@ typedef struct SHttpClient {
struct sockaddr_in dest; struct sockaddr_in dest;
int64_t chanId; int64_t chanId;
int64_t seq; int64_t seq;
int64_t recvBufRid;
} SHttpClient; } SHttpClient;
typedef struct SHttpConnList { typedef struct SHttpConnList {
@ -74,6 +79,13 @@ typedef struct SHttpConnList {
} SHttpConnList; } SHttpConnList;
typedef struct {
char* pBuf;
int32_t nBuf;
SRWLatch latch;
int8_t inited;
} SHttpRecvBuf;
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
static void transHttpEnvInit(); static void transHttpEnvInit();
@ -94,6 +106,9 @@ static void httpModuleDestroy(SHttpModule* http);
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid); int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid);
static int32_t taosSendHttpReportImplByChan2(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid,
int64_t rid);
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, const char* qid, char* pHead, static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, const char* qid, char* pHead,
int32_t headLen, int32_t headLen,
@ -394,6 +409,8 @@ static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
taosMemoryFree(cli->wbuf); taosMemoryFree(cli->wbuf);
taosMemoryFree(cli->rbuf); taosMemoryFree(cli->rbuf);
taosMemoryFree(cli->addr); taosMemoryFree(cli->addr);
// taosFreeHttpRecvHandle(cli->recvBufRid);
taosMemoryFree(cli); taosMemoryFree(cli);
} }
@ -423,6 +440,28 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const
tError("http-report recv error:%s, seq:%" PRId64 "", uv_strerror(nread), cli->seq); tError("http-report recv error:%s, seq:%" PRId64 "", uv_strerror(nread), cli->seq);
} else { } else {
tTrace("http-report succ to recv %d bytes, seq:%" PRId64 "", (int32_t)nread, cli->seq); tTrace("http-report succ to recv %d bytes, seq:%" PRId64 "", (int32_t)nread, cli->seq);
if (cli->recvBufRid > 0) {
SHttpRecvBuf* p = taosAcquireRef(httpRecvRefMgt, cli->recvBufRid);
if (p != NULL) {
taosWLockLatch(&p->latch);
if (p->inited != 0) {
tDebug("http-report already recv valid data");
} else {
p->pBuf = taosMemoryCalloc(1, nread + 1);
if (p->pBuf != NULL) {
memcpy(p->pBuf, buf->base, nread);
p->nBuf = nread;
p->inited = 1;
} else {
tError("http-report failed to dump recv since %s", tstrerror(terrno));
}
}
taosWUnLockLatch(&p->latch);
taosReleaseRef(httpRecvRefMgt, cli->recvBufRid);
} else {
tWarn("http-report failed to acquire recv buf since %s", tstrerror(terrno));
}
}
} }
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
@ -629,6 +668,8 @@ static void httpHandleReq(SHttpMsg* msg) {
cli->chanId = chanId; cli->chanId = chanId;
cli->addr = msg->server; cli->addr = msg->server;
cli->port = msg->port; cli->port = msg->port;
cli->recvBufRid = msg->recvBufRid;
if (msg->qid != NULL) taosMemoryFree(msg->qid); if (msg->qid != NULL) taosMemoryFree(msg->qid);
taosMemoryFree(msg->uri); taosMemoryFree(msg->uri);
taosMemoryFree(msg); taosMemoryFree(msg);
@ -714,6 +755,47 @@ void httpModuleDestroy2(SHttpModule* http) {
taosMemoryFree(http); taosMemoryFree(http);
} }
static int32_t taosSendHttpReportImplByChan2(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid,
int64_t recvBufRid) {
SHttpModule* load = NULL;
SHttpMsg* msg = NULL;
int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, qid, &msg);
if (code != 0) {
goto _ERROR;
}
msg->recvBufRid = recvBufRid;
load = taosAcquireRef(httpRefMgt, chanId);
if (load == NULL) {
code = terrno;
goto _ERROR;
}
if (atomic_load_8(&load->quit)) {
code = TSDB_CODE_HTTP_MODULE_QUIT;
goto _ERROR;
}
tDebug("http-report start to report, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, msg->seq);
code = transAsyncSend(load->asyncPool, &(msg->q));
if (code != 0) {
code = TSDB_CODE_HTTP_MODULE_QUIT;
goto _ERROR;
}
msg = NULL;
_ERROR:
if (code != 0) {
tError("http-report failed to report reason:%s, chanId:%" PRId64 ", seq:%" PRId64 "", tstrerror(code), chanId,
msg->seq);
}
httpDestroyMsg(msg);
if (load != NULL) taosReleaseRef(httpRefMgt, chanId);
return code;
}
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid) { int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid) {
SHttpModule* load = NULL; SHttpModule* load = NULL;
@ -769,20 +851,30 @@ int32_t taosSendHttpReportWithQID(const char* server, const char* uri, uint16_t
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId, qid); return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId, qid);
} }
int32_t taosSendRecvHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, const char* qid, int64_t recvBufId) {
TAOS_UNUSED(taosThreadOnce(&transHttpInit, transHttpEnvInit));
return taosSendHttpReportImplByChan2(server, uri, port, pCont, contLen, flag, httpDefaultChanId, qid, recvBufId);
}
static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); } static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }
static void transHttpDestroyRecvHandle(void* handle) { taosMemoryFree(handle); }
int64_t transInitHttpChanImpl(); int64_t transInitHttpChanImpl();
static void transHttpEnvInit() { static void transHttpEnvInit() {
httpRefMgt = taosOpenRef(64, transHttpDestroyHandle); httpRefMgt = taosOpenRef(64, transHttpDestroyHandle);
httpDefaultChanId = transInitHttpChanImpl(); httpDefaultChanId = transInitHttpChanImpl();
httpSeqNum = 0; httpSeqNum = 0;
httpRecvRefMgt = taosOpenRef(8, transHttpDestroyRecvHandle);
} }
void transHttpEnvDestroy() { void transHttpEnvDestroy() {
// remove default chanId // remove default chanId
taosDestroyHttpChan(httpDefaultChanId); taosDestroyHttpChan(httpDefaultChanId);
httpDefaultChanId = -1; httpDefaultChanId = -1;
taosCloseRef(httpRecvRefMgt);
} }
int64_t transInitHttpChanImpl() { int64_t transInitHttpChanImpl() {
@ -868,3 +960,166 @@ void taosDestroyHttpChan(int64_t chanId) {
TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId)); TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
TAOS_UNUSED(taosRemoveRef(httpRefMgt, chanId)); TAOS_UNUSED(taosRemoveRef(httpRefMgt, chanId));
} }
int32_t taosAllocHttpRecvHandle(int64_t* rid) {
TAOS_UNUSED(taosThreadOnce(&transHttpInit, transHttpEnvInit));
SHttpRecvBuf* p = taosMemoryCalloc(1, sizeof(SHttpRecvBuf));
if (p == NULL) {
return terrno;
}
taosInitRWLatch(&p->latch);
int64_t id = taosAddRef(httpRecvRefMgt, p);
if (taosAcquireRef(httpRecvRefMgt, id) == NULL) {
taosMemoryFree(p);
return terrno;
}
*rid = id;
return 0;
}
void taosFreeHttpRecvHandle(int64_t rid) {
if (rid <= 0) {
return;
}
taosReleaseRef(httpRecvRefMgt, rid);
taosRemoveRef(httpRecvRefMgt, rid);
}
int32_t taosGetHttpRecvById(int64_t rid, char** pRecv, int32_t* len) {
int32_t code = 0;
SHttpRecvBuf* p = taosAcquireRef(httpRecvRefMgt, rid);
if (p == NULL) {
return TSDB_CODE_INVALID_PARA;
}
taosWLockLatch(&p->latch);
if (p->inited) {
*pRecv = p->pBuf;
*len = p->nBuf;
p->pBuf = 0;
p->nBuf = 0;
} else {
code = TSDB_CODE_INVALID_PARA;
}
taosWUnLockLatch(&p->latch);
taosReleaseRef(httpRecvRefMgt, rid);
return code;
}
int32_t taosTelemetryMgtInit(STelemAddrMgmt* mgt, char* defaultAddr) {
tstrncpy(mgt->defaultAddr, defaultAddr, sizeof(mgt->defaultAddr));
mgt->cachedAddr[0] = 0;
mgt->recvBufRid = 0;
return 0;
}
int32_t taosTelemetryMgtGetAddr(STelemAddrMgmt* mgt, char* addr) {
if (mgt->cachedAddr[0] == 0) {
tstrncpy(addr, mgt->defaultAddr, TD_FQDN_LEN);
} else {
tstrncpy(addr, mgt->cachedAddr, TD_FQDN_LEN);
}
return 0;
}
void taosTelemetryDestroy(STelemAddrMgmt* mgt) {
if (mgt == NULL || mgt->recvBufRid <= 0) {
return;
}
taosFreeHttpRecvHandle(mgt->recvBufRid);
mgt->recvBufRid = 0;
}
// TODO: parse http response head By LIB
int32_t taosGetValueFromHttpResp(const char* response, const char* key, char* val) {
if (key == NULL || val == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0, line = 0;
int32_t len = strlen(key);
int32_t cap = len + 16;
// const char* reportUrlKey = "\"report_url\":\"";
char* buf = taosMemoryCalloc(1, cap);
if (buf == NULL) {
return terrno;
}
int32_t nBytes = snprintf(buf, cap, "\"%s\":\"", key);
if ((uint32_t)nBytes >= cap) {
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &line, _end);
}
char* start = strstr(response, buf);
if (start) {
start += strlen(buf);
char* end = strchr(start, '"');
if (end && end - start > 0) {
tstrncpy(val, start, end - start + 1);
} else {
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &line, _end);
}
} else {
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &line, _end);
}
_end:
if (code != 0) {
tError("failed to get value from http response since %s", tstrerror(code));
}
taosMemoryFree(buf);
return code;
}
static int32_t taosGetAddrFromHttpResp(int64_t recvBufRid, char* telemAddr) {
int32_t code = 0;
char* pRecv = NULL;
int32_t nRecv = 0;
code = taosGetHttpRecvById(recvBufRid, &pRecv, &nRecv);
if (code != 0) {
tError("failed to get http recv buf since %s", tstrerror(code));
return code;
}
code = taosGetValueFromHttpResp(pRecv, "report_url", telemAddr);
taosMemoryFree(pRecv);
return code;
}
int32_t taosSendTelemReport(STelemAddrMgmt* mgt, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag) {
int32_t code = 0;
int32_t line = 0;
int32_t sendMsg = 0;
char* addr = mgt->defaultAddr;
if (mgt->cachedAddr[0] == 0) {
if (mgt->recvBufRid == 0) {
code = taosAllocHttpRecvHandle(&mgt->recvBufRid);
TAOS_CHECK_GOTO(code, &line, _end);
code = taosSendRecvHttpReportWithQID(mgt->defaultAddr, uri, port, pCont, contLen, flag, NULL, mgt->recvBufRid);
TAOS_CHECK_GOTO(code, &line, _end);
} else {
code = taosGetAddrFromHttpResp(mgt->recvBufRid, mgt->cachedAddr);
if (code != 0) {
tError("failed to get cache addr from http response since %s", tstrerror(code));
addr = mgt->defaultAddr;
} else {
addr = mgt->cachedAddr;
}
if (mgt->recvBufRid > 0) {
taosFreeHttpRecvHandle(mgt->recvBufRid);
mgt->recvBufRid = 0;
}
sendMsg = 1;
}
} else {
addr = mgt->cachedAddr;
sendMsg = 1;
}
if (sendMsg == 1) {
code = taosSendHttpReport(addr, uri, port, pCont, contLen, flag);
}
tDebug("send telemetry port oldAddr:[%s], newAddr:[%s]", mgt->defaultAddr, mgt->cachedAddr);
return code;
_end:
if (code != 0) {
tError("failed to send telemetry since %s, default addr:%s, cachedAddr:%s", tstrerror(code), mgt->defaultAddr,
mgt->cachedAddr);
}
return code;
}

View File

@ -22,6 +22,7 @@
#include "transLog.h" #include "transLog.h"
#include "trpc.h" #include "trpc.h"
#include "tversion.h" #include "tversion.h"
#include "thttp.h"
using namespace std; using namespace std;
const char *label = "APP"; const char *label = "APP";
@ -527,3 +528,12 @@ TEST_F(TransEnv, noResp) {
taosMsleep(10000); taosMsleep(10000);
// no resp // no resp
} }
TEST_F(TransEnv, http) {
STelemAddrMgmt mgt;
taosTelemetryMgtInit(&mgt, "telemetry.tdengine.com");
int32_t code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT);
taosMsleep(10000);
taosTelemetryDestroy(&mgt);
}