refactor: do some internal refacrtor.

This commit is contained in:
Haojun Liao 2024-11-07 16:44:43 +08:00
parent daedcc53fb
commit a8f7f153d9
16 changed files with 46 additions and 47 deletions

View File

@ -36,7 +36,7 @@ typedef struct {
int32_t anode; int32_t anode;
int32_t urlLen; int32_t urlLen;
char *url; char *url;
} SAnalUrl; } SAnalyticsUrl;
typedef enum { typedef enum {
ANAL_BUF_TYPE_JSON = 0, ANAL_BUF_TYPE_JSON = 0,
@ -53,18 +53,18 @@ typedef struct {
TdFilePtr filePtr; TdFilePtr filePtr;
char fileName[TSDB_FILENAME_LEN + 10]; char fileName[TSDB_FILENAME_LEN + 10];
int64_t numOfRows; int64_t numOfRows;
} SAnalColBuf; } SAnalyticsColBuf;
typedef struct { typedef struct {
EAnalBufType bufType; EAnalBufType bufType;
TdFilePtr filePtr; TdFilePtr filePtr;
char fileName[TSDB_FILENAME_LEN]; char fileName[TSDB_FILENAME_LEN];
int32_t numOfCols; int32_t numOfCols;
SAnalColBuf *pCols; SAnalyticsColBuf *pCols;
} SAnalBuf; } SAnalBuf;
int32_t taosAnalInit(); int32_t taosAnalyticsInit();
void taosAnalCleanup(); void taosAnalyticsCleanup();
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf); SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf);
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen); int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen);

View File

@ -40,7 +40,7 @@
#define TD_MSG_RANGE_CODE_ #define TD_MSG_RANGE_CODE_
#include "tmsgdef.h" #include "tmsgdef.h"
#include "tanal.h" #include "tanalytics.h"
#include "tcol.h" #include "tcol.h"
#include "tlog.h" #include "tlog.h"
@ -2166,7 +2166,7 @@ int32_t tSerializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAl
int32_t numOfAlgos = 0; int32_t numOfAlgos = 0;
void *pIter = taosHashIterate(pRsp->hash, NULL); void *pIter = taosHashIterate(pRsp->hash, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SAnalUrl *pUrl = pIter; SAnalyticsUrl *pUrl = pIter;
size_t nameLen = 0; size_t nameLen = 0;
const char *name = taosHashGetKey(pIter, &nameLen); const char *name = taosHashGetKey(pIter, &nameLen);
if (nameLen > 0 && nameLen <= TSDB_ANAL_ALGO_KEY_LEN && pUrl->urlLen > 0) { if (nameLen > 0 && nameLen <= TSDB_ANAL_ALGO_KEY_LEN && pUrl->urlLen > 0) {
@ -2181,7 +2181,7 @@ int32_t tSerializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAl
pIter = taosHashIterate(pRsp->hash, NULL); pIter = taosHashIterate(pRsp->hash, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SAnalUrl *pUrl = pIter; SAnalyticsUrl *pUrl = pIter;
size_t nameLen = 0; size_t nameLen = 0;
const char *name = taosHashGetKey(pIter, &nameLen); const char *name = taosHashGetKey(pIter, &nameLen);
if (nameLen > 0 && pUrl->urlLen > 0) { if (nameLen > 0 && pUrl->urlLen > 0) {
@ -2225,7 +2225,7 @@ int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnal
int32_t nameLen; int32_t nameLen;
int32_t type; int32_t type;
char name[TSDB_ANAL_ALGO_KEY_LEN]; char name[TSDB_ANAL_ALGO_KEY_LEN];
SAnalUrl url = {0}; SAnalyticsUrl url = {0};
TAOS_CHECK_EXIT(tStartDecode(&decoder)); TAOS_CHECK_EXIT(tStartDecode(&decoder));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ver)); TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ver));
@ -2245,7 +2245,7 @@ int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnal
TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void **)&url.url, NULL) < 0); TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void **)&url.url, NULL) < 0);
} }
TAOS_CHECK_EXIT(taosHashPut(pRsp->hash, name, nameLen, &url, sizeof(SAnalUrl))); TAOS_CHECK_EXIT(taosHashPut(pRsp->hash, name, nameLen, &url, sizeof(SAnalyticsUrl)));
} }
tEndDecode(&decoder); tEndDecode(&decoder);
@ -2258,7 +2258,7 @@ _exit:
void tFreeRetrieveAnalAlgoRsp(SRetrieveAnalAlgoRsp *pRsp) { void tFreeRetrieveAnalAlgoRsp(SRetrieveAnalAlgoRsp *pRsp) {
void *pIter = taosHashIterate(pRsp->hash, NULL); void *pIter = taosHashIterate(pRsp->hash, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SAnalUrl *pUrl = (SAnalUrl *)pIter; SAnalyticsUrl *pUrl = (SAnalyticsUrl *)pIter;
taosMemoryFree(pUrl->url); taosMemoryFree(pUrl->url);
pIter = taosHashIterate(pRsp->hash, pIter); pIter = taosHashIterate(pRsp->hash, pIter);
} }

View File

@ -18,7 +18,7 @@
#include "dmInt.h" #include "dmInt.h"
#include "monitor.h" #include "monitor.h"
#include "systable.h" #include "systable.h"
#include "tanal.h" #include "tanalytics.h"
#include "tchecksum.h" #include "tchecksum.h"
extern SConfig *tsCfg; extern SConfig *tsCfg;

View File

@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmInt.h" #include "dmInt.h"
#include "libs/function/tudf.h" #include "libs/function/tudf.h"
#include "tanal.h" #include "tanalytics.h"
static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
int32_t code = 0; int32_t code = 0;
@ -85,7 +85,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
dError("failed to start udfd since %s", tstrerror(code)); dError("failed to start udfd since %s", tstrerror(code));
} }
if ((code = taosAnalInit()) != 0) { if ((code = taosAnalyticsInit()) != 0) {
dError("failed to init analysis env since %s", tstrerror(code)); dError("failed to init analysis env since %s", tstrerror(code));
} }

View File

@ -21,7 +21,7 @@
#include "tgrant.h" #include "tgrant.h"
#include "tcompare.h" #include "tcompare.h"
#include "tcs.h" #include "tcs.h"
#include "tanal.h" #include "tanalytics.h"
// clang-format on // clang-format on
#define DM_INIT_AUDIT() \ #define DM_INIT_AUDIT() \
@ -209,7 +209,7 @@ void dmCleanup() {
dError("failed to close udfc"); dError("failed to close udfc");
} }
udfStopUdfd(); udfStopUdfd();
taosAnalCleanup(); taosAnalyticsCleanup();
taosStopCacheRefreshWorker(); taosStopCacheRefreshWorker();
(void)dmDiskClose(); (void)dmDiskClose();
DestroyRegexCache(); DestroyRegexCache();

View File

@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmMgmt.h" #include "dmMgmt.h"
#include "qworker.h" #include "qworker.h"
#include "tanal.h" #include "tanalytics.h"
#include "tversion.h" #include "tversion.h"
static inline void dmSendRsp(SRpcMsg *pMsg) { static inline void dmSendRsp(SRpcMsg *pMsg) {

View File

@ -18,7 +18,7 @@ if(TD_ENTERPRISE)
endif() endif()
if(${BUILD_WITH_ANALYSIS}) if(${BUILD_WITH_ANALYSIS})
add_definitions(-DUSE_ANAL) add_definitions(-DUSE_ANALYTICS)
endif() endif()
endif() endif()

View File

@ -21,10 +21,10 @@
#include "mndShow.h" #include "mndShow.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "tanal.h" #include "tanalytics.h"
#include "tjson.h" #include "tjson.h"
#ifdef USE_ANAL #ifdef USE_ANALYTICS
#define TSDB_ANODE_VER_NUMBER 1 #define TSDB_ANODE_VER_NUMBER 1
#define TSDB_ANODE_RESERVE_SIZE 64 #define TSDB_ANODE_RESERVE_SIZE 64
@ -806,7 +806,7 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t code = -1; int32_t code = -1;
SAnodeObj *pObj = NULL; SAnodeObj *pObj = NULL;
SAnalUrl url; SAnalyticsUrl url;
int32_t nameLen; int32_t nameLen;
char name[TSDB_ANAL_ALGO_KEY_LEN]; char name[TSDB_ANAL_ALGO_KEY_LEN];
SRetrieveAnalAlgoReq req = {0}; SRetrieveAnalAlgoReq req = {0};
@ -838,7 +838,7 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) {
SAnodeAlgo *algo = taosArrayGet(algos, a); SAnodeAlgo *algo = taosArrayGet(algos, a);
nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", url.type, algo->name); nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", url.type, algo->name);
SAnalUrl *pOldUrl = taosHashAcquire(rsp.hash, name, nameLen); SAnalyticsUrl *pOldUrl = taosHashAcquire(rsp.hash, name, nameLen);
if (pOldUrl == NULL || (pOldUrl != NULL && pOldUrl->anode < url.anode)) { if (pOldUrl == NULL || (pOldUrl != NULL && pOldUrl->anode < url.anode)) {
if (pOldUrl != NULL) { if (pOldUrl != NULL) {
taosMemoryFreeClear(pOldUrl->url); taosMemoryFreeClear(pOldUrl->url);
@ -855,7 +855,7 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) {
url.urlLen = 1 + tsnprintf(url.url, TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN, "%s/%s", pAnode->url, url.urlLen = 1 + tsnprintf(url.url, TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN, "%s/%s", pAnode->url,
taosAnalAlgoUrlStr(url.type)); taosAnalAlgoUrlStr(url.type));
if (taosHashPut(rsp.hash, name, nameLen, &url, sizeof(SAnalUrl)) != 0) { if (taosHashPut(rsp.hash, name, nameLen, &url, sizeof(SAnalyticsUrl)) != 0) {
taosMemoryFree(url.url); taosMemoryFree(url.url);
sdbRelease(pSdb, pAnode); sdbRelease(pSdb, pAnode);
goto _OVER; goto _OVER;

View File

@ -7,7 +7,7 @@ if(${TD_DARWIN})
endif(${TD_DARWIN}) endif(${TD_DARWIN})
if(${BUILD_WITH_ANALYSIS}) if(${BUILD_WITH_ANALYSIS})
add_definitions(-DUSE_ANAL) add_definitions(-DUSE_ANALYTICS)
endif() endif()
target_link_libraries(executor target_link_libraries(executor

View File

@ -19,14 +19,14 @@
#include "functionMgt.h" #include "functionMgt.h"
#include "operator.h" #include "operator.h"
#include "querytask.h" #include "querytask.h"
#include "tanal.h" #include "tanalytics.h"
#include "tcommon.h" #include "tcommon.h"
#include "tcompare.h" #include "tcompare.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tjson.h" #include "tjson.h"
#include "ttime.h" #include "ttime.h"
#ifdef USE_ANAL #ifdef USE_ANALYTICS
typedef struct { typedef struct {
SArray* blocks; // SSDataBlock* SArray* blocks; // SSDataBlock*

View File

@ -19,14 +19,14 @@
#include "operator.h" #include "operator.h"
#include "querytask.h" #include "querytask.h"
#include "storageapi.h" #include "storageapi.h"
#include "tanal.h" #include "tanalytics.h"
#include "tcommon.h" #include "tcommon.h"
#include "tcompare.h" #include "tcompare.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tfill.h" #include "tfill.h"
#include "ttime.h" #include "ttime.h"
#ifdef USE_ANAL #ifdef USE_ANALYTICS
typedef struct { typedef struct {
char algoName[TSDB_ANAL_ALGO_NAME_LEN]; char algoName[TSDB_ANAL_ALGO_NAME_LEN];

View File

@ -19,7 +19,7 @@
#include "geomFunc.h" #include "geomFunc.h"
#include "querynodes.h" #include "querynodes.h"
#include "scalar.h" #include "scalar.h"
#include "tanal.h" #include "tanalytics.h"
#include "taoserror.h" #include "taoserror.h"
#include "ttime.h" #include "ttime.h"

View File

@ -19,7 +19,7 @@
#include "functionResInfoInt.h" #include "functionResInfoInt.h"
#include "query.h" #include "query.h"
#include "querynodes.h" #include "querynodes.h"
#include "tanal.h" #include "tanalytics.h"
#include "tcompare.h" #include "tcompare.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdigest.h" #include "tdigest.h"

View File

@ -24,7 +24,7 @@
#include "parUtil.h" #include "parUtil.h"
#include "scalar.h" #include "scalar.h"
#include "systable.h" #include "systable.h"
#include "tanal.h" #include "tanalytics.h"
#include "tcol.h" #include "tcol.h"
#include "tglobal.h" #include "tglobal.h"
#include "ttime.h" #include "ttime.h"

View File

@ -18,7 +18,7 @@ else()
endif(${ASSERT_NOT_CORE}) endif(${ASSERT_NOT_CORE})
if(${BUILD_WITH_ANALYSIS}) if(${BUILD_WITH_ANALYSIS})
add_definitions(-DUSE_ANAL) add_definitions(-DUSE_ANALYTICS)
endif() endif()
target_include_directories( target_include_directories(

View File

@ -14,18 +14,17 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tanal.h" #include "tanalytics.h"
#include "tmsg.h"
#include "ttypes.h" #include "ttypes.h"
#include "tutil.h" #include "tutil.h"
#ifdef USE_ANAL #ifdef USE_ANALYTICS
#include <curl/curl.h> #include <curl/curl.h>
#define ANAL_ALGO_SPLIT "," #define ANAL_ALGO_SPLIT ","
typedef struct { typedef struct {
int64_t ver; int64_t ver;
SHashObj *hash; // algoname:algotype -> SAnalUrl SHashObj *hash; // algoname:algotype -> SAnalyticsUrl
TdThreadMutex lock; TdThreadMutex lock;
} SAlgoMgmt; } SAlgoMgmt;
@ -69,7 +68,7 @@ EAnalAlgoType taosAnalAlgoInt(const char *name) {
return ANAL_ALGO_TYPE_END; return ANAL_ALGO_TYPE_END;
} }
int32_t taosAnalInit() { int32_t taosAnalyticsInit() {
if (curl_global_init(CURL_GLOBAL_ALL) != 0) { if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
uError("failed to init curl"); uError("failed to init curl");
return -1; return -1;
@ -94,14 +93,14 @@ int32_t taosAnalInit() {
static void taosAnalFreeHash(SHashObj *hash) { static void taosAnalFreeHash(SHashObj *hash) {
void *pIter = taosHashIterate(hash, NULL); void *pIter = taosHashIterate(hash, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SAnalUrl *pUrl = (SAnalUrl *)pIter; SAnalyticsUrl *pUrl = (SAnalyticsUrl *)pIter;
taosMemoryFree(pUrl->url); taosMemoryFree(pUrl->url);
pIter = taosHashIterate(hash, pIter); pIter = taosHashIterate(hash, pIter);
} }
taosHashCleanup(hash); taosHashCleanup(hash);
} }
void taosAnalCleanup() { void taosAnalyticsCleanup() {
curl_global_cleanup(); curl_global_cleanup();
if (taosThreadMutexDestroy(&tsAlgos.lock) != 0) { if (taosThreadMutexDestroy(&tsAlgos.lock) != 0) {
uError("failed to destroy anal lock"); uError("failed to destroy anal lock");
@ -170,7 +169,7 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url,
char *unused = strntolower(name, name, nameLen); char *unused = strntolower(name, name, nameLen);
if (taosThreadMutexLock(&tsAlgos.lock) == 0) { if (taosThreadMutexLock(&tsAlgos.lock) == 0) {
SAnalUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen); SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen);
if (pUrl != NULL) { if (pUrl != NULL) {
tstrncpy(url, pUrl->url, urlLen); tstrncpy(url, pUrl->url, urlLen);
uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url); uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url);
@ -406,7 +405,7 @@ static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) {
return terrno; return terrno;
} }
pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalColBuf)); pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalyticsColBuf));
if (pBuf->pCols == NULL) return TSDB_CODE_OUT_OF_MEMORY; if (pBuf->pCols == NULL) return TSDB_CODE_OUT_OF_MEMORY;
pBuf->numOfCols = numOfCols; pBuf->numOfCols = numOfCols;
@ -415,7 +414,7 @@ static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) {
} }
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SAnalColBuf *pCol = &pBuf->pCols[i]; SAnalyticsColBuf *pCol = &pBuf->pCols[i];
snprintf(pCol->fileName, sizeof(pCol->fileName), "%s-c%d", pBuf->fileName, i); snprintf(pCol->fileName, sizeof(pCol->fileName), "%s-c%d", pBuf->fileName, i);
pCol->filePtr = pCol->filePtr =
taosOpenFile(pCol->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); taosOpenFile(pCol->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
@ -549,7 +548,7 @@ static int32_t taosAnalJsonBufWriteDataEnd(SAnalBuf *pBuf) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
for (int32_t i = 0; i < pBuf->numOfCols; ++i) { for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
SAnalColBuf *pCol = &pBuf->pCols[i]; SAnalyticsColBuf *pCol = &pBuf->pCols[i];
code = taosFsyncFile(pCol->filePtr); code = taosFsyncFile(pCol->filePtr);
if (code != 0) return code; if (code != 0) return code;
@ -591,7 +590,7 @@ int32_t taosAnalJsonBufClose(SAnalBuf *pBuf) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
for (int32_t i = 0; i < pBuf->numOfCols; ++i) { for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
SAnalColBuf *pCol = &pBuf->pCols[i]; SAnalyticsColBuf *pCol = &pBuf->pCols[i];
if (pCol->filePtr != NULL) { if (pCol->filePtr != NULL) {
code = taosFsyncFile(pCol->filePtr); code = taosFsyncFile(pCol->filePtr);
if (code != 0) return code; if (code != 0) return code;
@ -613,7 +612,7 @@ void taosAnalBufDestroy(SAnalBuf *pBuf) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
for (int32_t i = 0; i < pBuf->numOfCols; ++i) { for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
SAnalColBuf *pCol = &pBuf->pCols[i]; SAnalyticsColBuf *pCol = &pBuf->pCols[i];
if (pCol->fileName[0] != 0) { if (pCol->fileName[0] != 0) {
if (pCol->filePtr != NULL) (void)taosCloseFile(&pCol->filePtr); if (pCol->filePtr != NULL) (void)taosCloseFile(&pCol->filePtr);
if (taosRemoveFile(pCol->fileName) != 0) { if (taosRemoveFile(pCol->fileName) != 0) {
@ -729,8 +728,8 @@ static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContL
#else #else
int32_t taosAnalInit() { return 0; } int32_t taosAnalyticsInit() { return 0; }
void taosAnalCleanup() {} void taosAnalyticsCleanup() {}
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) { return NULL; } SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) { return NULL; }
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; }