From 05712e2ddeb10847900f687a186bb5c7f9046f8c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 26 Dec 2024 17:24:17 +0800 Subject: [PATCH 1/7] test(analytics): add tests for tdanalytics. --- source/util/src/tanalytics.c | 6 +-- tests/script/sh/stop_dnodes.sh | 8 +-- tests/script/tsim/analytics/basic0.sim | 67 +++++++++++++++++++++++++- 3 files changed, 73 insertions(+), 8 deletions(-) diff --git a/source/util/src/tanalytics.c b/source/util/src/tanalytics.c index e68edd4b76..bf2cb4fd07 100644 --- a/source/util/src/tanalytics.c +++ b/source/util/src/tanalytics.c @@ -20,7 +20,7 @@ #ifdef USE_ANALYTICS #include -#define ANAL_ALGO_SPLIT "," +#define ANALYTICS_ALOG_SPLIT_CHAR "," typedef struct { int64_t ver; @@ -136,7 +136,7 @@ bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, return false; } - pEnd = strstr(pStart, ANAL_ALGO_SPLIT); + pEnd = strstr(pStart, ANALYTICS_ALOG_SPLIT_CHAR); if (optMaxLen > 0) { if (pEnd > pStart) { int32_t len = (int32_t)(pEnd - pStart); @@ -168,7 +168,7 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValu int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName); char *pos1 = strstr(option, buf); - char *pos2 = strstr(option, ANAL_ALGO_SPLIT); + char *pos2 = strstr(option, ANALYTICS_ALOG_SPLIT_CHAR); if (pos1 != NULL) { *optValue = taosStr2Int64(pos1 + bufLen, NULL, 10); return true; diff --git a/tests/script/sh/stop_dnodes.sh b/tests/script/sh/stop_dnodes.sh index 8923804547..da2083b013 100755 --- a/tests/script/sh/stop_dnodes.sh +++ b/tests/script/sh/stop_dnodes.sh @@ -13,7 +13,7 @@ if [ -n "$PID" ]; then systemctl stop taosd fi -PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'` +PID=`ps -ef|grep -w taosd | grep -v grep | grep -v taosanode | awk '{print $2}'` while [ -n "$PID" ]; do echo kill -9 $PID #pkill -9 taosd @@ -38,10 +38,10 @@ while [ -n "$PID" ]; do else lsof -nti:6030 | xargs kill -9 fi - PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'` + PID=`ps -ef|grep -w taos | grep -v grep |grep -v taosanode| awk '{print $2}'` done -PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'` +PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode|awk '{print $2}'` while [ -n "$PID" ]; do echo kill -9 $PID #pkill -9 tmq_sim @@ -52,5 +52,5 @@ while [ -n "$PID" ]; do else lsof -nti:6030 | xargs kill -9 fi - PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'` + PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode| awk '{print $2}'` done \ No newline at end of file diff --git a/tests/script/tsim/analytics/basic0.sim b/tests/script/tsim/analytics/basic0.sim index 77c9184e8f..35774e7682 100644 --- a/tests/script/tsim/analytics/basic0.sim +++ b/tests/script/tsim/analytics/basic0.sim @@ -3,7 +3,17 @@ system sh/deploy.sh -n dnode1 -i 1 system sh/exec.sh -n dnode1 -s start sql connect -print =============== create anode +print =============== failed to create anode on '127.0.0.1:1101' +sql_error create anode '127.0.0.1:1101' + +sql show anodes +if $rows != 0 then + return -1 +endi + +sql_error drop anode 1 + +print ================ create anode sql create anode '192.168.1.116:6050' sql show anodes @@ -58,6 +68,61 @@ if $data00 != 1 then return -1 endi +sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1 +sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=119,wncheck=1,period=0') from ct1 +sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters1,conf=0.5,wncheck=1,period=0') from ct1 +sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=0') from ct1 +sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=-10') from ct1 +sql_error select forecast(c1, 'conf=50 ,algo = arima, every=0') from ct1 + +sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters, conf=50 ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, ' algo=holtwinters , conf=50 ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, ' algo = holtwinters , conf = 50 ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ,') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, , ,') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, a =') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, = a ,') from ct1 +sql select forecast(c1, 'conf=50 ,algo = arima') from ct1 +sql select forecast(c1, 'conf=50 ,algo = arima, rows=1') from ct1 + +sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=2') from ct1 +if $rows != 10 then + return -1 +endi + +if $data03 != 28 then + return -1 +endi + +if $data00 != @23-11-15 06:13:20.000@ then + print expect 23-11-15 06:13:20.000 , actual $data00 + return -1 +endi + +if $data10 != @23-11-15 06:13:20.002@ then + print expect 23-11-15 06:13:20.002 , actual $data10 + return -1 +endi + +if $data20 != @23-11-15 06:13:20.004@ then + return -1 +endi + +print test the every option and rows option + +sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=100,rows=5') from ct1 +if $rows != 5 then + return -1 +endi + +if $data00 != @23-11-15 06:13:20.000@ then + return -1 +endi + +if $data10 != @23-11-15 06:13:20.100@ then + return -1 +endi sql drop anode 1 sql show anodes From 8b0a371aa699d264a1e3cb014de4701c64a58c1e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 26 Dec 2024 18:17:02 +0800 Subject: [PATCH 2/7] test: add test case for anamaly detection in tdanalytics. --- .../libs/executor/src/anomalywindowoperator.c | 49 +++++++++---------- tests/script/tsim/analytics/basic0.sim | 44 +++++++++++++++-- 2 files changed, 62 insertions(+), 31 deletions(-) diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index dd1a52022e..eb72edb964 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -61,9 +61,13 @@ static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + size_t keyBufSize = 0; + int32_t num = 0; + SExprInfo* pExprInfo = NULL; + const char* id = GET_TASKID(pTaskInfo); - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; SAnomalyWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnomalyWindowOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SAnomalyWindowPhysiNode* pAnomalyNode = (SAnomalyWindowPhysiNode*)physiNode; @@ -74,13 +78,13 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p } if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) { - qError("failed to get anomaly_window algorithm name from %s", pAnomalyNode->anomalyOpt); + qError("%s failed to get anomaly_window algorithm name from %s", id, pAnomalyNode->anomalyOpt); code = TSDB_CODE_ANA_ALGO_NOT_FOUND; goto _error; } if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) { - qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName); + qError("%s failed to get anomaly_window algorithm url from %s", id, pInfo->algoName); code = TSDB_CODE_ANA_ALGO_NOT_LOAD; goto _error; } @@ -94,20 +98,18 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p SExprInfo* pScalarExprInfo = NULL; code = createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } - size_t keyBufSize = 0; - int32_t num = 0; - SExprInfo* pExprInfo = NULL; code = createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pExprInfo, &num); QUERY_CHECK_CODE(code, lino, _error); initResultSizeInfo(&pOperator->resultInfo, 4096); - code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, id, pTaskInfo->streamInfo.pState, + &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pAnomalyNode->window.node.pOutputDataBlockDesc); @@ -124,27 +126,19 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p pInfo->anomalyCol = extractColumnFromColumnNode(pColNode); pInfo->anomalyKey.type = pInfo->anomalyCol.type; pInfo->anomalyKey.bytes = pInfo->anomalyCol.bytes; + pInfo->anomalyKey.pData = taosMemoryCalloc(1, pInfo->anomalyCol.bytes); - if (pInfo->anomalyKey.pData == NULL) { - goto _error; - } + QUERY_CHECK_NULL(pInfo->anomalyKey.pData, code, lino, _error, terrno) int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes; pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, itemSize); - if (pInfo->anomalySup.pResultRow == NULL) { - code = terrno; - goto _error; - } + QUERY_CHECK_NULL(pInfo->anomalySup.pResultRow, code, lino, _error, terrno) + pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*)); - if (pInfo->anomalySup.blocks == NULL) { - code = terrno; - goto _error; - } + QUERY_CHECK_NULL(pInfo->anomalySup.blocks, code, lino, _error, terrno) + pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow)); - if (pInfo->anomalySup.windows == NULL) { - code = terrno; - goto _error; - } + QUERY_CHECK_NULL(pInfo->anomalySup.windows, code, lino, _error, terrno) code = filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); @@ -162,18 +156,21 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p *pOptrInfo = pOperator; - qDebug("anomaly_window operator is created, algo:%s url:%s opt:%s", pInfo->algoName, pInfo->algoUrl, + qDebug("%s anomaly_window operator is created, algo:%s url:%s opt:%s", id, pInfo->algoName, pInfo->algoUrl, pInfo->anomalyOpt); return TSDB_CODE_SUCCESS; _error: + qError("%s failed to create anomaly_window operator, line:%d algo:%s code:%s", id, lino, pAnomalyNode->anomalyOpt, + tstrerror(code)); + if (pInfo != NULL) { anomalyDestroyOperatorInfo(pInfo); } destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; - qError("failed to create anomaly_window operator, algo:%s code:0x%x", pInfo->algoName, code); + return code; } diff --git a/tests/script/tsim/analytics/basic0.sim b/tests/script/tsim/analytics/basic0.sim index 35774e7682..3ac49b1fc3 100644 --- a/tests/script/tsim/analytics/basic0.sim +++ b/tests/script/tsim/analytics/basic0.sim @@ -40,7 +40,7 @@ print $data00 $data01 $data02 sql use d0 print =============== create super table, include column type for count/sum/min/max/first -sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned) +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double, c4 tinyint, c5 bigint, c6 varchar(12)) tags (t1 int unsigned) sql show stables if $rows != 1 then @@ -52,10 +52,11 @@ sql create table ct1 using stb tags(1000) print ==================== insert data # input_list = [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40] -sql insert into ct1(ts, c1) values(now-1a, 5)(now+1a, 14)(now+2a, 15)(now+3a, 15)(now+4a, 14) -sql insert into ct1(ts, c1) values(now+5a, 19)(now+6a, 17)(now+7a, 16)(now+8a, 20)(now+9a, 22) -sql insert into ct1(ts, c1) values(now+10a, 8)(now+11a, 21)(now+12a, 28)(now+13a, 11)(now+14a, 9) -sql insert into ct1(ts, c1) values(now+15a, 29)(now+16a, 40) +sql insert into ct1(ts, c1, c2, c3, c4, c5, c6) values(now-1a, 5, 5, 5, 5, 5, 'a')(now+1a, 14, 14, 14, 14, 14, 'a')(now+2a, 15, 15, 15, 15, 15, 'a') +sql insert into ct1 values(now+3a, 15, 15, 15, 15, 15, 'a')(now+4a, 14, 14, 14, 14, 14, 'a')(now+5a, 19, 19, 19, 19, 19, 'a')(now+6a, 17, 17, 17, 17, 17, 'a') +sql insert into ct1 values(now+7a, 16, 16, 16, 16, 16, 'a')(now+8a, 20, 20, 20, 20, 20, 'a')(now+9a, 22, 22, 22, 22, 22, 'a') +sql insert into ct1 values(now+10a, 8, 8, 8, 8, 8, 'a')(now+11a, 21, 21, 21, 21, 21, 'a')(now+12a, 28, 28, 28, 28, 28, 'a')(now+13a, 11, 11, 11, 11, 11, 'a')(now+14a, 9, 9, 9, 9, 9, 'a') +sql insert into ct1 values(now+15a, 29, 29, 29, 29, 29, 'a')(now+16a, 40, 40, 40, 40, 40, 'a') sql select count(*) from ct1 if $data00 != 17 then @@ -68,6 +69,25 @@ if $data00 != 1 then return -1 endi +print ================= try every loaded anomaly detection algorithm +sql select count(*) from ct1 anomaly_window(c1, 'algo=iqr'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=lof'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=shesd'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=grubbs'); + +print ================= try every column type of column +sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c2, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c3, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c4, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c5, 'algo=ksigma,k=2'); + +print =================== invalid column type +sql_error select count(*) from ct1 anomaly_window(c6, 'algo=ksigma,k=2'); +sql_error select forecast(c6, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1 + + sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1 sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=119,wncheck=1,period=0') from ct1 sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters1,conf=0.5,wncheck=1,period=0') from ct1 @@ -83,8 +103,15 @@ sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ,' sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, , ,') from ct1 sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, a =') from ct1 sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, = a ,') from ct1 + +print =================== valid column type sql select forecast(c1, 'conf=50 ,algo = arima') from ct1 sql select forecast(c1, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c2, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c3, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c4, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1 sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=2') from ct1 if $rows != 10 then @@ -131,6 +158,13 @@ if $rows != 0 then return -1 endi +sleep 1000 + +print ===================== query without anodes +sql_error select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1 +sql_error select count(*) from ct1 anomaly_window(c1, 'algo=iqr'); + + _OVER: system sh/exec.sh -n dnode1 -s stop -x SIGINT print =============== check From 68ece75ad050a0972934a94a4c6e63bf031ac66d Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 27 Dec 2024 10:27:05 +0800 Subject: [PATCH 3/7] fix stream state operator issue --- .../executor/src/streamtimewindowoperator.c | 17 ++++++- tests/script/tsim/stream/basic5.sim | 49 ++++++++++++++----- 2 files changed, 54 insertions(+), 12 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 91af218a7e..031d2e8bdc 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -4466,6 +4466,19 @@ _end: return code; } +static bool isWinResult(SSessionKey* pKey, SSHashObj* pSeUpdate, SSHashObj* pResults) { + SSessionKey checkKey = {0}; + getSessionHashKey(pKey, &checkKey); + if (tSimpleHashGet(pSeUpdate, &checkKey, sizeof(SSessionKey)) != NULL) { + return true; + } + + if (tSimpleHashGet(pResults, &checkKey, sizeof(SSessionKey)) != NULL) { + return true; + } + return false; +} + static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated, SSHashObj* pStDeleted) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4518,7 +4531,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl code = setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin); QUERY_CHECK_CODE(code, lino, _end); - releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore); + if (isWinResult(&nextWin.winInfo.sessionWin, pSeUpdated, pAggSup->pResultRows) == false) { + releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore); + } setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); code = updateStateWindowInfo(pAggSup, &curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual, diff --git a/tests/script/tsim/stream/basic5.sim b/tests/script/tsim/stream/basic5.sim index d7dd603d3c..5d90740b13 100644 --- a/tests/script/tsim/stream/basic5.sim +++ b/tests/script/tsim/stream/basic5.sim @@ -2,13 +2,28 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 system sh/cfg.sh -n dnode1 -c debugflag -v 135 system sh/cfg.sh -n dnode1 -c streamBufferSize -v 10 +system sh/cfg.sh -n dnode1 -c checkpointinterval -v 60 +system sh/cfg.sh -n dnode1 -c snodeAddress -v 127.0.0.1:873 system sh/exec.sh -n dnode1 -s start sleep 500 sql connect +print step1 ============= -print step1============= +print ================ create snode +sql show snodes +if $rows != 0 then + return -1 +endi + +sql create snode on dnode 1; +sql show snodes; +if $rows != 1 then + return -1 +endi + +print ============== snode created , create db sql create database test3 vgroups 1; sql use test3; @@ -57,7 +72,7 @@ loop8: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -87,7 +102,7 @@ loop9: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -127,7 +142,7 @@ loop10: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -162,7 +177,7 @@ loop11: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -194,7 +209,7 @@ loop11: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -239,7 +254,7 @@ loop12: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -315,7 +330,7 @@ loop13: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -369,7 +384,6 @@ if $data24 != 1 then endi print step4============= - sql create database test6 vgroups 4; sql use test6; sql create stable st(ts timestamp,a int,b int,c int,d int) tags(ta int,tb int,tc int); @@ -396,7 +410,7 @@ loop14: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -449,4 +463,17 @@ if $data25 != 2 then goto loop14 endi -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +print sleep for 1min for checkpoint generate +sleep 60000 + +print ================== restart to load checkpoint from snode + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start + +sleep 500 +sql connect + +sleep 30000 + +sql select start_ver, checkpoint_ver from information_schema.ins_stream_tasks; From 7a6eb1d9697975b520a31ee55f7da395f59ca33f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Dec 2024 11:19:24 +0800 Subject: [PATCH 4/7] test: update the sim, set the output as uint, instead of int to avoid overflow. --- source/libs/function/src/builtinsimpl.c | 2 +- tests/script/tsim/stream/basic5.sim | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1c77d12fb2..707018ac65 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -6429,7 +6429,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return code; } - len = tsnprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE, "Inmem_Rows=[%d] Stt_Rows=[%d] ", + len = tsnprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE, "Inmem_Rows=[%u] Stt_Rows=[%u] ", pData->numOfInmemRows, pData->numOfSttRows); varDataSetLen(st, len); code = colDataSetVal(pColInfo, row++, st, false); diff --git a/tests/script/tsim/stream/basic5.sim b/tests/script/tsim/stream/basic5.sim index 5d90740b13..866fbd3ebe 100644 --- a/tests/script/tsim/stream/basic5.sim +++ b/tests/script/tsim/stream/basic5.sim @@ -476,4 +476,8 @@ sql connect sleep 30000 -sql select start_ver, checkpoint_ver from information_schema.ins_stream_tasks; +sql select start_ver, checkpoint_ver from information_schema.ins_stream_tasks where level='source'; +sleep 500 + +system sh/exec.sh -n dnode1 -s stop -x SIGINT + From 0bf9fb1e312a206d21f70993aef3c7c3400397ef Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 27 Dec 2024 15:11:19 +0800 Subject: [PATCH 5/7] add reset stream --- include/common/tmsg.h | 10 ++++++ include/common/tmsgdef.h | 1 + include/libs/nodes/cmdnodes.h | 6 ++++ source/common/src/msg/tmsg.c | 37 +++++++++++++++++++++ source/common/test/msgTypeTable.ini | 1 + source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 31 +++++++++++++++++ source/libs/nodes/src/nodesCodeFuncs.c | 28 ++++++++++++++++ source/libs/nodes/src/nodesUtilFuncs.c | 4 +++ source/libs/parser/inc/parAst.h | 1 + source/libs/parser/inc/sql.y | 1 + source/libs/parser/src/parAstCreater.c | 14 ++++++++ source/libs/parser/src/parTranslater.c | 13 ++++++++ 13 files changed, 148 insertions(+) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0b6a8b3f1b..6f9ff1ec97 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -480,6 +480,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY, QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC, + QUERY_NODE_RESET_STREAM_STMT, } ENodeType; typedef struct { @@ -3913,6 +3914,15 @@ typedef struct { int32_t tSerializeSMResumeStreamReq(void* buf, int32_t bufLen, const SMResumeStreamReq* pReq); int32_t tDeserializeSMResumeStreamReq(void* buf, int32_t bufLen, SMResumeStreamReq* pReq); +typedef struct { + char name[TSDB_STREAM_FNAME_LEN]; + int8_t igNotExists; + int8_t igUntreated; +} SMResetStreamReq; + +int32_t tSerializeSMResetStreamReq(void* buf, int32_t bufLen, const SMResetStreamReq* pReq); +int32_t tDeserializeSMResetStreamReq(void* buf, int32_t bufLen, SMResetStreamReq* pReq); + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; char stb[TSDB_TABLE_FNAME_LEN]; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 93bfe306b6..e325d42ecf 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -261,6 +261,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUDIT, "audit", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONFIG, "init-config", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_RESET_STREAM, "reset-stream", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 8eb30b8184..d8584711a5 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -595,6 +595,12 @@ typedef struct SResumeStreamStmt { bool ignoreUntreated; } SResumeStreamStmt; +typedef struct SResetStreamStmt { + ENodeType type; + char streamName[TSDB_TABLE_NAME_LEN]; + bool ignoreNotExists; +} SResetStreamStmt; + typedef struct SCreateFunctionStmt { ENodeType type; bool orReplace; diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 2193c7983f..aae0c013f7 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -12018,6 +12018,43 @@ _exit: return code; } +int32_t tSerializeSMResetStreamReq(void *buf, int32_t bufLen, const SMResetStreamReq *pReq) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->name)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists)); + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMResetStreamReq(void *buf, int32_t bufLen, SMResetStreamReq *pReq) { + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + + tDecoderInit(&decoder, buf, bufLen); + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->name)); + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists)); + tEndDecode(&decoder); + +_exit: + tDecoderClear(&decoder); + return code; +} + int32_t tEncodeMqSubTopicEp(void **buf, const SMqSubTopicEp *pTopicEp) { int32_t tlen = 0; tlen += taosEncodeString(buf, pTopicEp->topic); diff --git a/source/common/test/msgTypeTable.ini b/source/common/test/msgTypeTable.ini index 0325183d8b..e31f21fd27 100644 --- a/source/common/test/msgTypeTable.ini +++ b/source/common/test/msgTypeTable.ini @@ -438,6 +438,7 @@ TDMT_STREAM_DROP = 1053 TDMT_STREAM_DROP_RSP = 1054 TDMT_STREAM_RETRIEVE_TRIGGER = 1055 TDMT_STREAM_RETRIEVE_TRIGGER_RSP = 1056 +TDMT_MND_RESET_STREAM = 1057 TDMT_SYNC_TIMEOUT = 1537 TDMT_SYNC_TIMEOUT_RSP = 1538 TDMT_SYNC_TIMEOUT_ELECTION = 1539 diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 8e595f76c9..869d68ed94 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -182,6 +182,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_RESET_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_ANAL_ALGO, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3bee82e3e7..963f69de64 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -52,6 +52,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter); static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq); static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq); +static int32_t mndProcessResetStreamReq(SRpcMsg *pReq); static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger); static int32_t mndProcessNodeCheck(SRpcMsg *pReq); @@ -130,6 +131,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq); + mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream); @@ -1898,6 +1900,35 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } +static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SStreamObj *pStream = NULL; + int32_t code = 0; + + if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { + return code; + } + + SMResetStreamReq resetReq = {0}; + if (tDeserializeSMResetStreamReq(pReq->pCont, pReq->contLen, &resetReq) < 0) { + TAOS_RETURN(TSDB_CODE_INVALID_MSG); + } + + code = mndAcquireStream(pMnode, resetReq.name, &pStream); + if (pStream == NULL || code != 0) { + if (resetReq.igNotExists) { + mInfo("stream:%s, not exist, not pause stream", resetReq.name); + return 0; + } else { + mError("stream:%s not exist, failed to pause stream", resetReq.name); + TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST); + } + } + + //todo(liao hao jun) + return TSDB_CODE_ACTION_IN_PROGRESS; +} + static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 6d4d89607f..369e6de85b 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -193,6 +193,8 @@ const char* nodesNodeName(ENodeType type) { return "PauseStreamStmt"; case QUERY_NODE_RESUME_STREAM_STMT: return "ResumeStreamStmt"; + case QUERY_NODE_RESET_STREAM_STMT: + return "ResetStreamStmt"; case QUERY_NODE_BALANCE_VGROUP_STMT: return "BalanceVgroupStmt"; case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: @@ -7287,6 +7289,32 @@ static int32_t jsonToDropStreamStmt(const SJson* pJson, void* pObj) { return code; } +static const char* jkResetStreamStmtStreamName = "StreamName"; +static const char* jkResetStreamStmtIgnoreNotExists = "IgnoreNotExists"; + +static int32_t resetStreamStmtToJson(const void* pObj, SJson* pJson) { + const SResetStreamStmt* pNode = (const SResetStreamStmt*)pObj; + + int32_t code = tjsonAddStringToObject(pJson, jkResetStreamStmtStreamName, pNode->streamName); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkResetStreamStmtIgnoreNotExists, pNode->ignoreNotExists); + } + + return code; +} + +static int32_t jsonToResetStreamStmt(const SJson* pJson, void* pObj) { + SResetStreamStmt* pNode = (SResetStreamStmt*)pObj; + + int32_t code = tjsonGetStringValue(pJson, jkResetStreamStmtStreamName, pNode->streamName); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkResetStreamStmtIgnoreNotExists, &pNode->ignoreNotExists); + } + + return code; +} + + static const char* jkMergeVgroupStmtVgroupId1 = "VgroupId1"; static const char* jkMergeVgroupStmtVgroupId2 = "VgroupId2"; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 7beaeaa46c..5c22ff00a3 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -615,6 +615,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { case QUERY_NODE_RESUME_STREAM_STMT: code = makeNode(type, sizeof(SResumeStreamStmt), &pNode); break; + case QUERY_NODE_RESET_STREAM_STMT: + code = makeNode(type, sizeof(SResetStreamStmt), &pNode); + break; case QUERY_NODE_BALANCE_VGROUP_STMT: code = makeNode(type, sizeof(SBalanceVgroupStmt), &pNode); break; @@ -1480,6 +1483,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_DROP_STREAM_STMT: // no pointer field case QUERY_NODE_PAUSE_STREAM_STMT: // no pointer field case QUERY_NODE_RESUME_STREAM_STMT: // no pointer field + case QUERY_NODE_RESET_STREAM_STMT: // no pointer field case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: // no pointer field case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: // no pointer field diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index e69a3da4a9..c6d617fc3d 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -301,6 +301,7 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName); SNode* createPauseStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName); SNode* createResumeStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, bool ignoreUntreated, SToken* pStreamName); +SNode* createResetStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName); SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId); SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId); SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 63eb09d509..0354d3c590 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -783,6 +783,7 @@ cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); } cmd ::= PAUSE STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createPauseStreamStmt(pCxt, A, &B); } cmd ::= RESUME STREAM exists_opt(A) ignore_opt(C) stream_name(B). { pCxt->pRootNode = createResumeStreamStmt(pCxt, A, C, &B); } +cmd ::= RESET STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createResetStreamStmt(pCxt, A, &B); } %type col_list_opt { SNodeList* } %destructor col_list_opt { nodesDestroyList($$); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index a13472620b..b78d6baede 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -3714,6 +3714,20 @@ _err: return NULL; } +SNode* createResetStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName) { + CHECK_PARSER_STATUS(pCxt); + CHECK_NAME(checkStreamName(pCxt, pStreamName)); + SPauseStreamStmt* pStmt = NULL; + pCxt->errCode = nodesMakeNode(QUERY_NODE_RESET_STREAM_STMT, (SNode**)&pStmt); + CHECK_MAKE_NODE(pStmt); + COPY_STRING_FORM_ID_TOKEN(pStmt->streamName, pStreamName); + pStmt->ignoreNotExists = ignoreNotExists; + return (SNode*)pStmt; +_err: + return NULL; +} + + SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId) { CHECK_PARSER_STATUS(pCxt); SKillStmt* pStmt = NULL; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f191080512..951fabb0f6 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -12346,6 +12346,16 @@ static int32_t translateResumeStream(STranslateContext* pCxt, SResumeStreamStmt* return buildCmdMsg(pCxt, TDMT_MND_RESUME_STREAM, (FSerializeFunc)tSerializeSMResumeStreamReq, &req); } +static int32_t translateResetStream(STranslateContext* pCxt, SResetStreamStmt* pStmt) { + SMResetStreamReq req = {0}; + SName name; + int32_t code = tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName)); + if (TSDB_CODE_SUCCESS != code) return code; + (void)tNameGetFullDbName(&name, req.name); + req.igNotExists = pStmt->ignoreNotExists; + return buildCmdMsg(pCxt, TDMT_MND_RESET_STREAM, (FSerializeFunc)tSerializeSMResetStreamReq, &req); +} + static int32_t validateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStmt) { if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery) && QUERY_NODE_SET_OPERATOR != nodeType(pStmt->pQuery)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type"); @@ -13380,6 +13390,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { case QUERY_NODE_RESUME_STREAM_STMT: code = translateResumeStream(pCxt, (SResumeStreamStmt*)pNode); break; + case QUERY_NODE_RESET_STREAM_STMT: + code = translateResetStream(pCxt, (SResetStreamStmt*)pNode); + break; case QUERY_NODE_CREATE_FUNCTION_STMT: code = translateCreateFunction(pCxt, (SCreateFunctionStmt*)pNode); break; From f03e2305e03cced5ad5526bae328fe362caddcde Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 Dec 2024 14:26:20 +0800 Subject: [PATCH 6/7] fix(stream): add logs and set the reset-stream rsp. --- source/common/test/msgTypeTable.ini | 1 + source/dnode/mnode/impl/src/mndStream.c | 2 ++ 2 files changed, 3 insertions(+) diff --git a/source/common/test/msgTypeTable.ini b/source/common/test/msgTypeTable.ini index e31f21fd27..add1128850 100644 --- a/source/common/test/msgTypeTable.ini +++ b/source/common/test/msgTypeTable.ini @@ -439,6 +439,7 @@ TDMT_STREAM_DROP_RSP = 1054 TDMT_STREAM_RETRIEVE_TRIGGER = 1055 TDMT_STREAM_RETRIEVE_TRIGGER_RSP = 1056 TDMT_MND_RESET_STREAM = 1057 +TDMT_MND_RESET_STREAM_RSP = 1058 TDMT_SYNC_TIMEOUT = 1537 TDMT_SYNC_TIMEOUT_RSP = 1538 TDMT_SYNC_TIMEOUT_ELECTION = 1539 diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 963f69de64..577efd155a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1914,6 +1914,8 @@ static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) { TAOS_RETURN(TSDB_CODE_INVALID_MSG); } + mDebug("recv reset stream req, stream:%s", resetReq.name); + code = mndAcquireStream(pMnode, resetReq.name, &pStream); if (pStream == NULL || code != 0) { if (resetReq.igNotExists) { From d8963b467a3cf4aa5e39de668ab8a8c2c96ef90a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 Dec 2024 14:39:52 +0800 Subject: [PATCH 7/7] fix(stream): remove msg info. --- source/common/test/msgTypeTable.ini | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/common/test/msgTypeTable.ini b/source/common/test/msgTypeTable.ini index add1128850..0325183d8b 100644 --- a/source/common/test/msgTypeTable.ini +++ b/source/common/test/msgTypeTable.ini @@ -438,8 +438,6 @@ TDMT_STREAM_DROP = 1053 TDMT_STREAM_DROP_RSP = 1054 TDMT_STREAM_RETRIEVE_TRIGGER = 1055 TDMT_STREAM_RETRIEVE_TRIGGER_RSP = 1056 -TDMT_MND_RESET_STREAM = 1057 -TDMT_MND_RESET_STREAM_RSP = 1058 TDMT_SYNC_TIMEOUT = 1537 TDMT_SYNC_TIMEOUT_RSP = 1538 TDMT_SYNC_TIMEOUT_ELECTION = 1539