From daedcc53fb2b84b5a9acafffb0486cb3b84dd952 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 7 Nov 2024 16:28:47 +0800 Subject: [PATCH] refactor: return the error code and do some refactor. --- .../libs/executor/src/anomalywindowoperator.c | 17 ++++++++++++----- source/util/src/tanal.c | 3 +++ source/util/src/terror.c | 4 ++-- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index d03e527c2b..6adfbc6c77 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -55,7 +55,7 @@ typedef struct { static void anomalyDestroyOperatorInfo(void* param); static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); -static void anomalyAggregateBlocks(SOperatorInfo* pOperator); +static int32_t anomalyAggregateBlocks(SOperatorInfo* pOperator); static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pBlock); int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, @@ -78,6 +78,7 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p code = TSDB_CODE_ANAL_ALGO_NOT_FOUND; goto _error; } + if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) { qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName); code = TSDB_CODE_ANAL_ALGO_NOT_LOAD; @@ -198,7 +199,9 @@ static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRe QUERY_CHECK_CODE(code, lino, _end); } else { qDebug("group:%" PRId64 ", read finish for new group coming, blocks:%d", pSupp->groupId, numOfBlocks); - anomalyAggregateBlocks(pOperator); + code = anomalyAggregateBlocks(pOperator); + QUERY_CHECK_CODE(code, lino, _end); + pSupp->groupId = pBlock->info.id.groupId; numOfBlocks = 1; pSupp->cachedRows = pBlock->info.rows; @@ -217,7 +220,7 @@ static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRe if (numOfBlocks > 0) { qDebug("group:%" PRId64 ", read finish, blocks:%d", pInfo->anomalySup.groupId, numOfBlocks); - anomalyAggregateBlocks(pOperator); + code = anomalyAggregateBlocks(pOperator); } int64_t cost = taosGetTimestampUs() - st; @@ -229,6 +232,7 @@ _end: pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } + (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; return code; } @@ -338,8 +342,8 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { SAnalBuf analBuf = {.bufType = ANAL_BUF_TYPE_JSON}; char dataBuf[64] = {0}; int32_t code = 0; + int64_t ts = 0; - int64_t ts = 0; // int64_t ts = taosGetTimestampMs(); snprintf(analBuf.fileName, sizeof(analBuf.fileName), "%s/tdengine-anomaly-%" PRId64 "-%" PRId64, tsTempDir, ts, pSupp->groupId); @@ -431,6 +435,7 @@ _OVER: if (code != 0) { qError("failed to analysis window since %s", tstrerror(code)); } + taosAnalBufDestroy(&analBuf); if (pJson != NULL) tjsonDelete(pJson); return code; @@ -473,7 +478,7 @@ static int32_t anomalyBuildResult(SOperatorInfo* pOperator) { return code; } -static void anomalyAggregateBlocks(SOperatorInfo* pOperator) { +static int32_t anomalyAggregateBlocks(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SAnomalyWindowOperatorInfo* pInfo = pOperator->info; @@ -623,6 +628,8 @@ _OVER: pSupp->curWin.ekey = 0; pSupp->curWin.skey = 0; pSupp->curWinIndex = 0; + + return code; } #else diff --git a/source/util/src/tanal.c b/source/util/src/tanal.c index 92eee28ba8..bb46a48c42 100644 --- a/source/util/src/tanal.c +++ b/source/util/src/tanal.c @@ -167,6 +167,8 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, char name[TSDB_ANAL_ALGO_KEY_LEN] = {0}; int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName); + char *unused = strntolower(name, name, nameLen); + if (taosThreadMutexLock(&tsAlgos.lock) == 0) { SAnalUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen); if (pUrl != NULL) { @@ -178,6 +180,7 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, code = terrno; uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type)); } + if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) { uError("failed to unlock hash"); return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index d660edd0b8..ce209cc718 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -360,8 +360,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO, "Anode too many algori TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME, "Anode too long algorithm name") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE, "Anode too many algorithm type") -TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_RSP_IS_NULL, "Analysis url response is NULL") -TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_CANT_ACCESS, "Analysis url can't access") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_RSP_IS_NULL, "Analysis service response is NULL") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_CANT_ACCESS, "Analysis service can't access") TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_FOUND, "Analysis algorithm not found") TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_LOAD, "Analysis algorithm not loaded") TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_BUF_INVALID_TYPE, "Analysis invalid buffer type")