From 82d2b4ddb253340e841b78c5a2b74efd961c0470 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 7 Nov 2024 16:28:47 +0800 Subject: [PATCH 1/3] refactor: return the error code and do some refactor. --- .../libs/executor/src/anomalywindowoperator.c | 17 ++++++++++++----- source/util/src/tanal.c | 3 +++ source/util/src/terror.c | 4 ++-- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index d03e527c2b..6adfbc6c77 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -55,7 +55,7 @@ typedef struct { static void anomalyDestroyOperatorInfo(void* param); static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); -static void anomalyAggregateBlocks(SOperatorInfo* pOperator); +static int32_t anomalyAggregateBlocks(SOperatorInfo* pOperator); static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pBlock); int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, @@ -78,6 +78,7 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p code = TSDB_CODE_ANAL_ALGO_NOT_FOUND; goto _error; } + if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) { qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName); code = TSDB_CODE_ANAL_ALGO_NOT_LOAD; @@ -198,7 +199,9 @@ static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRe QUERY_CHECK_CODE(code, lino, _end); } else { qDebug("group:%" PRId64 ", read finish for new group coming, blocks:%d", pSupp->groupId, numOfBlocks); - anomalyAggregateBlocks(pOperator); + code = anomalyAggregateBlocks(pOperator); + QUERY_CHECK_CODE(code, lino, _end); + pSupp->groupId = pBlock->info.id.groupId; numOfBlocks = 1; pSupp->cachedRows = pBlock->info.rows; @@ -217,7 +220,7 @@ static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRe if (numOfBlocks > 0) { qDebug("group:%" PRId64 ", read finish, blocks:%d", pInfo->anomalySup.groupId, numOfBlocks); - anomalyAggregateBlocks(pOperator); + code = anomalyAggregateBlocks(pOperator); } int64_t cost = taosGetTimestampUs() - st; @@ -229,6 +232,7 @@ _end: pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } + (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; return code; } @@ -338,8 +342,8 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { SAnalBuf analBuf = {.bufType = ANAL_BUF_TYPE_JSON}; char dataBuf[64] = {0}; int32_t code = 0; + int64_t ts = 0; - int64_t ts = 0; // int64_t ts = taosGetTimestampMs(); snprintf(analBuf.fileName, sizeof(analBuf.fileName), "%s/tdengine-anomaly-%" PRId64 "-%" PRId64, tsTempDir, ts, pSupp->groupId); @@ -431,6 +435,7 @@ _OVER: if (code != 0) { qError("failed to analysis window since %s", tstrerror(code)); } + taosAnalBufDestroy(&analBuf); if (pJson != NULL) tjsonDelete(pJson); return code; @@ -473,7 +478,7 @@ static int32_t anomalyBuildResult(SOperatorInfo* pOperator) { return code; } -static void anomalyAggregateBlocks(SOperatorInfo* pOperator) { +static int32_t anomalyAggregateBlocks(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SAnomalyWindowOperatorInfo* pInfo = pOperator->info; @@ -623,6 +628,8 @@ _OVER: pSupp->curWin.ekey = 0; pSupp->curWin.skey = 0; pSupp->curWinIndex = 0; + + return code; } #else diff --git a/source/util/src/tanal.c b/source/util/src/tanal.c index 92eee28ba8..bb46a48c42 100644 --- a/source/util/src/tanal.c +++ b/source/util/src/tanal.c @@ -167,6 +167,8 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, char name[TSDB_ANAL_ALGO_KEY_LEN] = {0}; int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName); + char *unused = strntolower(name, name, nameLen); + if (taosThreadMutexLock(&tsAlgos.lock) == 0) { SAnalUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen); if (pUrl != NULL) { @@ -178,6 +180,7 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, code = terrno; uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type)); } + if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) { uError("failed to unlock hash"); return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index d660edd0b8..ce209cc718 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -360,8 +360,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO, "Anode too many algori TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME, "Anode too long algorithm name") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE, "Anode too many algorithm type") -TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_RSP_IS_NULL, "Analysis url response is NULL") -TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_CANT_ACCESS, "Analysis url can't access") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_RSP_IS_NULL, "Analysis service response is NULL") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_CANT_ACCESS, "Analysis service can't access") TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_FOUND, "Analysis algorithm not found") TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_LOAD, "Analysis algorithm not loaded") TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_BUF_INVALID_TYPE, "Analysis invalid buffer type") From ebc6a9895d3e5be68da25b0cca0ac247981374d2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 7 Nov 2024 16:44:43 +0800 Subject: [PATCH 2/3] refactor: do some internal refacrtor. --- include/common/{tanal.h => tanalytics.h} | 10 +++---- source/common/src/tmsg.c | 12 ++++---- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 2 +- source/dnode/mgmt/mgmt_dnode/src/dmInt.c | 4 +-- source/dnode/mgmt/node_mgmt/src/dmEnv.c | 4 +-- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/dnode/mnode/impl/CMakeLists.txt | 2 +- source/dnode/mnode/impl/src/mndAnode.c | 10 +++---- source/libs/executor/CMakeLists.txt | 2 +- .../libs/executor/src/anomalywindowoperator.c | 4 +-- source/libs/executor/src/forecastoperator.c | 4 +-- source/libs/function/src/builtins.c | 2 +- source/libs/function/src/builtinsimpl.c | 2 +- source/libs/parser/src/parTranslater.c | 2 +- source/util/CMakeLists.txt | 2 +- source/util/src/{tanal.c => tanalytics.c} | 29 +++++++++---------- 16 files changed, 46 insertions(+), 47 deletions(-) rename include/common/{tanal.h => tanalytics.h} (96%) rename source/util/src/{tanal.c => tanalytics.c} (97%) diff --git a/include/common/tanal.h b/include/common/tanalytics.h similarity index 96% rename from include/common/tanal.h rename to include/common/tanalytics.h index 69d110d161..85eb963129 100644 --- a/include/common/tanal.h +++ b/include/common/tanalytics.h @@ -36,7 +36,7 @@ typedef struct { int32_t anode; int32_t urlLen; char *url; -} SAnalUrl; +} SAnalyticsUrl; typedef enum { ANAL_BUF_TYPE_JSON = 0, @@ -53,18 +53,18 @@ typedef struct { TdFilePtr filePtr; char fileName[TSDB_FILENAME_LEN + 10]; int64_t numOfRows; -} SAnalColBuf; +} SAnalyticsColBuf; typedef struct { EAnalBufType bufType; TdFilePtr filePtr; char fileName[TSDB_FILENAME_LEN]; int32_t numOfCols; - SAnalColBuf *pCols; + SAnalyticsColBuf *pCols; } SAnalBuf; -int32_t taosAnalInit(); -void taosAnalCleanup(); +int32_t taosAnalyticsInit(); +void taosAnalyticsCleanup(); SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf); int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 458badc764..6d1699b911 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -40,7 +40,7 @@ #define TD_MSG_RANGE_CODE_ #include "tmsgdef.h" -#include "tanal.h" +#include "tanalytics.h" #include "tcol.h" #include "tlog.h" @@ -2166,7 +2166,7 @@ int32_t tSerializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAl int32_t numOfAlgos = 0; void *pIter = taosHashIterate(pRsp->hash, NULL); while (pIter != NULL) { - SAnalUrl *pUrl = pIter; + SAnalyticsUrl *pUrl = pIter; size_t nameLen = 0; const char *name = taosHashGetKey(pIter, &nameLen); if (nameLen > 0 && nameLen <= TSDB_ANAL_ALGO_KEY_LEN && pUrl->urlLen > 0) { @@ -2181,7 +2181,7 @@ int32_t tSerializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAl pIter = taosHashIterate(pRsp->hash, NULL); while (pIter != NULL) { - SAnalUrl *pUrl = pIter; + SAnalyticsUrl *pUrl = pIter; size_t nameLen = 0; const char *name = taosHashGetKey(pIter, &nameLen); if (nameLen > 0 && pUrl->urlLen > 0) { @@ -2225,7 +2225,7 @@ int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnal int32_t nameLen; int32_t type; char name[TSDB_ANAL_ALGO_KEY_LEN]; - SAnalUrl url = {0}; + SAnalyticsUrl url = {0}; TAOS_CHECK_EXIT(tStartDecode(&decoder)); TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ver)); @@ -2245,7 +2245,7 @@ int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnal TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void **)&url.url, NULL) < 0); } - TAOS_CHECK_EXIT(taosHashPut(pRsp->hash, name, nameLen, &url, sizeof(SAnalUrl))); + TAOS_CHECK_EXIT(taosHashPut(pRsp->hash, name, nameLen, &url, sizeof(SAnalyticsUrl))); } tEndDecode(&decoder); @@ -2258,7 +2258,7 @@ _exit: void tFreeRetrieveAnalAlgoRsp(SRetrieveAnalAlgoRsp *pRsp) { void *pIter = taosHashIterate(pRsp->hash, NULL); while (pIter != NULL) { - SAnalUrl *pUrl = (SAnalUrl *)pIter; + SAnalyticsUrl *pUrl = (SAnalyticsUrl *)pIter; taosMemoryFree(pUrl->url); pIter = taosHashIterate(pRsp->hash, pIter); } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index d6b792ca74..78cc35a62c 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -18,7 +18,7 @@ #include "dmInt.h" #include "monitor.h" #include "systable.h" -#include "tanal.h" +#include "tanalytics.h" #include "tchecksum.h" extern SConfig *tsCfg; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 04b4e9101c..fb7d891c67 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "dmInt.h" #include "libs/function/tudf.h" -#include "tanal.h" +#include "tanalytics.h" static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { int32_t code = 0; @@ -85,7 +85,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { dError("failed to start udfd since %s", tstrerror(code)); } - if ((code = taosAnalInit()) != 0) { + if ((code = taosAnalyticsInit()) != 0) { dError("failed to init analysis env since %s", tstrerror(code)); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 694cc52d64..6d4ebe424a 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -21,7 +21,7 @@ #include "tgrant.h" #include "tcompare.h" #include "tcs.h" -#include "tanal.h" +#include "tanalytics.h" // clang-format on #define DM_INIT_AUDIT() \ @@ -209,7 +209,7 @@ void dmCleanup() { dError("failed to close udfc"); } udfStopUdfd(); - taosAnalCleanup(); + taosAnalyticsCleanup(); taosStopCacheRefreshWorker(); (void)dmDiskClose(); DestroyRegexCache(); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 5a276de251..61543e619e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" #include "qworker.h" -#include "tanal.h" +#include "tanalytics.h" #include "tversion.h" static inline void dmSendRsp(SRpcMsg *pMsg) { diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index 8a390948ae..ad36d8c8ae 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -18,7 +18,7 @@ if(TD_ENTERPRISE) endif() if(${BUILD_WITH_ANALYSIS}) - add_definitions(-DUSE_ANAL) + add_definitions(-DUSE_ANALYTICS) endif() endif() diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c index 17e3e84c81..87bfe9f7af 100644 --- a/source/dnode/mnode/impl/src/mndAnode.c +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -21,10 +21,10 @@ #include "mndShow.h" #include "mndTrans.h" #include "mndUser.h" -#include "tanal.h" +#include "tanalytics.h" #include "tjson.h" -#ifdef USE_ANAL +#ifdef USE_ANALYTICS #define TSDB_ANODE_VER_NUMBER 1 #define TSDB_ANODE_RESERVE_SIZE 64 @@ -806,7 +806,7 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) { SSdb *pSdb = pMnode->pSdb; int32_t code = -1; SAnodeObj *pObj = NULL; - SAnalUrl url; + SAnalyticsUrl url; int32_t nameLen; char name[TSDB_ANAL_ALGO_KEY_LEN]; SRetrieveAnalAlgoReq req = {0}; @@ -838,7 +838,7 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) { SAnodeAlgo *algo = taosArrayGet(algos, a); nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", url.type, algo->name); - SAnalUrl *pOldUrl = taosHashAcquire(rsp.hash, name, nameLen); + SAnalyticsUrl *pOldUrl = taosHashAcquire(rsp.hash, name, nameLen); if (pOldUrl == NULL || (pOldUrl != NULL && pOldUrl->anode < url.anode)) { if (pOldUrl != NULL) { taosMemoryFreeClear(pOldUrl->url); @@ -855,7 +855,7 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) { url.urlLen = 1 + tsnprintf(url.url, TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN, "%s/%s", pAnode->url, taosAnalAlgoUrlStr(url.type)); - if (taosHashPut(rsp.hash, name, nameLen, &url, sizeof(SAnalUrl)) != 0) { + if (taosHashPut(rsp.hash, name, nameLen, &url, sizeof(SAnalyticsUrl)) != 0) { taosMemoryFree(url.url); sdbRelease(pSdb, pAnode); goto _OVER; diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index 014b538375..9a49076b6b 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -7,7 +7,7 @@ if(${TD_DARWIN}) endif(${TD_DARWIN}) if(${BUILD_WITH_ANALYSIS}) - add_definitions(-DUSE_ANAL) + add_definitions(-DUSE_ANALYTICS) endif() target_link_libraries(executor diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 6adfbc6c77..94cc5d9129 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -19,14 +19,14 @@ #include "functionMgt.h" #include "operator.h" #include "querytask.h" -#include "tanal.h" +#include "tanalytics.h" #include "tcommon.h" #include "tcompare.h" #include "tdatablock.h" #include "tjson.h" #include "ttime.h" -#ifdef USE_ANAL +#ifdef USE_ANALYTICS typedef struct { SArray* blocks; // SSDataBlock* diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 0afa933ee8..20dc9e28ba 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -19,14 +19,14 @@ #include "operator.h" #include "querytask.h" #include "storageapi.h" -#include "tanal.h" +#include "tanalytics.h" #include "tcommon.h" #include "tcompare.h" #include "tdatablock.h" #include "tfill.h" #include "ttime.h" -#ifdef USE_ANAL +#ifdef USE_ANALYTICS typedef struct { char algoName[TSDB_ANAL_ALGO_NAME_LEN]; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 58f20cc398..6a3a8a2089 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -19,7 +19,7 @@ #include "geomFunc.h" #include "querynodes.h" #include "scalar.h" -#include "tanal.h" +#include "tanalytics.h" #include "taoserror.h" #include "ttime.h" diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 983fccac1e..9f8a187d95 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -19,7 +19,7 @@ #include "functionResInfoInt.h" #include "query.h" #include "querynodes.h" -#include "tanal.h" +#include "tanalytics.h" #include "tcompare.h" #include "tdatablock.h" #include "tdigest.h" diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 7aada2be12..868f16946d 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -24,7 +24,7 @@ #include "parUtil.h" #include "scalar.h" #include "systable.h" -#include "tanal.h" +#include "tanalytics.h" #include "tcol.h" #include "tglobal.h" #include "ttime.h" diff --git a/source/util/CMakeLists.txt b/source/util/CMakeLists.txt index 7f5955f3dd..2633bb3268 100644 --- a/source/util/CMakeLists.txt +++ b/source/util/CMakeLists.txt @@ -18,7 +18,7 @@ else() endif(${ASSERT_NOT_CORE}) if(${BUILD_WITH_ANALYSIS}) - add_definitions(-DUSE_ANAL) + add_definitions(-DUSE_ANALYTICS) endif() target_include_directories( diff --git a/source/util/src/tanal.c b/source/util/src/tanalytics.c similarity index 97% rename from source/util/src/tanal.c rename to source/util/src/tanalytics.c index bb46a48c42..99d91700a2 100644 --- a/source/util/src/tanal.c +++ b/source/util/src/tanalytics.c @@ -14,18 +14,17 @@ */ #define _DEFAULT_SOURCE -#include "tanal.h" -#include "tmsg.h" +#include "tanalytics.h" #include "ttypes.h" #include "tutil.h" -#ifdef USE_ANAL +#ifdef USE_ANALYTICS #include #define ANAL_ALGO_SPLIT "," typedef struct { int64_t ver; - SHashObj *hash; // algoname:algotype -> SAnalUrl + SHashObj *hash; // algoname:algotype -> SAnalyticsUrl TdThreadMutex lock; } SAlgoMgmt; @@ -69,7 +68,7 @@ EAnalAlgoType taosAnalAlgoInt(const char *name) { return ANAL_ALGO_TYPE_END; } -int32_t taosAnalInit() { +int32_t taosAnalyticsInit() { if (curl_global_init(CURL_GLOBAL_ALL) != 0) { uError("failed to init curl"); return -1; @@ -94,14 +93,14 @@ int32_t taosAnalInit() { static void taosAnalFreeHash(SHashObj *hash) { void *pIter = taosHashIterate(hash, NULL); while (pIter != NULL) { - SAnalUrl *pUrl = (SAnalUrl *)pIter; + SAnalyticsUrl *pUrl = (SAnalyticsUrl *)pIter; taosMemoryFree(pUrl->url); pIter = taosHashIterate(hash, pIter); } taosHashCleanup(hash); } -void taosAnalCleanup() { +void taosAnalyticsCleanup() { curl_global_cleanup(); if (taosThreadMutexDestroy(&tsAlgos.lock) != 0) { uError("failed to destroy anal lock"); @@ -170,7 +169,7 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, char *unused = strntolower(name, name, nameLen); if (taosThreadMutexLock(&tsAlgos.lock) == 0) { - SAnalUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen); + SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen); if (pUrl != NULL) { tstrncpy(url, pUrl->url, urlLen); uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url); @@ -406,7 +405,7 @@ static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { return terrno; } - pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalColBuf)); + pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalyticsColBuf)); if (pBuf->pCols == NULL) return TSDB_CODE_OUT_OF_MEMORY; pBuf->numOfCols = numOfCols; @@ -415,7 +414,7 @@ static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { } for (int32_t i = 0; i < numOfCols; ++i) { - SAnalColBuf *pCol = &pBuf->pCols[i]; + SAnalyticsColBuf *pCol = &pBuf->pCols[i]; snprintf(pCol->fileName, sizeof(pCol->fileName), "%s-c%d", pBuf->fileName, i); pCol->filePtr = taosOpenFile(pCol->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); @@ -549,7 +548,7 @@ static int32_t taosAnalJsonBufWriteDataEnd(SAnalBuf *pBuf) { if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { for (int32_t i = 0; i < pBuf->numOfCols; ++i) { - SAnalColBuf *pCol = &pBuf->pCols[i]; + SAnalyticsColBuf *pCol = &pBuf->pCols[i]; code = taosFsyncFile(pCol->filePtr); if (code != 0) return code; @@ -591,7 +590,7 @@ int32_t taosAnalJsonBufClose(SAnalBuf *pBuf) { if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { for (int32_t i = 0; i < pBuf->numOfCols; ++i) { - SAnalColBuf *pCol = &pBuf->pCols[i]; + SAnalyticsColBuf *pCol = &pBuf->pCols[i]; if (pCol->filePtr != NULL) { code = taosFsyncFile(pCol->filePtr); if (code != 0) return code; @@ -613,7 +612,7 @@ void taosAnalBufDestroy(SAnalBuf *pBuf) { if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { for (int32_t i = 0; i < pBuf->numOfCols; ++i) { - SAnalColBuf *pCol = &pBuf->pCols[i]; + SAnalyticsColBuf *pCol = &pBuf->pCols[i]; if (pCol->fileName[0] != 0) { if (pCol->filePtr != NULL) (void)taosCloseFile(&pCol->filePtr); if (taosRemoveFile(pCol->fileName) != 0) { @@ -729,8 +728,8 @@ static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContL #else -int32_t taosAnalInit() { return 0; } -void taosAnalCleanup() {} +int32_t taosAnalyticsInit() { return 0; } +void taosAnalyticsCleanup() {} SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) { return NULL; } int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } From 3b129f7e361503fd6e6d32cf20fe9266ec760fa1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Nov 2024 13:46:23 +0800 Subject: [PATCH 3/3] fix(stream): alwasy return success for resume req. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index f31dd28847..326e8d4ada 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1186,10 +1186,12 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m streamMutexUnlock(&pHTask->lock); code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode); + tqDebug("s-task:%s resume complete, code:%s", pHTask->id.idStr, tstrerror(code)); + streamMetaReleaseTask(pMeta, pHTask); } - return code; + return TSDB_CODE_SUCCESS; } int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); }