From d45adeaa4e4387fc04a35fda5051ea565cb42aa9 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 17 Jul 2023 18:04:17 +0800 Subject: [PATCH] add checksum --- source/libs/executor/src/timewindowoperator.c | 122 +++++++++--- tests/script/tsim/stream/checkpointState0.sim | 178 ++++++++++++++++++ 2 files changed, 277 insertions(+), 23 deletions(-) create mode 100644 tests/script/tsim/stream/checkpointState0.sim diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d0f85b71ac..b436362e1c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -18,6 +18,7 @@ #include "functionMgt.h" #include "operator.h" #include "querytask.h" +#include "tchecksum.h" #include "tcommon.h" #include "tcompare.h" #include "tdatablock.h" @@ -2567,12 +2568,14 @@ void* decodeSPullWindowInfoArray(void *buf, SArray* pPullInfos) { return buf; } -int32_t doStreamIntervalEncodeOpState(void **buf, SOperatorInfo* pOperator) { +int32_t doStreamIntervalEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; if (!pInfo) { return 0; } + void* pData = (buf == NULL) ? NULL : *buf; + // 1.pResultRowHashTable int32_t tlen = 0; int32_t mapSize = tSimpleHashGetSize(pInfo->aggSup.pResultRowHashTable); @@ -2613,15 +2616,32 @@ int32_t doStreamIntervalEncodeOpState(void **buf, SOperatorInfo* pOperator) { // 5.dataVersion tlen += taosEncodeFixedI64(buf, pInfo->dataVersion); + // 6.checksum + if (buf) { + uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t)); + tlen += taosEncodeFixedU32(buf, cksum); + } else { + tlen += sizeof(uint32_t); + } + return tlen; } -void doStreamIntervalDecodeOpState(void* buf, SOperatorInfo* pOperator) { +void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; if (!pInfo) { return ; } + // 6.checksum + int32_t dataLen = len - sizeof(uint32_t); + void* pCksum = POINTER_SHIFT(buf, dataLen); + if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { + ASSERT(0); // debug + qError("stream interval state is invalid"); + return; + } + // 1.pResultRowHashTable int32_t mapSize = 0; buf = taosDecodeFixedI32(buf, &mapSize); @@ -2662,10 +2682,10 @@ void doStreamIntervalDecodeOpState(void* buf, SOperatorInfo* pOperator) { void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - int32_t len = doStreamIntervalEncodeOpState(NULL, pOperator); + int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator); void* buf = taosMemoryCalloc(1, len); void* pBuf = buf; - len = doStreamIntervalEncodeOpState(&pBuf, pOperator); + len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len); taosMemoryFree(buf); @@ -2816,8 +2836,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; } else if (pBlock->info.type == STREAM_CHECKPOINT) { - doStreamIntervalSaveCheckpoint(pOperator); pAPI->stateStore.streamStateCommit(pInfo->pState); + doStreamIntervalSaveCheckpoint(pOperator); copyDataBlock(pInfo->pCheckpointRes, pBlock); continue; } else { @@ -3075,7 +3095,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, int32_t len = 0; int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); if (res == TSDB_CODE_SUCCESS) { - doStreamIntervalDecodeOpState(buff, pOperator); + doStreamIntervalDecodeOpState(buff, len, pOperator); taosMemoryFree(buff); } @@ -3794,12 +3814,14 @@ void* decodeSResultWindowInfo(void *buf, SResultWindowInfo* key, int32_t outLen) return buf; } -int32_t doStreamSessionEncodeOpState(void **buf, SOperatorInfo* pOperator) { +int32_t doStreamSessionEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; if (!pInfo) { return 0; } + void* pData = (buf == NULL) ? NULL : *buf; + // 1.streamAggSup.pResultRows int32_t tlen = 0; int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows); @@ -3821,21 +3843,42 @@ int32_t doStreamSessionEncodeOpState(void **buf, SOperatorInfo* pOperator) { tlen += taosEncodeFixedI32(buf, size); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); - tlen += doStreamSessionEncodeOpState(buf, pChOp); + tlen += doStreamSessionEncodeOpState(buf, 0, pChOp, false); } // 4.dataVersion tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); + // 5.checksum + if (isParent) { + if (buf) { + uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t)); + tlen += taosEncodeFixedU32(buf, cksum); + } else { + tlen += sizeof(uint32_t); + } + } + return tlen; } -void* doStreamSessionDecodeOpState(void* buf, SOperatorInfo* pOperator) { +void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; if (!pInfo) { return buf; } + // 5.checksum + if (isParent) { + int32_t dataLen = len - sizeof(uint32_t); + void* pCksum = POINTER_SHIFT(buf, dataLen); + if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { + ASSERT(0); // debug + qError("stream interval state is invalid"); + return buf; + } + } + // 1.streamAggSup.pResultRows int32_t mapSize = 0; buf = taosDecodeFixedI32(buf, &mapSize); @@ -3856,7 +3899,7 @@ void* doStreamSessionDecodeOpState(void* buf, SOperatorInfo* pOperator) { ASSERT(size <= taosArrayGetSize(pInfo->pChildren)); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); - buf = doStreamSessionDecodeOpState(buf, pChOp); + buf = doStreamSessionDecodeOpState(buf, 0, pChOp, false); } // 4.dataVersion @@ -3866,10 +3909,10 @@ void* doStreamSessionDecodeOpState(void* buf, SOperatorInfo* pOperator) { void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - int32_t len = doStreamSessionEncodeOpState(NULL, pOperator); + int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true); void* buf = taosMemoryCalloc(1, len); void* pBuf = buf; - len = doStreamSessionEncodeOpState(&pBuf, pOperator); + len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len); } @@ -3941,8 +3984,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; } else if (pBlock->info.type == STREAM_CHECKPOINT) { - doStreamSessionSaveCheckpoint(pOperator); pAggSup->stateStore.streamStateCommit(pAggSup->pState); + doStreamSessionSaveCheckpoint(pOperator); copyDataBlock(pInfo->pCheckpointRes, pBlock); continue; } else { @@ -4141,7 +4184,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh int32_t len = 0; int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len); if (res == TSDB_CODE_SUCCESS) { - doStreamSessionDecodeOpState(buff, pOperator); + doStreamSessionDecodeOpState(buff, len, pOperator, true); taosMemoryFree(buff); } @@ -4242,8 +4285,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; } else if (pBlock->info.type == STREAM_CHECKPOINT) { - doStreamSessionSaveCheckpoint(pOperator); pAggSup->stateStore.streamStateCommit(pAggSup->pState); + doStreamSessionSaveCheckpoint(pOperator); continue; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); @@ -4541,12 +4584,14 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } } -int32_t doStreamStateEncodeOpState(void **buf, SOperatorInfo* pOperator) { +int32_t doStreamStateEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; if (!pInfo) { return 0; } + void* pData = (buf == NULL) ? NULL : *buf; + // 1.streamAggSup.pResultRows int32_t tlen = 0; int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows); @@ -4568,21 +4613,42 @@ int32_t doStreamStateEncodeOpState(void **buf, SOperatorInfo* pOperator) { tlen += taosEncodeFixedI32(buf, size); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); - tlen += doStreamSessionEncodeOpState(buf, pChOp); + tlen += doStreamStateEncodeOpState(buf, 0, pChOp, false); } // 4.dataVersion tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); + // 5.checksum + if (isParent) { + if (buf) { + uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t)); + tlen += taosEncodeFixedU32(buf, cksum); + } else { + tlen += sizeof(uint32_t); + } + } + return tlen; } -void* doStreamStateDecodeOpState(void* buf, SOperatorInfo* pOperator) { +void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; if (!pInfo) { return buf; } + // 5.checksum + if (isParent) { + int32_t dataLen = len - sizeof(uint32_t); + void* pCksum = POINTER_SHIFT(buf, dataLen); + if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { + ASSERT(0); // debug + qError("stream interval state is invalid"); + return buf; + } + } + // 1.streamAggSup.pResultRows int32_t mapSize = 0; buf = taosDecodeFixedI32(buf, &mapSize); @@ -4603,7 +4669,7 @@ void* doStreamStateDecodeOpState(void* buf, SOperatorInfo* pOperator) { ASSERT(size <= taosArrayGetSize(pInfo->pChildren)); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); - buf = doStreamStateDecodeOpState(buf, pChOp); + buf = doStreamStateDecodeOpState(buf, 0, pChOp, false); } // 4.dataVersion @@ -4611,6 +4677,16 @@ void* doStreamStateDecodeOpState(void* buf, SOperatorInfo* pOperator) { return buf; } +void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) { + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true); + void* buf = taosMemoryCalloc(1, len); + void* pBuf = buf; + len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, + strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len); +} + static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -4665,8 +4741,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; } else if (pBlock->info.type == STREAM_CHECKPOINT) { - doStreamSessionSaveCheckpoint(pOperator); pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); + doStreamSessionSaveCheckpoint(pOperator); copyDataBlock(pInfo->pCheckpointRes, pBlock); continue; } else { @@ -4861,7 +4937,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys int32_t len = 0; int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, strlen(STREAM_STATE_OP_CHECKPOINT_NAME), &buff, &len); if (res == TSDB_CODE_SUCCESS) { - doStreamStateDecodeOpState(buff, pOperator); + doStreamStateDecodeOpState(buff, len, pOperator, true); taosMemoryFree(buff); } @@ -5530,8 +5606,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { printDataBlock(pBlock, "single interval"); return pBlock; } else if (pBlock->info.type == STREAM_CHECKPOINT) { - doStreamIntervalSaveCheckpoint(pOperator); pAPI->stateStore.streamStateCommit(pInfo->pState); + doStreamIntervalSaveCheckpoint(pOperator); pInfo->reCkBlock = true; copyDataBlock(pInfo->pCheckpointRes, pBlock); continue; @@ -5714,7 +5790,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys int32_t len = 0; int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); if (res == TSDB_CODE_SUCCESS) { - doStreamIntervalDecodeOpState(buff, pOperator); + doStreamIntervalDecodeOpState(buff, len, pOperator); taosMemoryFree(buff); } diff --git a/tests/script/tsim/stream/checkpointState0.sim b/tests/script/tsim/stream/checkpointState0.sim new file mode 100644 index 0000000000..3836721212 --- /dev/null +++ b/tests/script/tsim/stream/checkpointState0.sim @@ -0,0 +1,178 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step 1 + +print =============== create database +sql create database test vgroups 1; + +sql use test; + + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 state_window(b); +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791213001,2,2,3,1.1); + +$loop_count = 0 + +loop0: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 1 + goto loop0 +endi + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 3 then + print =====data02=$data02 + goto loop0 +endi + +print waiting for checkpoint generation 1 ...... + +sleep 25000 + +print restart taosd 01 ...... + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791213002,3,2,3,1.1); + +$loop_count = 0 + +loop1: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 1 + goto loop1 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop1 +endi + +sql insert into t1 values(1648791233003,4,3,3,1.1); + +$loop_count = 0 + +loop2: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop2 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop2 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop2 +endi + +# row 1 +if $data11 != 1 then + print =====data11=$data11 + goto loop2 +endi + +if $data12 != 4 then + print =====data12=$data12 + goto loop2 +endi + +print step 2 + +print restart taosd 02 ...... + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791233004,5,3,3,1.1); + +loop20: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop20 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop20 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop20 +endi + +# row 1 +if $data11 != 2 then + print =====data11=$data11 + goto loop20 +endi + +if $data12 != 9 then + print =====data12=$data12 + goto loop20 +endi + +print end--------------------------------- + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file