Merge pull request #29349 from taosdata/fix/3_liaohj

others: merge fix from main to 3.0, add test cases to improve the coverages.
This commit is contained in:
Shengliang Guan 2024-12-27 09:25:55 +08:00 committed by GitHub
commit b0f61b53ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 135 additions and 39 deletions

View File

@ -61,9 +61,13 @@ static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock*
int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) { SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo); QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
size_t keyBufSize = 0;
int32_t num = 0;
SExprInfo* pExprInfo = NULL;
const char* id = GET_TASKID(pTaskInfo);
SAnomalyWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnomalyWindowOperatorInfo)); SAnomalyWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnomalyWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SAnomalyWindowPhysiNode* pAnomalyNode = (SAnomalyWindowPhysiNode*)physiNode; SAnomalyWindowPhysiNode* pAnomalyNode = (SAnomalyWindowPhysiNode*)physiNode;
@ -74,13 +78,13 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
} }
if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) { if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) {
qError("failed to get anomaly_window algorithm name from %s", pAnomalyNode->anomalyOpt); qError("%s failed to get anomaly_window algorithm name from %s", id, pAnomalyNode->anomalyOpt);
code = TSDB_CODE_ANA_ALGO_NOT_FOUND; code = TSDB_CODE_ANA_ALGO_NOT_FOUND;
goto _error; goto _error;
} }
if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) { if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) {
qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName); qError("%s failed to get anomaly_window algorithm url from %s", id, pInfo->algoName);
code = TSDB_CODE_ANA_ALGO_NOT_LOAD; code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
goto _error; goto _error;
} }
@ -94,20 +98,18 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
SExprInfo* pScalarExprInfo = NULL; SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); code = createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
} }
size_t keyBufSize = 0;
int32_t num = 0;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pExprInfo, &num); code = createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, id, pTaskInfo->streamInfo.pState,
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAnomalyNode->window.node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pAnomalyNode->window.node.pOutputDataBlockDesc);
@ -124,27 +126,19 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
pInfo->anomalyCol = extractColumnFromColumnNode(pColNode); pInfo->anomalyCol = extractColumnFromColumnNode(pColNode);
pInfo->anomalyKey.type = pInfo->anomalyCol.type; pInfo->anomalyKey.type = pInfo->anomalyCol.type;
pInfo->anomalyKey.bytes = pInfo->anomalyCol.bytes; pInfo->anomalyKey.bytes = pInfo->anomalyCol.bytes;
pInfo->anomalyKey.pData = taosMemoryCalloc(1, pInfo->anomalyCol.bytes); pInfo->anomalyKey.pData = taosMemoryCalloc(1, pInfo->anomalyCol.bytes);
if (pInfo->anomalyKey.pData == NULL) { QUERY_CHECK_NULL(pInfo->anomalyKey.pData, code, lino, _error, terrno)
goto _error;
}
int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes; int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes;
pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, itemSize); pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, itemSize);
if (pInfo->anomalySup.pResultRow == NULL) { QUERY_CHECK_NULL(pInfo->anomalySup.pResultRow, code, lino, _error, terrno)
code = terrno;
goto _error;
}
pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*)); pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*));
if (pInfo->anomalySup.blocks == NULL) { QUERY_CHECK_NULL(pInfo->anomalySup.blocks, code, lino, _error, terrno)
code = terrno;
goto _error;
}
pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow)); pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow));
if (pInfo->anomalySup.windows == NULL) { QUERY_CHECK_NULL(pInfo->anomalySup.windows, code, lino, _error, terrno)
code = terrno;
goto _error;
}
code = filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
@ -162,18 +156,21 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
qDebug("anomaly_window operator is created, algo:%s url:%s opt:%s", pInfo->algoName, pInfo->algoUrl, qDebug("%s anomaly_window operator is created, algo:%s url:%s opt:%s", id, pInfo->algoName, pInfo->algoUrl,
pInfo->anomalyOpt); pInfo->anomalyOpt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
qError("%s failed to create anomaly_window operator, line:%d algo:%s code:%s", id, lino, pAnomalyNode->anomalyOpt,
tstrerror(code));
if (pInfo != NULL) { if (pInfo != NULL) {
anomalyDestroyOperatorInfo(pInfo); anomalyDestroyOperatorInfo(pInfo);
} }
destroyOperatorAndDownstreams(pOperator, &downstream, 1); destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code; pTaskInfo->code = code;
qError("failed to create anomaly_window operator, algo:%s code:0x%x", pInfo->algoName, code);
return code; return code;
} }

View File

@ -20,7 +20,7 @@
#ifdef USE_ANALYTICS #ifdef USE_ANALYTICS
#include <curl/curl.h> #include <curl/curl.h>
#define ANAL_ALGO_SPLIT "," #define ANALYTICS_ALOG_SPLIT_CHAR ","
typedef struct { typedef struct {
int64_t ver; int64_t ver;
@ -136,7 +136,7 @@ bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue,
return false; return false;
} }
pEnd = strstr(pStart, ANAL_ALGO_SPLIT); pEnd = strstr(pStart, ANALYTICS_ALOG_SPLIT_CHAR);
if (optMaxLen > 0) { if (optMaxLen > 0) {
if (pEnd > pStart) { if (pEnd > pStart) {
int32_t len = (int32_t)(pEnd - pStart); int32_t len = (int32_t)(pEnd - pStart);
@ -168,7 +168,7 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValu
int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName); int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName);
char *pos1 = strstr(option, buf); char *pos1 = strstr(option, buf);
char *pos2 = strstr(option, ANAL_ALGO_SPLIT); char *pos2 = strstr(option, ANALYTICS_ALOG_SPLIT_CHAR);
if (pos1 != NULL) { if (pos1 != NULL) {
*optValue = taosStr2Int64(pos1 + bufLen, NULL, 10); *optValue = taosStr2Int64(pos1 + bufLen, NULL, 10);
return true; return true;

View File

@ -13,7 +13,7 @@ if [ -n "$PID" ]; then
systemctl stop taosd systemctl stop taosd
fi fi
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w taosd | grep -v grep | grep -v taosanode | awk '{print $2}'`
while [ -n "$PID" ]; do while [ -n "$PID" ]; do
echo kill -9 $PID echo kill -9 $PID
#pkill -9 taosd #pkill -9 taosd
@ -38,10 +38,10 @@ while [ -n "$PID" ]; do
else else
lsof -nti:6030 | xargs kill -9 lsof -nti:6030 | xargs kill -9
fi fi
PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w taos | grep -v grep |grep -v taosanode| awk '{print $2}'`
done done
PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode|awk '{print $2}'`
while [ -n "$PID" ]; do while [ -n "$PID" ]; do
echo kill -9 $PID echo kill -9 $PID
#pkill -9 tmq_sim #pkill -9 tmq_sim
@ -52,5 +52,5 @@ while [ -n "$PID" ]; do
else else
lsof -nti:6030 | xargs kill -9 lsof -nti:6030 | xargs kill -9
fi fi
PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode| awk '{print $2}'`
done done

View File

@ -3,7 +3,17 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sql connect sql connect
print =============== create anode print =============== failed to create anode on '127.0.0.1:1101'
sql_error create anode '127.0.0.1:1101'
sql show anodes
if $rows != 0 then
return -1
endi
sql_error drop anode 1
print ================ create anode
sql create anode '192.168.1.116:6050' sql create anode '192.168.1.116:6050'
sql show anodes sql show anodes
@ -30,7 +40,7 @@ print $data00 $data01 $data02
sql use d0 sql use d0
print =============== create super table, include column type for count/sum/min/max/first print =============== create super table, include column type for count/sum/min/max/first
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned) sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double, c4 tinyint, c5 bigint, c6 varchar(12)) tags (t1 int unsigned)
sql show stables sql show stables
if $rows != 1 then if $rows != 1 then
@ -42,10 +52,11 @@ sql create table ct1 using stb tags(1000)
print ==================== insert data print ==================== insert data
# input_list = [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40] # input_list = [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40]
sql insert into ct1(ts, c1) values(now-1a, 5)(now+1a, 14)(now+2a, 15)(now+3a, 15)(now+4a, 14) sql insert into ct1(ts, c1, c2, c3, c4, c5, c6) values(now-1a, 5, 5, 5, 5, 5, 'a')(now+1a, 14, 14, 14, 14, 14, 'a')(now+2a, 15, 15, 15, 15, 15, 'a')
sql insert into ct1(ts, c1) values(now+5a, 19)(now+6a, 17)(now+7a, 16)(now+8a, 20)(now+9a, 22) sql insert into ct1 values(now+3a, 15, 15, 15, 15, 15, 'a')(now+4a, 14, 14, 14, 14, 14, 'a')(now+5a, 19, 19, 19, 19, 19, 'a')(now+6a, 17, 17, 17, 17, 17, 'a')
sql insert into ct1(ts, c1) values(now+10a, 8)(now+11a, 21)(now+12a, 28)(now+13a, 11)(now+14a, 9) sql insert into ct1 values(now+7a, 16, 16, 16, 16, 16, 'a')(now+8a, 20, 20, 20, 20, 20, 'a')(now+9a, 22, 22, 22, 22, 22, 'a')
sql insert into ct1(ts, c1) values(now+15a, 29)(now+16a, 40) sql insert into ct1 values(now+10a, 8, 8, 8, 8, 8, 'a')(now+11a, 21, 21, 21, 21, 21, 'a')(now+12a, 28, 28, 28, 28, 28, 'a')(now+13a, 11, 11, 11, 11, 11, 'a')(now+14a, 9, 9, 9, 9, 9, 'a')
sql insert into ct1 values(now+15a, 29, 29, 29, 29, 29, 'a')(now+16a, 40, 40, 40, 40, 40, 'a')
sql select count(*) from ct1 sql select count(*) from ct1
if $data00 != 17 then if $data00 != 17 then
@ -58,6 +69,87 @@ if $data00 != 1 then
return -1 return -1
endi endi
print ================= try every loaded anomaly detection algorithm
sql select count(*) from ct1 anomaly_window(c1, 'algo=iqr');
sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma');
sql select count(*) from ct1 anomaly_window(c1, 'algo=lof');
sql select count(*) from ct1 anomaly_window(c1, 'algo=shesd');
sql select count(*) from ct1 anomaly_window(c1, 'algo=grubbs');
print ================= try every column type of column
sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma,k=2');
sql select count(*) from ct1 anomaly_window(c2, 'algo=ksigma,k=2');
sql select count(*) from ct1 anomaly_window(c3, 'algo=ksigma,k=2');
sql select count(*) from ct1 anomaly_window(c4, 'algo=ksigma,k=2');
sql select count(*) from ct1 anomaly_window(c5, 'algo=ksigma,k=2');
print =================== invalid column type
sql_error select count(*) from ct1 anomaly_window(c6, 'algo=ksigma,k=2');
sql_error select forecast(c6, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=119,wncheck=1,period=0') from ct1
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters1,conf=0.5,wncheck=1,period=0') from ct1
sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=0') from ct1
sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=-10') from ct1
sql_error select forecast(c1, 'conf=50 ,algo = arima, every=0') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters, conf=50 ') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, ' algo=holtwinters , conf=50 ') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, ' algo = holtwinters , conf = 50 ') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ,') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, , ,') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, a =') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, = a ,') from ct1
print =================== valid column type
sql select forecast(c1, 'conf=50 ,algo = arima') from ct1
sql select forecast(c1, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c2, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c3, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c4, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=2') from ct1
if $rows != 10 then
return -1
endi
if $data03 != 28 then
return -1
endi
if $data00 != @23-11-15 06:13:20.000@ then
print expect 23-11-15 06:13:20.000 , actual $data00
return -1
endi
if $data10 != @23-11-15 06:13:20.002@ then
print expect 23-11-15 06:13:20.002 , actual $data10
return -1
endi
if $data20 != @23-11-15 06:13:20.004@ then
return -1
endi
print test the every option and rows option
sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=100,rows=5') from ct1
if $rows != 5 then
return -1
endi
if $data00 != @23-11-15 06:13:20.000@ then
return -1
endi
if $data10 != @23-11-15 06:13:20.100@ then
return -1
endi
sql drop anode 1 sql drop anode 1
sql show anodes sql show anodes
@ -66,6 +158,13 @@ if $rows != 0 then
return -1 return -1
endi endi
sleep 1000
print ===================== query without anodes
sql_error select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1
sql_error select count(*) from ct1 anomaly_window(c1, 'algo=iqr');
_OVER: _OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check print =============== check