enh(analysis): add the algorithm in telemetry report.

This commit is contained in:
Haojun Liao 2025-01-23 00:29:18 +08:00
parent 30ae443ce0
commit 883b1b79a2
8 changed files with 108 additions and 27 deletions

View File

@ -86,7 +86,7 @@ int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf);
int32_t taosAnalBufClose(SAnalyticBuf *pBuf);
void taosAnalBufDestroy(SAnalyticBuf *pBuf);
const char *taosAnalAlgoStr(EAnalAlgoType algoType);
const char *taosAnalysisAlgoType(EAnalAlgoType algoType);
EAnalAlgoType taosAnalAlgoInt(const char *algoName);
const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType);

View File

@ -15,6 +15,10 @@ if(DEFINED GRANT_CFG_INCLUDE_DIR)
add_definitions(-DGRANTS_CFG)
endif()
if(${BUILD_WITH_ANALYSIS})
add_definitions(-DUSE_ANALYTICS)
endif()
if(TD_GRANT)
ADD_DEFINITIONS(-D_GRANT)
endif()

View File

@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "tanalytics.h"
#include "ttypes.h"
#include "tutil.h"
#ifdef USE_ANALYTICS
#include <curl/curl.h>
@ -35,7 +36,7 @@ typedef struct {
static SAlgoMgmt tsAlgos = {0};
static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen);
const char *taosAnalAlgoStr(EAnalAlgoType type) {
const char *taosAnalysisAlgoType(EAnalAlgoType type) {
switch (type) {
case ANAL_ALGO_TYPE_ANOMALY_DETECT:
return "anomaly-detection";
@ -59,7 +60,7 @@ const char *taosAnalAlgoUrlStr(EAnalAlgoType type) {
EAnalAlgoType taosAnalAlgoInt(const char *name) {
for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) {
if (strcasecmp(name, taosAnalAlgoStr(i)) == 0) {
if (strcasecmp(name, taosAnalysisAlgoType(i)) == 0) {
return i;
}
}
@ -187,12 +188,12 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url,
SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen);
if (pUrl != NULL) {
tstrncpy(url, pUrl->url, urlLen);
uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url);
uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalysisAlgoType(type), url);
} else {
url[0] = 0;
terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND;
code = terrno;
uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type));
uError("algo:%s, type:%s, url not found", algoName, taosAnalysisAlgoType(type));
}
if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
@ -789,7 +790,7 @@ int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; }
int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; }
void taosAnalBufDestroy(SAnalyticBuf *pBuf) {}
const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; }
const char *taosAnalysisAlgoType(EAnalAlgoType algoType) { return 0; }
EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; }
const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; }

View File

@ -22,8 +22,9 @@
extern "C" {
#endif
int32_t mndInitAnode(SMnode *pMnode);
void mndCleanupAnode(SMnode *pMnode);
int32_t mndInitAnode(SMnode* pMnode);
void mndCleanupAnode(SMnode* pMnode);
void mndRetrieveAlgoList(SMnode* pMnode, SArray* pFc, SArray* pAd);
#ifdef __cplusplus
}

View File

@ -637,6 +637,32 @@ static void mndCancelGetNextAnode(SMnode *pMnode, void *pIter) {
sdbCancelFetchByType(pSdb, pIter, SDB_ANODE);
}
// todo handle multiple anode case, remove the duplicate algos
void mndRetrieveAlgoList(SMnode* pMnode, SArray* pFc, SArray* pAd) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SAnodeObj *pObj = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_ANODE, pIter, (void **)&pObj);
if (pIter == NULL) {
break;
}
if (pObj->numOfAlgos >= ANAL_ALGO_TYPE_END) {
if (pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT] != NULL) {
taosArrayAddAll(pAd, pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT]);
}
if (pObj->algos[ANAL_ALGO_TYPE_FORECAST] != NULL) {
taosArrayAddAll(pFc, pObj->algos[ANAL_ALGO_TYPE_FORECAST]);
}
}
sdbRelease(pSdb, pObj);
}
}
static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -661,7 +687,7 @@ static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false);
if (code != 0) goto _end;
STR_TO_VARSTR(buf, taosAnalAlgoStr(t));
STR_TO_VARSTR(buf, taosAnalysisAlgoType(t));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, buf, false);
if (code != 0) goto _end;

View File

@ -411,7 +411,7 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
mndStreamConsensusChkpt(pMnode);
}
if (sec % tsTelemInterval == (TMIN(86400, (tsTelemInterval - 1)))) {
if (sec % 20 == (TMIN(86400, (20 - 1)))) {
mndPullupTelem(pMnode);
}

View File

@ -19,6 +19,7 @@
#include "mndSync.h"
#include "thttp.h"
#include "tjson.h"
#include "mndAnode.h"
typedef struct {
int64_t numOfDnode;
@ -32,6 +33,7 @@ typedef struct {
int64_t totalPoints;
int64_t totalStorage;
int64_t compStorage;
int32_t numOfAnalysisAlgos;
} SMnodeStat;
static void mndGetStat(SMnode* pMnode, SMnodeStat* pStat) {
@ -58,18 +60,21 @@ static void mndGetStat(SMnode* pMnode, SMnodeStat* pStat) {
sdbRelease(pSdb, pVgroup);
}
}
pStat->numOfChildTable = 100;
pStat->numOfColumn = 200;
pStat->totalPoints = 300;
pStat->totalStorage = 400;
pStat->compStorage = 500;
static int32_t algoToJson(const void* pObj, SJson* pJson) {
const SAnodeAlgo* pNode = (const SAnodeAlgo*)pObj;
int32_t code = tjsonAddStringToObject(pJson, "name", pNode->name);
return code;
}
static void mndBuildRuntimeInfo(SMnode* pMnode, SJson* pJson) {
SMnodeStat mstat = {0};
int32_t code = 0;
int32_t lino = 0;
SArray* pFcList = NULL;
SArray* pAdList = NULL;
mndGetStat(pMnode, &mstat);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfDnode", mstat.numOfDnode), &lino, _OVER);
@ -82,8 +87,51 @@ static void mndBuildRuntimeInfo(SMnode* pMnode, SJson* pJson) {
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfPoint", mstat.totalPoints), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "totalStorage", mstat.totalStorage), &lino, _OVER);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "compStorage", mstat.compStorage), &lino, _OVER);
pFcList = taosArrayInit(4, sizeof(SAnodeAlgo));
pAdList = taosArrayInit(4, sizeof(SAnodeAlgo));
if (pFcList == NULL || pAdList == NULL) {
lino = __LINE__;
goto _OVER;
}
mndRetrieveAlgoList(pMnode, pFcList, pAdList);
SJson *items = tjsonAddArrayToObject(pJson, "forecast");
TSDB_CHECK_NULL(items, code, lino, _OVER, terrno);
for(int32_t i = 0; i < taosArrayGetSize(pFcList); ++i) {
SJson *item = tjsonCreateObject();
TSDB_CHECK_NULL(item, code, lino, _OVER, terrno);
TAOS_CHECK_GOTO(tjsonAddItemToArray(items, item), &lino, _OVER);
SAnodeAlgo *p = taosArrayGet(pFcList, i);
TSDB_CHECK_NULL(p, code, lino, _OVER, terrno);
TAOS_CHECK_GOTO(tjsonAddStringToObject(item, "name", p->name), &lino, _OVER);
}
SJson *items1 = tjsonAddArrayToObject(pJson, "anomaly_detection");
TSDB_CHECK_NULL(items1, code, lino, _OVER, terrno);
for(int32_t i = 0; i < taosArrayGetSize(pAdList); ++i) {
SJson *item = tjsonCreateObject();
TSDB_CHECK_NULL(item, code, lino, _OVER, terrno);
TAOS_CHECK_GOTO(tjsonAddItemToArray(items1, item), &lino, _OVER);
SAnodeAlgo *p = taosArrayGet(pAdList, i);
TSDB_CHECK_NULL(p, code, lino, _OVER, terrno);
TAOS_CHECK_GOTO(tjsonAddStringToObject(item, "name", p->name), &lino, _OVER);
}
_OVER:
if (code != 0) mError("failed to mndBuildRuntimeInfo at line:%d since %s", lino, tstrerror(code));
taosArrayDestroy(pFcList);
taosArrayDestroy(pAdList);
if (code != 0) {
mError("failed to mndBuildRuntimeInfo at line:%d since %s", lino, tstrerror(code));
}
}
static char* mndBuildTelemetryReport(SMnode* pMnode) {
@ -136,21 +184,24 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) {
int32_t line = 0;
SMnode* pMnode = pReq->info.node;
STelemMgmt* pMgmt = &pMnode->telemMgmt;
if (!tsEnableTelem) return 0;
if (!tsEnableTelem) {
return 0;
}
(void)taosThreadMutexLock(&pMgmt->lock);
char* pCont = mndBuildTelemetryReport(pMnode);
(void)taosThreadMutexUnlock(&pMgmt->lock);
if (pCont == NULL) {
return 0;
}
TSDB_CHECK_NULL(pCont, code, line, _end, terrno);
code = taosSendTelemReport(&pMgmt->addrMgt, tsTelemUri, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT);
taosMemoryFree(pCont);
return code;
_end:
if (code != 0) {
mError("%s failed to send at line %d since %s", __func__, line, tstrerror(code));
mError("%s failed to send telemetry report, line %d since %s", __func__, line, tstrerror(code));
}
taosMemoryFree(pCont);
return code;
@ -161,15 +212,17 @@ int32_t mndInitTelem(SMnode* pMnode) {
STelemMgmt* pMgmt = &pMnode->telemMgmt;
(void)taosThreadMutexInit(&pMgmt->lock, NULL);
if ((code = taosGetEmail(pMgmt->email, sizeof(pMgmt->email))) != 0)
if ((code = taosGetEmail(pMgmt->email, sizeof(pMgmt->email))) != 0) {
mWarn("failed to get email since %s", tstrerror(code));
}
code = taosTelemetryMgtInit(&pMgmt->addrMgt, tsTelemServer);
if (code != 0) {
mError("failed to init telemetry management since %s", tstrerror(code));
return code;
}
mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer);
mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer);
return 0;
}

View File

@ -17,10 +17,6 @@ else()
MESSAGE(STATUS "enable assert core")
endif(${ASSERT_NOT_CORE})
if(${BUILD_WITH_ANALYSIS})
add_definitions(-DUSE_ANALYTICS)
endif()
target_include_directories(
util
PUBLIC "${TD_SOURCE_DIR}/include/util"