From c8b44b82c4c6a5cdb50c4672b4413af4a98f0f0e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 Dec 2024 15:27:23 +0800 Subject: [PATCH 1/8] refactor:do some internal refactor. --- include/common/tanalytics.h | 4 +-- source/libs/executor/src/forecastoperator.c | 33 +++++++++++++-------- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/include/common/tanalytics.h b/include/common/tanalytics.h index d0af84ecfb..6ebdb38fa6 100644 --- a/include/common/tanalytics.h +++ b/include/common/tanalytics.h @@ -28,8 +28,8 @@ extern "C" { #define ANAL_FORECAST_DEFAULT_ROWS 10 #define ANAL_FORECAST_DEFAULT_CONF 95 #define ANAL_FORECAST_DEFAULT_WNCHECK 1 -#define ANAL_FORECAST_MAX_ROWS 10000 -#define ANAL_ANOMALY_WINDOW_MAX_ROWS 10000 +#define ANAL_FORECAST_MAX_ROWS 40000 +#define ANAL_ANOMALY_WINDOW_MAX_ROWS 40000 typedef struct { EAnalAlgoType type; diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index a56b0dd214..2985e5e000 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -72,17 +72,20 @@ static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int return TSDB_CODE_SUCCESS; } -static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) { - if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) { - return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS; - } - - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; +static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* id) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SAnalyticBuf* pBuf = &pSupp->analBuf; - qDebug("block:%d, %p rows:%" PRId64, pSupp->numOfBlocks, pBlock, pBlock->info.rows); + if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) { + code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS; + qError("%s rows:%" PRId64 " for forecast cache, error happens, code:%s, upper limit:%d", id, pSupp->cachedRows, + tstrerror(code), ANAL_FORECAST_MAX_ROWS); + return code; + } + pSupp->numOfBlocks++; + qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->numOfBlocks, pBlock, pBlock->info.rows); for (int32_t j = 0; j < pBlock->info.rows; ++j) { SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputValSlot); @@ -98,10 +101,16 @@ static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) { pSupp->numOfRows++; code = taosAnalBufWriteColData(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &ts); - if (TSDB_CODE_SUCCESS != code) return code; + if (TSDB_CODE_SUCCESS != code) { + qError("%s failed to write ts in buf, code:%s", id, tstrerror(code)); + return code; + } code = taosAnalBufWriteColData(pBuf, 1, valType, val); - if (TSDB_CODE_SUCCESS != code) return code; + if (TSDB_CODE_SUCCESS != code) { + qError("%s failed to write val in buf, code:%s", id, tstrerror(code)); + return code; + } } return 0; @@ -394,7 +403,7 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pSupp->cachedRows += pBlock->info.rows; qDebug("%s group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, numOfBlocks, pBlock->info.rows, pSupp->cachedRows); - code = forecastCacheBlock(pSupp, pBlock); + code = forecastCacheBlock(pSupp, pBlock, pId); QUERY_CHECK_CODE(code, lino, _end); } else { qDebug("%s group:%" PRId64 ", read finish for new group coming, blocks:%d", pId, pSupp->groupId, numOfBlocks); @@ -405,7 +414,7 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pSupp->cachedRows = pBlock->info.rows; qDebug("%s group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, pBlock->info.rows, pSupp->cachedRows); - code = forecastCacheBlock(pSupp, pBlock); + code = forecastCacheBlock(pSupp, pBlock, pId); QUERY_CHECK_CODE(code, lino, _end); } From 9198164ac661c5aaf0a21250f91bbc5af51a65f0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 Dec 2024 16:26:17 +0800 Subject: [PATCH 2/8] fix(analytics): fix check return value error. --- source/util/src/tanalytics.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/util/src/tanalytics.c b/source/util/src/tanalytics.c index 68bbbb7e99..3812295381 100644 --- a/source/util/src/tanalytics.c +++ b/source/util/src/tanalytics.c @@ -478,11 +478,13 @@ static int32_t taosAnalJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf } if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) { - if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + int32_t ret = taosWriteFile(pBuf->filePtr, buf, bufLen); + if (ret < 0) { return terrno; } } else { - if (taosWriteFile(pBuf->pCols[colIndex].filePtr, buf, bufLen) != bufLen) { + int32_t ret = taosWriteFile(pBuf->pCols[colIndex].filePtr, buf, bufLen); + if (ret < 0) { return terrno; } } From bf4da942e4e68f223fdceafd2af076030d4885e4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 20 Dec 2024 18:49:57 +0800 Subject: [PATCH 3/8] refactor(analytics): do some refactor. --- source/util/src/tanalytics.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/util/src/tanalytics.c b/source/util/src/tanalytics.c index 3812295381..306795f5ed 100644 --- a/source/util/src/tanalytics.c +++ b/source/util/src/tanalytics.c @@ -479,12 +479,12 @@ static int32_t taosAnalJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) { int32_t ret = taosWriteFile(pBuf->filePtr, buf, bufLen); - if (ret < 0) { + if (ret != bufLen) { return terrno; } } else { int32_t ret = taosWriteFile(pBuf->pCols[colIndex].filePtr, buf, bufLen); - if (ret < 0) { + if (ret != bufLen) { return terrno; } } From 9cb81284f62e0781155120b5dd71a87531b40dee Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 23 Dec 2024 14:32:04 +0800 Subject: [PATCH 4/8] fix(analytics):copy the correct post rsp. --- source/util/src/tanalytics.c | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/source/util/src/tanalytics.c b/source/util/src/tanalytics.c index 306795f5ed..5a3ceef422 100644 --- a/source/util/src/tanalytics.c +++ b/source/util/src/tanalytics.c @@ -216,12 +216,28 @@ static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void return 0; } - pRsp->dataLen = (int64_t)contLen * (int64_t)nmemb; - pRsp->data = taosMemoryMalloc(pRsp->dataLen + 1); + int64_t size = pRsp->dataLen + (int64_t)contLen * nmemb; + if (pRsp->data == NULL) { + pRsp->data = taosMemoryMalloc(size + 1); + if (pRsp->data == NULL) { + uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); + return terrno; + } + } else { + char* p = taosMemoryRealloc(pRsp->data, size + 1); + if (p == NULL) { + uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); + return terrno; + } + + pRsp->data = p; + } if (pRsp->data != NULL) { - (void)memcpy(pRsp->data, pCont, pRsp->dataLen); + (void)memcpy(pRsp->data + pRsp->dataLen, pCont, pRsp->dataLen); pRsp->data[pRsp->dataLen] = 0; + pRsp->dataLen = size; + uDebugL("curl response is received, len:%" PRId64 ", content:%s", pRsp->dataLen, pRsp->data); return pRsp->dataLen; } else { From ac6c6278b73fcfa1e5739e84abbddc615f8c97ca Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 23 Dec 2024 16:41:50 +0800 Subject: [PATCH 5/8] fix(analytics): fix bugs in recv post results. --- source/util/src/tanalytics.c | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/source/util/src/tanalytics.c b/source/util/src/tanalytics.c index 5a3ceef422..e68edd4b76 100644 --- a/source/util/src/tanalytics.c +++ b/source/util/src/tanalytics.c @@ -216,30 +216,33 @@ static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void return 0; } - int64_t size = pRsp->dataLen + (int64_t)contLen * nmemb; + int64_t newDataSize = (int64_t) contLen * nmemb; + int64_t size = pRsp->dataLen + newDataSize; + if (pRsp->data == NULL) { pRsp->data = taosMemoryMalloc(size + 1); if (pRsp->data == NULL) { uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); - return terrno; + return 0; // return the recv length, if failed, return 0 } } else { char* p = taosMemoryRealloc(pRsp->data, size + 1); if (p == NULL) { uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); - return terrno; + return 0; // return the recv length, if failed, return 0 } pRsp->data = p; } if (pRsp->data != NULL) { - (void)memcpy(pRsp->data + pRsp->dataLen, pCont, pRsp->dataLen); - pRsp->data[pRsp->dataLen] = 0; - pRsp->dataLen = size; + (void)memcpy(pRsp->data + pRsp->dataLen, pCont, newDataSize); - uDebugL("curl response is received, len:%" PRId64 ", content:%s", pRsp->dataLen, pRsp->data); - return pRsp->dataLen; + pRsp->dataLen = size; + pRsp->data[size] = 0; + + uDebugL("curl response is received, len:%" PRId64 ", content:%s", size, pRsp->data); + return newDataSize; } else { pRsp->dataLen = 0; uError("failed to malloc curl response"); From 44db20492c558b9d3ec165d2c20f07a88ad7192c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 26 Dec 2024 10:42:34 +0800 Subject: [PATCH 6/8] fix(stream): check error and continue computing, fix memory leak. --- source/libs/stream/src/streamDispatch.c | 23 +++++++++++------------ source/libs/stream/src/streamExec.c | 3 +-- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b4fcf1edc9..722ab2018e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -954,7 +954,6 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n int32_t vgId = pTask->pMeta->vgId; if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 ", quit", id, vgId, pTmrInfo->launchChkptId); @@ -963,13 +962,11 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr", id, vgId); return -1; } if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id, vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num); return -1; @@ -998,6 +995,7 @@ static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, in void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId); if (p == NULL) { stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId); + taosArrayDestroy(pTmp); return terrno; } else { stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, level, @@ -1047,13 +1045,13 @@ static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t } } -static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray* pNotRspList) { +static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray** pNotRspList) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; SArray* pList = pActiveInfo->pReadyMsgList; int32_t num = taosArrayGetSize(pList); int32_t vgId = pTask->pMeta->vgId; - int32_t checkpointId = pActiveInfo->activeId; + int64_t checkpointId = pActiveInfo->activeId; const char* id = pTask->id.idStr; int32_t notRsp = 0; @@ -1062,18 +1060,17 @@ static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray* return code; } - code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); + code = doFindNotConfirmUpstream(pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); if (code) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr", id, tstrerror(code)); return code; } - notRsp = taosArrayGetSize(pNotRspList); + notRsp = taosArrayGetSize(*pNotRspList); if (notRsp == 0) { streamClearChkptReadyMsg(pActiveInfo); } else { - doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList); + doSendChkptReadyMsg(pTask, *pNotRspList, checkpointId, pList); } return code; @@ -1137,10 +1134,12 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) { } streamMutexLock(&pActiveInfo->lock); - code = chkptReadyMsgSendHelper(pTask, param, pNotRspList); + code = chkptReadyMsgSendHelper(pTask, param, &pNotRspList); streamMutexUnlock(&pActiveInfo->lock); if (code != TSDB_CODE_SUCCESS) { + streamCleanBeforeQuitTmr(pTmrInfo, param); + streamMetaReleaseTask(pTask->pMeta, pTask); taosArrayDestroy(pNotRspList); return; @@ -1176,7 +1175,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pList); if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { - stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d", id, num, + stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d not send chkpt-ready msg", id, num, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList)); streamMutexUnlock(&pActiveInfo->lock); return TSDB_CODE_STREAM_INTERNAL_ERROR; @@ -1200,7 +1199,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id); } } else { - stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id); + stError("s-task:%s failed to prepare the checkpoint-ready msg, try next time in 10s", id); } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 85f287f301..89cb4153fe 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -915,8 +915,7 @@ int32_t streamResumeTask(SStreamTask* pTask) { while (1) { code = doStreamExecTask(pTask); if (code) { - stError("s-task:%s failed to exec stream task, code:%s", id, tstrerror(code)); - return code; + stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code)); } // check if continue streamMutexLock(&pTask->lock); From 388de2a2f1fdb23a0682e3d00932e65259fc6a3b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 26 Dec 2024 17:24:17 +0800 Subject: [PATCH 7/8] test(analytics): add tests for tdanalytics. --- source/util/src/tanalytics.c | 6 +-- tests/script/sh/stop_dnodes.sh | 8 +-- tests/script/tsim/analytics/basic0.sim | 67 +++++++++++++++++++++++++- 3 files changed, 73 insertions(+), 8 deletions(-) diff --git a/source/util/src/tanalytics.c b/source/util/src/tanalytics.c index e68edd4b76..bf2cb4fd07 100644 --- a/source/util/src/tanalytics.c +++ b/source/util/src/tanalytics.c @@ -20,7 +20,7 @@ #ifdef USE_ANALYTICS #include -#define ANAL_ALGO_SPLIT "," +#define ANALYTICS_ALOG_SPLIT_CHAR "," typedef struct { int64_t ver; @@ -136,7 +136,7 @@ bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, return false; } - pEnd = strstr(pStart, ANAL_ALGO_SPLIT); + pEnd = strstr(pStart, ANALYTICS_ALOG_SPLIT_CHAR); if (optMaxLen > 0) { if (pEnd > pStart) { int32_t len = (int32_t)(pEnd - pStart); @@ -168,7 +168,7 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValu int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName); char *pos1 = strstr(option, buf); - char *pos2 = strstr(option, ANAL_ALGO_SPLIT); + char *pos2 = strstr(option, ANALYTICS_ALOG_SPLIT_CHAR); if (pos1 != NULL) { *optValue = taosStr2Int64(pos1 + bufLen, NULL, 10); return true; diff --git a/tests/script/sh/stop_dnodes.sh b/tests/script/sh/stop_dnodes.sh index 8923804547..da2083b013 100755 --- a/tests/script/sh/stop_dnodes.sh +++ b/tests/script/sh/stop_dnodes.sh @@ -13,7 +13,7 @@ if [ -n "$PID" ]; then systemctl stop taosd fi -PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'` +PID=`ps -ef|grep -w taosd | grep -v grep | grep -v taosanode | awk '{print $2}'` while [ -n "$PID" ]; do echo kill -9 $PID #pkill -9 taosd @@ -38,10 +38,10 @@ while [ -n "$PID" ]; do else lsof -nti:6030 | xargs kill -9 fi - PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'` + PID=`ps -ef|grep -w taos | grep -v grep |grep -v taosanode| awk '{print $2}'` done -PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'` +PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode|awk '{print $2}'` while [ -n "$PID" ]; do echo kill -9 $PID #pkill -9 tmq_sim @@ -52,5 +52,5 @@ while [ -n "$PID" ]; do else lsof -nti:6030 | xargs kill -9 fi - PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'` + PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode| awk '{print $2}'` done \ No newline at end of file diff --git a/tests/script/tsim/analytics/basic0.sim b/tests/script/tsim/analytics/basic0.sim index 77c9184e8f..35774e7682 100644 --- a/tests/script/tsim/analytics/basic0.sim +++ b/tests/script/tsim/analytics/basic0.sim @@ -3,7 +3,17 @@ system sh/deploy.sh -n dnode1 -i 1 system sh/exec.sh -n dnode1 -s start sql connect -print =============== create anode +print =============== failed to create anode on '127.0.0.1:1101' +sql_error create anode '127.0.0.1:1101' + +sql show anodes +if $rows != 0 then + return -1 +endi + +sql_error drop anode 1 + +print ================ create anode sql create anode '192.168.1.116:6050' sql show anodes @@ -58,6 +68,61 @@ if $data00 != 1 then return -1 endi +sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1 +sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=119,wncheck=1,period=0') from ct1 +sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters1,conf=0.5,wncheck=1,period=0') from ct1 +sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=0') from ct1 +sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=-10') from ct1 +sql_error select forecast(c1, 'conf=50 ,algo = arima, every=0') from ct1 + +sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters, conf=50 ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, ' algo=holtwinters , conf=50 ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, ' algo = holtwinters , conf = 50 ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ,') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, , ,') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, a =') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, = a ,') from ct1 +sql select forecast(c1, 'conf=50 ,algo = arima') from ct1 +sql select forecast(c1, 'conf=50 ,algo = arima, rows=1') from ct1 + +sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=2') from ct1 +if $rows != 10 then + return -1 +endi + +if $data03 != 28 then + return -1 +endi + +if $data00 != @23-11-15 06:13:20.000@ then + print expect 23-11-15 06:13:20.000 , actual $data00 + return -1 +endi + +if $data10 != @23-11-15 06:13:20.002@ then + print expect 23-11-15 06:13:20.002 , actual $data10 + return -1 +endi + +if $data20 != @23-11-15 06:13:20.004@ then + return -1 +endi + +print test the every option and rows option + +sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=100,rows=5') from ct1 +if $rows != 5 then + return -1 +endi + +if $data00 != @23-11-15 06:13:20.000@ then + return -1 +endi + +if $data10 != @23-11-15 06:13:20.100@ then + return -1 +endi sql drop anode 1 sql show anodes From 7c395a4fc361aa3e45f19e71e08bfe7664067d29 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 26 Dec 2024 18:17:02 +0800 Subject: [PATCH 8/8] test: add test case for anamaly detection in tdanalytics. --- .../libs/executor/src/anomalywindowoperator.c | 44 ++++++++++--------- tests/script/tsim/analytics/basic0.sim | 44 ++++++++++++++++--- 2 files changed, 63 insertions(+), 25 deletions(-) diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 3bc9c806b0..275416a2cc 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -61,9 +61,13 @@ static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + size_t keyBufSize = 0; + int32_t num = 0; + SExprInfo* pExprInfo = NULL; + const char* id = GET_TASKID(pTaskInfo); - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; SAnomalyWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnomalyWindowOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SAnomalyWindowPhysiNode* pAnomalyNode = (SAnomalyWindowPhysiNode*)physiNode; @@ -74,13 +78,13 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p } if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) { - qError("failed to get anomaly_window algorithm name from %s", pAnomalyNode->anomalyOpt); + qError("%s failed to get anomaly_window algorithm name from %s", id, pAnomalyNode->anomalyOpt); code = TSDB_CODE_ANA_ALGO_NOT_FOUND; goto _error; } if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) { - qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName); + qError("%s failed to get anomaly_window algorithm url from %s", id, pInfo->algoName); code = TSDB_CODE_ANA_ALGO_NOT_LOAD; goto _error; } @@ -94,20 +98,18 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p SExprInfo* pScalarExprInfo = NULL; code = createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } - size_t keyBufSize = 0; - int32_t num = 0; - SExprInfo* pExprInfo = NULL; code = createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pExprInfo, &num); QUERY_CHECK_CODE(code, lino, _error); initResultSizeInfo(&pOperator->resultInfo, 4096); - code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, id, pTaskInfo->streamInfo.pState, + &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pAnomalyNode->window.node.pOutputDataBlockDesc); @@ -124,20 +126,19 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p pInfo->anomalyCol = extractColumnFromColumnNode(pColNode); pInfo->anomalyKey.type = pInfo->anomalyCol.type; pInfo->anomalyKey.bytes = pInfo->anomalyCol.bytes; + pInfo->anomalyKey.pData = taosMemoryCalloc(1, pInfo->anomalyCol.bytes); - if (pInfo->anomalyKey.pData == NULL) { - goto _error; - } + QUERY_CHECK_NULL(pInfo->anomalyKey.pData, code, lino, _error, terrno) int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes; pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, itemSize); - pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*)); - pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow)); + QUERY_CHECK_NULL(pInfo->anomalySup.pResultRow, code, lino, _error, terrno) - if (pInfo->anomalySup.windows == NULL || pInfo->anomalySup.blocks == NULL || pInfo->anomalySup.pResultRow == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } + pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*)); + QUERY_CHECK_NULL(pInfo->anomalySup.blocks, code, lino, _error, terrno) + + pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow)); + QUERY_CHECK_NULL(pInfo->anomalySup.windows, code, lino, _error, terrno) code = filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); @@ -155,18 +156,21 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p *pOptrInfo = pOperator; - qDebug("anomaly_window operator is created, algo:%s url:%s opt:%s", pInfo->algoName, pInfo->algoUrl, + qDebug("%s anomaly_window operator is created, algo:%s url:%s opt:%s", id, pInfo->algoName, pInfo->algoUrl, pInfo->anomalyOpt); return TSDB_CODE_SUCCESS; _error: + qError("%s failed to create anomaly_window operator, line:%d algo:%s code:%s", id, lino, pAnomalyNode->anomalyOpt, + tstrerror(code)); + if (pInfo != NULL) { anomalyDestroyOperatorInfo(pInfo); } destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; - qError("failed to create anomaly_window operator, algo:%s code:0x%x", pInfo->algoName, code); + return code; } diff --git a/tests/script/tsim/analytics/basic0.sim b/tests/script/tsim/analytics/basic0.sim index 35774e7682..3ac49b1fc3 100644 --- a/tests/script/tsim/analytics/basic0.sim +++ b/tests/script/tsim/analytics/basic0.sim @@ -40,7 +40,7 @@ print $data00 $data01 $data02 sql use d0 print =============== create super table, include column type for count/sum/min/max/first -sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned) +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double, c4 tinyint, c5 bigint, c6 varchar(12)) tags (t1 int unsigned) sql show stables if $rows != 1 then @@ -52,10 +52,11 @@ sql create table ct1 using stb tags(1000) print ==================== insert data # input_list = [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40] -sql insert into ct1(ts, c1) values(now-1a, 5)(now+1a, 14)(now+2a, 15)(now+3a, 15)(now+4a, 14) -sql insert into ct1(ts, c1) values(now+5a, 19)(now+6a, 17)(now+7a, 16)(now+8a, 20)(now+9a, 22) -sql insert into ct1(ts, c1) values(now+10a, 8)(now+11a, 21)(now+12a, 28)(now+13a, 11)(now+14a, 9) -sql insert into ct1(ts, c1) values(now+15a, 29)(now+16a, 40) +sql insert into ct1(ts, c1, c2, c3, c4, c5, c6) values(now-1a, 5, 5, 5, 5, 5, 'a')(now+1a, 14, 14, 14, 14, 14, 'a')(now+2a, 15, 15, 15, 15, 15, 'a') +sql insert into ct1 values(now+3a, 15, 15, 15, 15, 15, 'a')(now+4a, 14, 14, 14, 14, 14, 'a')(now+5a, 19, 19, 19, 19, 19, 'a')(now+6a, 17, 17, 17, 17, 17, 'a') +sql insert into ct1 values(now+7a, 16, 16, 16, 16, 16, 'a')(now+8a, 20, 20, 20, 20, 20, 'a')(now+9a, 22, 22, 22, 22, 22, 'a') +sql insert into ct1 values(now+10a, 8, 8, 8, 8, 8, 'a')(now+11a, 21, 21, 21, 21, 21, 'a')(now+12a, 28, 28, 28, 28, 28, 'a')(now+13a, 11, 11, 11, 11, 11, 'a')(now+14a, 9, 9, 9, 9, 9, 'a') +sql insert into ct1 values(now+15a, 29, 29, 29, 29, 29, 'a')(now+16a, 40, 40, 40, 40, 40, 'a') sql select count(*) from ct1 if $data00 != 17 then @@ -68,6 +69,25 @@ if $data00 != 1 then return -1 endi +print ================= try every loaded anomaly detection algorithm +sql select count(*) from ct1 anomaly_window(c1, 'algo=iqr'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=lof'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=shesd'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=grubbs'); + +print ================= try every column type of column +sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c2, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c3, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c4, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c5, 'algo=ksigma,k=2'); + +print =================== invalid column type +sql_error select count(*) from ct1 anomaly_window(c6, 'algo=ksigma,k=2'); +sql_error select forecast(c6, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1 + + sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1 sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=119,wncheck=1,period=0') from ct1 sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters1,conf=0.5,wncheck=1,period=0') from ct1 @@ -83,8 +103,15 @@ sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ,' sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, , ,') from ct1 sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, a =') from ct1 sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, = a ,') from ct1 + +print =================== valid column type sql select forecast(c1, 'conf=50 ,algo = arima') from ct1 sql select forecast(c1, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c2, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c3, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c4, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1 sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=2') from ct1 if $rows != 10 then @@ -131,6 +158,13 @@ if $rows != 0 then return -1 endi +sleep 1000 + +print ===================== query without anodes +sql_error select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1 +sql_error select count(*) from ct1 anomaly_window(c1, 'algo=iqr'); + + _OVER: system sh/exec.sh -n dnode1 -s stop -x SIGINT print =============== check