refactor: return the error code and do some refactor.
This commit is contained in:
parent
3034c22afd
commit
daedcc53fb
|
@ -55,7 +55,7 @@ typedef struct {
|
|||
|
||||
static void anomalyDestroyOperatorInfo(void* param);
|
||||
static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
|
||||
static void anomalyAggregateBlocks(SOperatorInfo* pOperator);
|
||||
static int32_t anomalyAggregateBlocks(SOperatorInfo* pOperator);
|
||||
static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
|
||||
|
||||
int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
|
||||
|
@ -78,6 +78,7 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
|
|||
code = TSDB_CODE_ANAL_ALGO_NOT_FOUND;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) {
|
||||
qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName);
|
||||
code = TSDB_CODE_ANAL_ALGO_NOT_LOAD;
|
||||
|
@ -198,7 +199,9 @@ static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
} else {
|
||||
qDebug("group:%" PRId64 ", read finish for new group coming, blocks:%d", pSupp->groupId, numOfBlocks);
|
||||
anomalyAggregateBlocks(pOperator);
|
||||
code = anomalyAggregateBlocks(pOperator);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pSupp->groupId = pBlock->info.id.groupId;
|
||||
numOfBlocks = 1;
|
||||
pSupp->cachedRows = pBlock->info.rows;
|
||||
|
@ -217,7 +220,7 @@ static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
|
||||
if (numOfBlocks > 0) {
|
||||
qDebug("group:%" PRId64 ", read finish, blocks:%d", pInfo->anomalySup.groupId, numOfBlocks);
|
||||
anomalyAggregateBlocks(pOperator);
|
||||
code = anomalyAggregateBlocks(pOperator);
|
||||
}
|
||||
|
||||
int64_t cost = taosGetTimestampUs() - st;
|
||||
|
@ -229,6 +232,7 @@ _end:
|
|||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
(*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
|
||||
return code;
|
||||
}
|
||||
|
@ -338,8 +342,8 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) {
|
|||
SAnalBuf analBuf = {.bufType = ANAL_BUF_TYPE_JSON};
|
||||
char dataBuf[64] = {0};
|
||||
int32_t code = 0;
|
||||
int64_t ts = 0;
|
||||
|
||||
int64_t ts = 0;
|
||||
// int64_t ts = taosGetTimestampMs();
|
||||
snprintf(analBuf.fileName, sizeof(analBuf.fileName), "%s/tdengine-anomaly-%" PRId64 "-%" PRId64, tsTempDir, ts,
|
||||
pSupp->groupId);
|
||||
|
@ -431,6 +435,7 @@ _OVER:
|
|||
if (code != 0) {
|
||||
qError("failed to analysis window since %s", tstrerror(code));
|
||||
}
|
||||
|
||||
taosAnalBufDestroy(&analBuf);
|
||||
if (pJson != NULL) tjsonDelete(pJson);
|
||||
return code;
|
||||
|
@ -473,7 +478,7 @@ static int32_t anomalyBuildResult(SOperatorInfo* pOperator) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static void anomalyAggregateBlocks(SOperatorInfo* pOperator) {
|
||||
static int32_t anomalyAggregateBlocks(SOperatorInfo* pOperator) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
|
||||
|
@ -623,6 +628,8 @@ _OVER:
|
|||
pSupp->curWin.ekey = 0;
|
||||
pSupp->curWin.skey = 0;
|
||||
pSupp->curWinIndex = 0;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
#else
|
||||
|
|
|
@ -167,6 +167,8 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url,
|
|||
char name[TSDB_ANAL_ALGO_KEY_LEN] = {0};
|
||||
int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName);
|
||||
|
||||
char *unused = strntolower(name, name, nameLen);
|
||||
|
||||
if (taosThreadMutexLock(&tsAlgos.lock) == 0) {
|
||||
SAnalUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen);
|
||||
if (pUrl != NULL) {
|
||||
|
@ -178,6 +180,7 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url,
|
|||
code = terrno;
|
||||
uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type));
|
||||
}
|
||||
|
||||
if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
|
||||
uError("failed to unlock hash");
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -360,8 +360,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO, "Anode too many algori
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME, "Anode too long algorithm name")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE, "Anode too many algorithm type")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_RSP_IS_NULL, "Analysis url response is NULL")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_CANT_ACCESS, "Analysis url can't access")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_RSP_IS_NULL, "Analysis service response is NULL")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_CANT_ACCESS, "Analysis service can't access")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_FOUND, "Analysis algorithm not found")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_LOAD, "Analysis algorithm not loaded")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_BUF_INVALID_TYPE, "Analysis invalid buffer type")
|
||||
|
|
Loading…
Reference in New Issue