From 53ae0143544ecc506ee330e494417e47f55d05c7 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 20 Nov 2024 19:13:05 +0800 Subject: [PATCH 1/3] fix issue for streamscan --- source/libs/executor/src/scanoperator.c | 31 ++++----- source/libs/stream/src/streamUpdate.c | 5 ++ .../stream/streamFwcIntervalCheckpoint.sim | 67 +++++++++++++++++++ 3 files changed, 88 insertions(+), 15 deletions(-) create mode 100644 tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index eac4f87b14..7822aa9711 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3410,6 +3410,8 @@ int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff, int32_t* QUERY_CHECK_CODE(code, lino, _end); } + qDebug("%s last scan range %d. %" PRId64 ",%" PRId64, __func__, __LINE__, pInfo->lastScanRange.skey, pInfo->lastScanRange.ekey); + *pLen = len; _end: @@ -3475,21 +3477,19 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) goto _end; } - if (pInfo->pUpdateInfo != NULL) { - void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); - if (!pUpInfo) { - lino = __LINE__; - goto _end; - } - code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo); - if (code == TSDB_CODE_SUCCESS) { - pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); - pInfo->pUpdateInfo = pUpInfo; - } else { - taosMemoryFree(pUpInfo); - lino = __LINE__; - goto _end; - } + void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); + if (!pUpInfo) { + lino = __LINE__; + goto _end; + } + code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo); + if (code == TSDB_CODE_SUCCESS) { + pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); + pInfo->pUpdateInfo = pUpInfo; + } else { + taosMemoryFree(pUpInfo); + lino = __LINE__; + goto _end; } if (tDecodeIsEnd(pDeCoder)) { @@ -3509,6 +3509,7 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) lino = __LINE__; goto _end; } + qDebug("%s last scan range %d. %" PRId64 ",%" PRId64, __func__, __LINE__, pInfo->lastScanRange.skey, pInfo->lastScanRange.ekey); _end: if (pDeCoder != NULL) { diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index a3cfa00127..1f1e4c8fbc 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -445,6 +445,7 @@ int32_t updateInfoSerialize(SEncoder* pEncoder, const SUpdateInfo* pInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (!pInfo) { + tEncodeI32(pEncoder, -1); return TSDB_CODE_SUCCESS; } @@ -550,6 +551,10 @@ int32_t updateInfoDeserialize(SDecoder* pDeCoder, SUpdateInfo* pInfo) { int32_t size = 0; if (tDecodeI32(pDeCoder, &size) < 0) return -1; + + if (size < 0) { + return -1; + } pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY)); QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno); diff --git a/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim b/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim new file mode 100644 index 0000000000..ed72d87e9a --- /dev/null +++ b/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim @@ -0,0 +1,67 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 4; +sql use test; + +sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a) from st partition by tbname interval(2s); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a) from st interval(2s); + +run tsim/stream/checkTaskStatus.sim + +sleep 70000 + + +print restart taosd 01 ...... + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now + 3000a,1,1,1); + +$loop_count = 0 +loop0: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print select * from streamt1; +sql select * from streamt1; + +print $data00 $data01 $data02 + +if $rows == 0 then + goto loop0 +endi + +print select * from streamt2; +sql select * from streamt2; + +print $data00 $data01 $data02 + +if $rows == 0 then + goto loop0 +endi + +print end + +system sh/exec.sh -n dnode1 -s stop -x SIGINT From 5ace9b1d90e8023bcc14aac69391f558eb2979a1 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 20 Nov 2024 19:33:46 +0800 Subject: [PATCH 2/3] fix issue --- source/libs/stream/src/streamUpdate.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 1f1e4c8fbc..ff11850d7c 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -445,7 +445,10 @@ int32_t updateInfoSerialize(SEncoder* pEncoder, const SUpdateInfo* pInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (!pInfo) { - tEncodeI32(pEncoder, -1); + if (tEncodeI32(pEncoder, -1) < 0) { + code = TSDB_CODE_FAILED; + QUERY_CHECK_CODE(code, lino, _end); + } return TSDB_CODE_SUCCESS; } From 6464077cec68505240e758c07e9783f96a35b39a Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 21 Nov 2024 09:50:45 +0800 Subject: [PATCH 3/3] add log --- source/libs/executor/src/scanoperator.c | 5 +++-- source/libs/stream/src/streamUpdate.c | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7822aa9711..095d7e1a2b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3486,10 +3486,11 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) if (code == TSDB_CODE_SUCCESS) { pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); pInfo->pUpdateInfo = pUpInfo; + qDebug("%s line:%d. stream scan updateinfo deserialize success", __func__, __LINE__); } else { taosMemoryFree(pUpInfo); - lino = __LINE__; - goto _end; + code = TSDB_CODE_SUCCESS; + qDebug("%s line:%d. stream scan did not have updateinfo", __func__, __LINE__); } if (tDecodeIsEnd(pDeCoder)) { diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index ff11850d7c..49d5041369 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -449,6 +449,7 @@ int32_t updateInfoSerialize(SEncoder* pEncoder, const SUpdateInfo* pInfo) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } + uDebug("%s line:%d. it did not have updateinfo", __func__, __LINE__); return TSDB_CODE_SUCCESS; }