test: add test case for anamaly detection in tdanalytics.
This commit is contained in:
parent
746cbc09c3
commit
98a3fa1ab5
|
@ -61,9 +61,13 @@ static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock*
|
||||||
int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
|
int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
|
||||||
SOperatorInfo** pOptrInfo) {
|
SOperatorInfo** pOptrInfo) {
|
||||||
QRY_PARAM_CHECK(pOptrInfo);
|
QRY_PARAM_CHECK(pOptrInfo);
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
|
size_t keyBufSize = 0;
|
||||||
|
int32_t num = 0;
|
||||||
|
SExprInfo* pExprInfo = NULL;
|
||||||
|
const char* id = GET_TASKID(pTaskInfo);
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
int32_t lino = 0;
|
|
||||||
SAnomalyWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnomalyWindowOperatorInfo));
|
SAnomalyWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnomalyWindowOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
SAnomalyWindowPhysiNode* pAnomalyNode = (SAnomalyWindowPhysiNode*)physiNode;
|
SAnomalyWindowPhysiNode* pAnomalyNode = (SAnomalyWindowPhysiNode*)physiNode;
|
||||||
|
@ -74,13 +78,13 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) {
|
if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) {
|
||||||
qError("failed to get anomaly_window algorithm name from %s", pAnomalyNode->anomalyOpt);
|
qError("%s failed to get anomaly_window algorithm name from %s", id, pAnomalyNode->anomalyOpt);
|
||||||
code = TSDB_CODE_ANA_ALGO_NOT_FOUND;
|
code = TSDB_CODE_ANA_ALGO_NOT_FOUND;
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) {
|
if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) {
|
||||||
qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName);
|
qError("%s failed to get anomaly_window algorithm url from %s", id, pInfo->algoName);
|
||||||
code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
|
code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -94,20 +98,18 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
|
||||||
SExprInfo* pScalarExprInfo = NULL;
|
SExprInfo* pScalarExprInfo = NULL;
|
||||||
code = createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
|
code = createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
|
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t keyBufSize = 0;
|
|
||||||
int32_t num = 0;
|
|
||||||
SExprInfo* pExprInfo = NULL;
|
|
||||||
code = createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pExprInfo, &num);
|
code = createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pExprInfo, &num);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
|
|
||||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
|
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, id, pTaskInfo->streamInfo.pState,
|
||||||
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
&pTaskInfo->storageAPI.functionStore);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAnomalyNode->window.node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAnomalyNode->window.node.pOutputDataBlockDesc);
|
||||||
|
@ -124,27 +126,19 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
|
||||||
pInfo->anomalyCol = extractColumnFromColumnNode(pColNode);
|
pInfo->anomalyCol = extractColumnFromColumnNode(pColNode);
|
||||||
pInfo->anomalyKey.type = pInfo->anomalyCol.type;
|
pInfo->anomalyKey.type = pInfo->anomalyCol.type;
|
||||||
pInfo->anomalyKey.bytes = pInfo->anomalyCol.bytes;
|
pInfo->anomalyKey.bytes = pInfo->anomalyCol.bytes;
|
||||||
|
|
||||||
pInfo->anomalyKey.pData = taosMemoryCalloc(1, pInfo->anomalyCol.bytes);
|
pInfo->anomalyKey.pData = taosMemoryCalloc(1, pInfo->anomalyCol.bytes);
|
||||||
if (pInfo->anomalyKey.pData == NULL) {
|
QUERY_CHECK_NULL(pInfo->anomalyKey.pData, code, lino, _error, terrno)
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes;
|
int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes;
|
||||||
pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, itemSize);
|
pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, itemSize);
|
||||||
if (pInfo->anomalySup.pResultRow == NULL) {
|
QUERY_CHECK_NULL(pInfo->anomalySup.pResultRow, code, lino, _error, terrno)
|
||||||
code = terrno;
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*));
|
pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*));
|
||||||
if (pInfo->anomalySup.blocks == NULL) {
|
QUERY_CHECK_NULL(pInfo->anomalySup.blocks, code, lino, _error, terrno)
|
||||||
code = terrno;
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow));
|
pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow));
|
||||||
if (pInfo->anomalySup.windows == NULL) {
|
QUERY_CHECK_NULL(pInfo->anomalySup.windows, code, lino, _error, terrno)
|
||||||
code = terrno;
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
code = filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
@ -162,18 +156,21 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
|
||||||
|
|
||||||
*pOptrInfo = pOperator;
|
*pOptrInfo = pOperator;
|
||||||
|
|
||||||
qDebug("anomaly_window operator is created, algo:%s url:%s opt:%s", pInfo->algoName, pInfo->algoUrl,
|
qDebug("%s anomaly_window operator is created, algo:%s url:%s opt:%s", id, pInfo->algoName, pInfo->algoUrl,
|
||||||
pInfo->anomalyOpt);
|
pInfo->anomalyOpt);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
qError("%s failed to create anomaly_window operator, line:%d algo:%s code:%s", id, lino, pAnomalyNode->anomalyOpt,
|
||||||
|
tstrerror(code));
|
||||||
|
|
||||||
if (pInfo != NULL) {
|
if (pInfo != NULL) {
|
||||||
anomalyDestroyOperatorInfo(pInfo);
|
anomalyDestroyOperatorInfo(pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
|
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
qError("failed to create anomaly_window operator, algo:%s code:0x%x", pInfo->algoName, code);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,7 @@ print $data00 $data01 $data02
|
||||||
sql use d0
|
sql use d0
|
||||||
|
|
||||||
print =============== create super table, include column type for count/sum/min/max/first
|
print =============== create super table, include column type for count/sum/min/max/first
|
||||||
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
|
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double, c4 tinyint, c5 bigint, c6 varchar(12)) tags (t1 int unsigned)
|
||||||
|
|
||||||
sql show stables
|
sql show stables
|
||||||
if $rows != 1 then
|
if $rows != 1 then
|
||||||
|
@ -52,10 +52,11 @@ sql create table ct1 using stb tags(1000)
|
||||||
|
|
||||||
print ==================== insert data
|
print ==================== insert data
|
||||||
# input_list = [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40]
|
# input_list = [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40]
|
||||||
sql insert into ct1(ts, c1) values(now-1a, 5)(now+1a, 14)(now+2a, 15)(now+3a, 15)(now+4a, 14)
|
sql insert into ct1(ts, c1, c2, c3, c4, c5, c6) values(now-1a, 5, 5, 5, 5, 5, 'a')(now+1a, 14, 14, 14, 14, 14, 'a')(now+2a, 15, 15, 15, 15, 15, 'a')
|
||||||
sql insert into ct1(ts, c1) values(now+5a, 19)(now+6a, 17)(now+7a, 16)(now+8a, 20)(now+9a, 22)
|
sql insert into ct1 values(now+3a, 15, 15, 15, 15, 15, 'a')(now+4a, 14, 14, 14, 14, 14, 'a')(now+5a, 19, 19, 19, 19, 19, 'a')(now+6a, 17, 17, 17, 17, 17, 'a')
|
||||||
sql insert into ct1(ts, c1) values(now+10a, 8)(now+11a, 21)(now+12a, 28)(now+13a, 11)(now+14a, 9)
|
sql insert into ct1 values(now+7a, 16, 16, 16, 16, 16, 'a')(now+8a, 20, 20, 20, 20, 20, 'a')(now+9a, 22, 22, 22, 22, 22, 'a')
|
||||||
sql insert into ct1(ts, c1) values(now+15a, 29)(now+16a, 40)
|
sql insert into ct1 values(now+10a, 8, 8, 8, 8, 8, 'a')(now+11a, 21, 21, 21, 21, 21, 'a')(now+12a, 28, 28, 28, 28, 28, 'a')(now+13a, 11, 11, 11, 11, 11, 'a')(now+14a, 9, 9, 9, 9, 9, 'a')
|
||||||
|
sql insert into ct1 values(now+15a, 29, 29, 29, 29, 29, 'a')(now+16a, 40, 40, 40, 40, 40, 'a')
|
||||||
|
|
||||||
sql select count(*) from ct1
|
sql select count(*) from ct1
|
||||||
if $data00 != 17 then
|
if $data00 != 17 then
|
||||||
|
@ -68,6 +69,25 @@ if $data00 != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print ================= try every loaded anomaly detection algorithm
|
||||||
|
sql select count(*) from ct1 anomaly_window(c1, 'algo=iqr');
|
||||||
|
sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma');
|
||||||
|
sql select count(*) from ct1 anomaly_window(c1, 'algo=lof');
|
||||||
|
sql select count(*) from ct1 anomaly_window(c1, 'algo=shesd');
|
||||||
|
sql select count(*) from ct1 anomaly_window(c1, 'algo=grubbs');
|
||||||
|
|
||||||
|
print ================= try every column type of column
|
||||||
|
sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma,k=2');
|
||||||
|
sql select count(*) from ct1 anomaly_window(c2, 'algo=ksigma,k=2');
|
||||||
|
sql select count(*) from ct1 anomaly_window(c3, 'algo=ksigma,k=2');
|
||||||
|
sql select count(*) from ct1 anomaly_window(c4, 'algo=ksigma,k=2');
|
||||||
|
sql select count(*) from ct1 anomaly_window(c5, 'algo=ksigma,k=2');
|
||||||
|
|
||||||
|
print =================== invalid column type
|
||||||
|
sql_error select count(*) from ct1 anomaly_window(c6, 'algo=ksigma,k=2');
|
||||||
|
sql_error select forecast(c6, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1
|
||||||
|
|
||||||
|
|
||||||
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1
|
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1
|
||||||
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=119,wncheck=1,period=0') from ct1
|
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=119,wncheck=1,period=0') from ct1
|
||||||
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters1,conf=0.5,wncheck=1,period=0') from ct1
|
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters1,conf=0.5,wncheck=1,period=0') from ct1
|
||||||
|
@ -83,8 +103,15 @@ sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ,'
|
||||||
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, , ,') from ct1
|
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, , ,') from ct1
|
||||||
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, a =') from ct1
|
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, a =') from ct1
|
||||||
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, = a ,') from ct1
|
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, = a ,') from ct1
|
||||||
|
|
||||||
|
print =================== valid column type
|
||||||
sql select forecast(c1, 'conf=50 ,algo = arima') from ct1
|
sql select forecast(c1, 'conf=50 ,algo = arima') from ct1
|
||||||
sql select forecast(c1, 'conf=50 ,algo = arima, rows=1') from ct1
|
sql select forecast(c1, 'conf=50 ,algo = arima, rows=1') from ct1
|
||||||
|
sql select forecast(c2, 'conf=50 ,algo = arima, rows=1') from ct1
|
||||||
|
sql select forecast(c3, 'conf=50 ,algo = arima, rows=1') from ct1
|
||||||
|
sql select forecast(c4, 'conf=50 ,algo = arima, rows=1') from ct1
|
||||||
|
sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1
|
||||||
|
sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1
|
||||||
|
|
||||||
sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=2') from ct1
|
sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=2') from ct1
|
||||||
if $rows != 10 then
|
if $rows != 10 then
|
||||||
|
@ -131,6 +158,13 @@ if $rows != 0 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
print ===================== query without anodes
|
||||||
|
sql_error select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1
|
||||||
|
sql_error select count(*) from ct1 anomaly_window(c1, 'algo=iqr');
|
||||||
|
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
print =============== check
|
print =============== check
|
||||||
|
|
Loading…
Reference in New Issue