From d6bb19485ad8fbbd080d7271476c78bb82f14520 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 13 Jul 2023 16:53:07 +0800 Subject: [PATCH 01/11] mem leak --- source/libs/executor/src/scanoperator.c | 5 +- tests/script/tsim/stream/checkpoint0.sim | 102 +++++++++++++++++++++-- 2 files changed, 99 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d7373b2aac..927c85b262 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1788,10 +1788,13 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); int32_t tlen = len - (pBuff - buf); - void* pUpInfo = pInfo->stateStore.updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0, pInfo->igCheckUpdate); + void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); int32_t code = pInfo->stateStore.updateInfoDeserialize(buf, tlen, pUpInfo); if (code == TSDB_CODE_SUCCESS) { + pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); pInfo->pUpdateInfo = pUpInfo; + } else { + taosMemoryFree(pUpInfo); } } diff --git a/tests/script/tsim/stream/checkpoint0.sim b/tests/script/tsim/stream/checkpoint0.sim index 325667ee2b..ef006fead4 100644 --- a/tests/script/tsim/stream/checkpoint0.sim +++ b/tests/script/tsim/stream/checkpoint0.sim @@ -6,18 +6,12 @@ sql connect print =============== create database sql create database test vgroups 1; -sql select * from information_schema.ins_databases -if $rows != 3 then - return -1 -endi - -print $data00 $data01 $data02 sql use test; sql create table t1(ts timestamp, a int, b int , c int, d double); -sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s); +sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s); sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t1 values(1648791213001,2,2,3,1.1); @@ -130,4 +124,98 @@ if $data12 != 4 then goto loop2 endi + +sql create database test1 vgroups 4; + +sql use test1; + + +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, sum(a) from st interval(10s); + +sql insert into t1 values(1648791213000,1,2,3,1.0); + +sql insert into t2 values(1648791213001,2,2,3,1.1); + +$loop_count = 0 + +loop2: +sleep 1000 + +sql select * from streamt1; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 1 + goto loop2 +endi + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop2 +endi + +if $data02 != 3 then + print =====data02=$data02 + goto loop2 +endi + +print waiting for checkpoint generation ...... + +sleep 25000 + +print restart taosd + +system sh/exec.sh -n dnode1 -s stop -x SIGINT + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791213002,3,2,3,1.1); + +$loop_count = 0 + +loop3: +sleep 1000 + +sql select * from streamt1; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop3 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop3 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop3 +endi + +# row 1 +if $data11 != 1 then + print =====data01=$data01 + goto loop3 +endi + +if $data12 != 4 then + print =====data02=$data02 + goto loop3 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file From b9d274d893b863e2296d1a8d60997bf40091f378 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 13 Jul 2023 17:16:48 +0800 Subject: [PATCH 02/11] op ci --- tests/script/tsim/stream/checkpoint0.sim | 74 +++++++++++++++++++----- 1 file changed, 61 insertions(+), 13 deletions(-) diff --git a/tests/script/tsim/stream/checkpoint0.sim b/tests/script/tsim/stream/checkpoint0.sim index ef006fead4..5e710be416 100644 --- a/tests/script/tsim/stream/checkpoint0.sim +++ b/tests/script/tsim/stream/checkpoint0.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +print step 1 + print =============== create database sql create database test vgroups 1; @@ -23,7 +25,7 @@ sleep 1000 sql select * from streamt; $loop_count = $loop_count + 1 -if $loop_count == 20 then +if $loop_count == 10 then return -1 endi @@ -43,11 +45,11 @@ if $data02 != 3 then goto loop0 endi -print waiting for checkpoint generation ...... +print waiting for checkpoint generation 1 ...... sleep 25000 -print restart taosd +print restart taosd 01 ...... system sh/exec.sh -n dnode1 -s stop -x SIGINT @@ -63,7 +65,7 @@ sleep 1000 sql select * from streamt; $loop_count = $loop_count + 1 -if $loop_count == 20 then +if $loop_count == 10 then return -1 endi @@ -93,7 +95,7 @@ sleep 1000 sql select * from streamt; $loop_count = $loop_count + 1 -if $loop_count == 20 then +if $loop_count == 10 then return -1 endi @@ -115,21 +117,67 @@ endi # row 1 if $data11 != 1 then - print =====data01=$data01 + print =====data11=$data11 goto loop2 endi if $data12 != 4 then - print =====data02=$data02 + print =====data12=$data12 goto loop2 endi +print restart taosd 02 ...... + +system sh/exec.sh -n dnode1 -s stop -x SIGINT + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791223003,5,2,3,1.1); + +loop20: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop20 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop20 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop20 +endi + +# row 1 +if $data11 != 2 then + print =====data11=$data11 + goto loop20 +endi + +if $data12 != 9 then + print =====data12=$data12 + goto loop20 +endi + +print step 2 + sql create database test1 vgroups 4; sql use test1; - sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); @@ -147,7 +195,7 @@ sleep 1000 sql select * from streamt1; $loop_count = $loop_count + 1 -if $loop_count == 20 then +if $loop_count == 10 then return -1 endi @@ -167,7 +215,7 @@ if $data02 != 3 then goto loop2 endi -print waiting for checkpoint generation ...... +print waiting for checkpoint generation 2 ...... sleep 25000 @@ -187,7 +235,7 @@ sleep 1000 sql select * from streamt1; $loop_count = $loop_count + 1 -if $loop_count == 20 then +if $loop_count == 10 then return -1 endi @@ -209,12 +257,12 @@ endi # row 1 if $data11 != 1 then - print =====data01=$data01 + print =====data11=$data11 goto loop3 endi if $data12 != 4 then - print =====data02=$data02 + print =====data12=$data12 goto loop3 endi From d8b8112dbed6cac561c024f357e7815236e50479 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 13 Jul 2023 17:29:30 +0800 Subject: [PATCH 03/11] op ci --- tests/script/tsim/stream/checkpoint0.sim | 99 +--------------------- tests/script/tsim/stream/checkpoint1.sim | 103 +++++++++++++++++++++++ 2 files changed, 107 insertions(+), 95 deletions(-) create mode 100644 tests/script/tsim/stream/checkpoint1.sim diff --git a/tests/script/tsim/stream/checkpoint0.sim b/tests/script/tsim/stream/checkpoint0.sim index 5e710be416..9b9198fda2 100644 --- a/tests/script/tsim/stream/checkpoint0.sim +++ b/tests/script/tsim/stream/checkpoint0.sim @@ -51,7 +51,7 @@ sleep 25000 print restart taosd 01 ...... -system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/stop_dnodes.sh system sh/exec.sh -n dnode1 -s start @@ -126,10 +126,11 @@ if $data12 != 4 then goto loop2 endi +print step 2 print restart taosd 02 ...... -system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/stop_dnodes.sh system sh/exec.sh -n dnode1 -s start @@ -172,98 +173,6 @@ if $data12 != 9 then goto loop20 endi -print step 2 - -sql create database test1 vgroups 4; - -sql use test1; - -sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); -sql create table t1 using st tags(1,1,1); -sql create table t2 using st tags(2,2,2); -sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, sum(a) from st interval(10s); - -sql insert into t1 values(1648791213000,1,2,3,1.0); - -sql insert into t2 values(1648791213001,2,2,3,1.1); - -$loop_count = 0 - -loop2: -sleep 1000 - -sql select * from streamt1; - -$loop_count = $loop_count + 1 -if $loop_count == 10 then - return -1 -endi - -if $rows != 1 then - print =====rows=$rows expect 1 - goto loop2 -endi - -# row 0 -if $data01 != 2 then - print =====data01=$data01 - goto loop2 -endi - -if $data02 != 3 then - print =====data02=$data02 - goto loop2 -endi - -print waiting for checkpoint generation 2 ...... - -sleep 25000 - -print restart taosd - -system sh/exec.sh -n dnode1 -s stop -x SIGINT - -system sh/exec.sh -n dnode1 -s start - -sql insert into t1 values(1648791213002,3,2,3,1.1); - -$loop_count = 0 - -loop3: -sleep 1000 - -sql select * from streamt1; - -$loop_count = $loop_count + 1 -if $loop_count == 10 then - return -1 -endi - -if $rows != 2 then - print =====rows=$rows expect 2 - goto loop3 -endi - -# row 0 -if $data01 != 3 then - print =====data01=$data01 - goto loop3 -endi - -if $data02 != 6 then - print =====data02=$data02 - goto loop3 -endi - -# row 1 -if $data11 != 1 then - print =====data11=$data11 - goto loop3 -endi - -if $data12 != 4 then - print =====data12=$data12 - goto loop3 -endi +print end--------------------------------- system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/stream/checkpoint1.sim b/tests/script/tsim/stream/checkpoint1.sim new file mode 100644 index 0000000000..a339a1ad35 --- /dev/null +++ b/tests/script/tsim/stream/checkpoint1.sim @@ -0,0 +1,103 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step 1 + +sql create database test vgroups 4; + +sql use test; + +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from st interval(10s); + +sql insert into t1 values(1648791213000,1,2,3,1.0); + +sql insert into t2 values(1648791213001,2,2,3,1.1); + +$loop_count = 0 + +loop0: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 1 + goto loop0 +endi + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 3 then + print =====data02=$data02 + goto loop0 +endi + +print waiting for checkpoint generation 1 ...... + +sleep 25000 + +print restart taosd + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791213002,3,2,3,1.1); + +$loop_count = 0 + +loop1: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop1 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop1 +endi + +# row 1 +if $data11 != 1 then + print =====data11=$data11 + goto loop1 +endi + +if $data12 != 4 then + print =====data12=$data12 + goto loop1 +endi + +print end--------------------------------- + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file From c3375dd744a542389afaec92e3a55c4f942ae3d5 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 13 Jul 2023 19:24:11 +0800 Subject: [PATCH 04/11] mem leak --- source/libs/stream/src/tstreamFileState.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 6d61f65394..bca9dcabda 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -507,10 +507,12 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { destroyRowBuffPos(pNewPos); SListNode* pNode = tdListPopTail(pFileState->usedBuffs); taosMemoryFreeClear(pNode); + taosMemoryFreeClear(pVal); break; } ASSERT(pVLen == pFileState->rowSize); memcpy(pNewPos->pRowBuff, pVal, pVLen); + taosMemoryFreeClear(pVal); code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); if (code != TSDB_CODE_SUCCESS) { destroyRowBuffPos(pNewPos); From e588640e02175580e118e6820c34d6a08ebe42f9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 13 Jul 2023 20:46:58 +0800 Subject: [PATCH 05/11] fix recover error --- source/dnode/mnode/impl/src/mndStream.c | 105 ++++++++++++++++++++++-- 1 file changed, 99 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8ce9ff6def..568ca530a5 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1009,7 +1009,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre /*A(pTask->info.nodeId > 0);*/ SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); if (pVgObj == NULL) { - taosRUnLockLatch(&pStream->lock); + taosWUnLockLatch(&pStream->lock); mndTransDrop(pTrans); return -1; } @@ -1019,7 +1019,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, pTask->id.taskId) < 0) { mndReleaseVgroup(pMnode, pVgObj); - taosRUnLockLatch(&pStream->lock); + taosWUnLockLatch(&pStream->lock); mndTransDrop(pTrans); return -1; } @@ -1034,7 +1034,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); - taosRUnLockLatch(&pStream->lock); + taosWUnLockLatch(&pStream->lock); mndReleaseStream(pMnode, pStream); mndTransDrop(pTrans); return -1; @@ -1079,6 +1079,78 @@ _ERR: mndTransDrop(pTrans); return -1; } + +static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, + int64_t checkpointId) { + taosWLockLatch(&pStream->lock); + + int32_t totLevel = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < totLevel; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + SStreamTask *pTask = taosArrayGetP(pLevel, 0); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int32_t sz = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < sz; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + /*A(pTask->info.nodeId > 0);*/ + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + if (pVgObj == NULL) { + taosWUnLockLatch(&pStream->lock); + return -1; + } + + void *buf; + int32_t tlen; + if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, + pTask->id.taskId) < 0) { + mndReleaseVgroup(pMnode, pVgObj); + taosWUnLockLatch(&pStream->lock); + return -1; + } + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj); + action.pCont = buf; + action.contLen = tlen; + action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE; + + mndReleaseVgroup(pMnode, pVgObj); + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(buf); + taosWUnLockLatch(&pStream->lock); + return -1; + } + } + } + } + + pStream->checkpointFreq = checkpointId; + pStream->checkpointId = checkpointId; + pStream->checkpointFreq = taosGetTimestampMs(); + atomic_store_64(&pStream->currentTick, 0); + // 3. commit log: stream checkpoint info + pStream->version = pStream->version + 1; + + taosWUnLockLatch(&pStream->lock); + + SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); + if (pCommitRaw == NULL) { + mError("failed to prepare trans rebalance since %s", terrstr()); + return -1; + } + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + sdbFreeRaw(pCommitRaw); + mError("failed to prepare trans rebalance since %s", terrstr()); + return -1; + } + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) { + sdbFreeRaw(pCommitRaw); + mError("failed to prepare trans rebalance since %s", terrstr()); + return -1; + } + return 0; +} static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1089,16 +1161,37 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; int64_t checkpointId = pMsg->checkpointId; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); + if (pTrans == NULL) { + mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return -1; + } + mndTransSetDbName(pTrans, "checkpoint", "checkpoint"); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { + mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, + tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); + mndTransDrop(pTrans); + return -1; + } + while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; - code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId); + + code = mndAddStreamCheckpointToTrans(pTrans, pStream, pMnode, checkpointId); if (code == -1) { - mInfo("stream:%s failed to do checkpoint, reason: last checkpoint not finished", pStream->name); + sdbRelease(pSdb, pStream); + break; } sdbRelease(pSdb, pStream); } - return 0; + if (code == 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("failed to prepre trans rebalance since %s", terrstr()); + } + } + mndTransDrop(pTrans); + return code; } static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { From 44170b9b1668226986592ebcccd6c61f6f15b6dc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Jul 2023 10:36:27 +0800 Subject: [PATCH 06/11] fix recover error --- source/dnode/mnode/impl/src/mndStream.c | 2 ++ source/dnode/mnode/impl/src/mndTrans.c | 33 ++++++++++++------------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 568ca530a5..2da3a1365a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1166,6 +1166,8 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return -1; } + mDebug("start to trigger checkpoint"); + mndTransSetDbName(pTrans, "checkpoint", "checkpoint"); if (mndTransCheckConflict(pMnode, pTrans) != 0) { mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 7ebaf6dda5..f622d49a4f 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -28,9 +28,9 @@ #define TRANS_ARRAY_SIZE 8 #define TRANS_RESERVE_SIZE 48 -static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); -static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld); -static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc); +static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); +static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld); +static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc); static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); @@ -100,10 +100,9 @@ static int32_t mndTransGetActionsSize(SArray *pArray) { return rawDataLen; } - static int32_t mndTransEncodeAction(SSdbRaw *pRaw, int32_t *offset, SArray *pActions, int32_t actionsNum) { int32_t dataPos = *offset; - int8_t unused = 0; + int8_t unused = 0; int32_t ret = -1; for (int32_t i = 0; i < actionsNum; ++i) { @@ -266,16 +265,16 @@ _OVER: SSdbRow *mndTransDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_INVALID_MSG; - SSdbRow *pRow = NULL; - STrans *pTrans = NULL; - char *pData = NULL; - int32_t dataLen = 0; - int8_t sver = 0; - int32_t prepareActionNum = 0; - int32_t redoActionNum = 0; - int32_t undoActionNum = 0; - int32_t commitActionNum = 0; - int32_t dataPos = 0; + SSdbRow *pRow = NULL; + STrans *pTrans = NULL; + char *pData = NULL; + int32_t dataLen = 0; + int8_t sver = 0; + int32_t prepareActionNum = 0; + int32_t redoActionNum = 0; + int32_t undoActionNum = 0; + int32_t commitActionNum = 0; + int32_t dataPos = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -577,7 +576,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); - pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : 0; + pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : tGenIdPI64(); taosInitRWLatch(&pTrans->lockRpcArray); taosThreadMutexInit(&pTrans->mutex, NULL); @@ -1327,7 +1326,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) } bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { - bool continueExec = true; + bool continueExec = true; int32_t code = 0; int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions); From dd4740e5a4a2644bab8bd6d6d3dbba9fbc753b34 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Jul 2023 03:45:30 +0000 Subject: [PATCH 07/11] add checkpoint --- source/dnode/mnode/impl/src/mndStream.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 2da3a1365a..adf7d85aeb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1166,7 +1166,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return -1; } - mDebug("start to trigger checkpoint"); + mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId); mndTransSetDbName(pTrans, "checkpoint", "checkpoint"); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -1181,11 +1181,10 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { if (pIter == NULL) break; code = mndAddStreamCheckpointToTrans(pTrans, pStream, pMnode, checkpointId); + sdbRelease(pSdb, pStream); if (code == -1) { - sdbRelease(pSdb, pStream); break; } - sdbRelease(pSdb, pStream); } if (code == 0) { if (mndTransPrepare(pMnode, pTrans) != 0) { From 15f5fd19eb06651bda905a4fff1de53fa938eb4a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Jul 2023 14:47:06 +0800 Subject: [PATCH 08/11] fix recover error --- source/dnode/vnode/src/meta/metaTable.c | 4 ++-- source/libs/stream/src/streamBackendRocksdb.c | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index cb4b3231f6..1907b569d5 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -452,7 +452,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { } } - if (diffIdx == -1 && diffIdx == 0) { + if (diffIdx == -1 || diffIdx == 0) { goto _err; } @@ -939,7 +939,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) { return 0; } - metaInfo("ttl find expired table count: %zu" , TARRAY_SIZE(tbUids)); + metaInfo("ttl find expired table count: %zu", TARRAY_SIZE(tbUids)); metaDropTables(pMeta, tbUids); return 0; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 0b7d6d7693..ad3be425c3 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -986,8 +986,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t char suffix[64] = {0}; rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); - RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*)); - rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t**)); + RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam)); + rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*)); rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); for (int i = 0; i < nCf; i++) { @@ -1153,7 +1153,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { param[i].tableOpt = tableOpt; }; - rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**)); + rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); for (int i = 0; i < cfLen; i++) { SCfInit* cf = &ginitDict[i]; From d0ccbd48d2a2bba3c718f59a8686fea2375ba243 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Jul 2023 14:54:20 +0800 Subject: [PATCH 09/11] fix coverity scan problem --- source/libs/stream/src/streamBackendRocksdb.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index ad3be425c3..3bce60421c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -262,8 +262,8 @@ void streamBackendCleanup(void* arg) { taosThreadMutexDestroy(&pHandle->cfMutex); - taosMemoryFree(pHandle); qDebug("destroy stream backend backend:%p", pHandle); + taosMemoryFree(pHandle); return; } void streamBackendHandleCleanup(void* arg) { @@ -1294,8 +1294,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChk int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ if (err != NULL) { \ - taosMemoryFree(err); \ qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ + taosMemoryFree(err); \ code = -1; \ } else { \ qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \ From be050e12ea596508be5b08f178b8628bc7ec7d30 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Jul 2023 15:46:00 +0800 Subject: [PATCH 10/11] fix recover error --- source/libs/stream/src/streamBackendRocksdb.c | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 3bce60421c..cfccf1d286 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -40,16 +40,8 @@ typedef struct { rocksdb_comparator_t** pCompares; } RocksdbCfInst; -uint32_t nextPow2(uint32_t x) { - if (x <= 1) return 2; - x = x - 1; - x = x | (x >> 1); - x = x | (x >> 2); - x = x | (x >> 4); - x = x | (x >> 8); - x = x | (x >> 16); - return x + 1; -} +uint32_t nextPow2(uint32_t x); + int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); void destroyRocksdbCfInst(RocksdbCfInst* inst); @@ -2361,3 +2353,13 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { } return 0; } +uint32_t nextPow2(uint32_t x) { + if (x <= 1) return 2; + x = x - 1; + x = x | (x >> 1); + x = x | (x >> 2); + x = x | (x >> 4); + x = x | (x >> 8); + x = x | (x >> 16); + return x + 1; +} From 44be57b3cb1de0d3e033b6d2f99081f609dfb969 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 14 Jul 2023 16:29:37 +0800 Subject: [PATCH 11/11] add ci --- tests/parallel_test/cases.task | 1 + ...heckpoint0.sim => checkpointInterval0.sim} | 4 +- ...heckpoint1.sim => checkpointInterval1.sim} | 1 + .../script/tsim/stream/checkpointSession0.sim | 178 ++++++++++++++++++ .../script/tsim/stream/checkpointSession1.sim | 104 ++++++++++ tests/script/win-test-file | 1 + 6 files changed, 287 insertions(+), 2 deletions(-) rename tests/script/tsim/stream/{checkpoint0.sim => checkpointInterval0.sim} (96%) rename tests/script/tsim/stream/{checkpoint1.sim => checkpointInterval1.sim} (97%) create mode 100644 tests/script/tsim/stream/checkpointSession0.sim create mode 100644 tests/script/tsim/stream/checkpointSession1.sim diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 21fed2e1f5..d3bd114cd8 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -997,6 +997,7 @@ ,,y,script,./test.sh -f tsim/stream/basic2.sim ,,y,script,./test.sh -f tsim/stream/basic3.sim ,,y,script,./test.sh -f tsim/stream/basic4.sim +,,y,script,./test.sh -f tsim/stream/checkpointInterval0.sim ,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim ,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim ,,y,script,./test.sh -f tsim/stream/deleteInterval.sim diff --git a/tests/script/tsim/stream/checkpoint0.sim b/tests/script/tsim/stream/checkpointInterval0.sim similarity index 96% rename from tests/script/tsim/stream/checkpoint0.sim rename to tests/script/tsim/stream/checkpointInterval0.sim index 9b9198fda2..5bc8222a54 100644 --- a/tests/script/tsim/stream/checkpoint0.sim +++ b/tests/script/tsim/stream/checkpointInterval0.sim @@ -85,7 +85,7 @@ if $data02 != 6 then goto loop1 endi -sql insert into t1 values(1648791223002,4,2,3,1.1); +sql insert into t1 values(1648791223003,4,2,3,1.1); $loop_count = 0 @@ -134,7 +134,7 @@ system sh/stop_dnodes.sh system sh/exec.sh -n dnode1 -s start -sql insert into t1 values(1648791223003,5,2,3,1.1); +sql insert into t1 values(1648791223004,5,2,3,1.1); loop20: sleep 1000 diff --git a/tests/script/tsim/stream/checkpoint1.sim b/tests/script/tsim/stream/checkpointInterval1.sim similarity index 97% rename from tests/script/tsim/stream/checkpoint1.sim rename to tests/script/tsim/stream/checkpointInterval1.sim index a339a1ad35..21825e7f48 100644 --- a/tests/script/tsim/stream/checkpoint1.sim +++ b/tests/script/tsim/stream/checkpointInterval1.sim @@ -58,6 +58,7 @@ system sh/stop_dnodes.sh system sh/exec.sh -n dnode1 -s start sql insert into t1 values(1648791213002,3,2,3,1.1); +sql insert into t2 values(1648791223003,4,2,3,1.1); $loop_count = 0 diff --git a/tests/script/tsim/stream/checkpointSession0.sim b/tests/script/tsim/stream/checkpointSession0.sim new file mode 100644 index 0000000000..1d503806c5 --- /dev/null +++ b/tests/script/tsim/stream/checkpointSession0.sim @@ -0,0 +1,178 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step 1 + +print =============== create database +sql create database test vgroups 1; + +sql use test; + + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 session(ts, 10s); +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791213001,2,2,3,1.1); + +$loop_count = 0 + +loop0: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 1 + goto loop0 +endi + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 3 then + print =====data02=$data02 + goto loop0 +endi + +print waiting for checkpoint generation 1 ...... + +sleep 25000 + +print restart taosd 01 ...... + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791213002,3,2,3,1.1); + +$loop_count = 0 + +loop1: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 1 + goto loop1 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop1 +endi + +sql insert into t1 values(1648791233003,4,2,3,1.1); + +$loop_count = 0 + +loop2: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop2 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop2 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop2 +endi + +# row 1 +if $data11 != 1 then + print =====data11=$data11 + goto loop2 +endi + +if $data12 != 4 then + print =====data12=$data12 + goto loop2 +endi + +print step 2 + +print restart taosd 02 ...... + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791233004,5,2,3,1.1); + +loop20: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop20 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop20 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop20 +endi + +# row 1 +if $data11 != 2 then + print =====data11=$data11 + goto loop20 +endi + +if $data12 != 9 then + print =====data12=$data12 + goto loop20 +endi + +print end--------------------------------- + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/stream/checkpointSession1.sim b/tests/script/tsim/stream/checkpointSession1.sim new file mode 100644 index 0000000000..5c9625aabb --- /dev/null +++ b/tests/script/tsim/stream/checkpointSession1.sim @@ -0,0 +1,104 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step 1 + +sql create database test vgroups 4; + +sql use test; + +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from st session(ts, 10s); + +sql insert into t1 values(1648791213000,1,2,3,1.0); + +sql insert into t2 values(1648791213001,2,2,3,1.1); + +$loop_count = 0 + +loop0: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 1 + goto loop0 +endi + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 3 then + print =====data02=$data02 + goto loop0 +endi + +print waiting for checkpoint generation 1 ...... + +sleep 25000 + +print restart taosd + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791213002,3,2,3,1.1); +sql insert into t2 values(1648791233003,4,2,3,1.1); + +$loop_count = 0 + +loop1: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop1 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop1 +endi + +# row 1 +if $data11 != 1 then + print =====data11=$data11 + goto loop1 +endi + +if $data12 != 4 then + print =====data12=$data12 + goto loop1 +endi + +print end--------------------------------- + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/win-test-file b/tests/script/win-test-file index d394ce6876..eb94f32599 100644 --- a/tests/script/win-test-file +++ b/tests/script/win-test-file @@ -240,6 +240,7 @@ ./test.sh -f tsim/stream/basic2.sim ./test.sh -f tsim/stream/basic3.sim ./test.sh -f tsim/stream/basic4.sim +./test.sh -f tsim/stream/checkpointInterval0.sim ./test.sh -f tsim/stream/checkStreamSTable1.sim ./test.sh -f tsim/stream/checkStreamSTable.sim ./test.sh -f tsim/stream/deleteInterval.sim