diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index eac4f87b14..095d7e1a2b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3410,6 +3410,8 @@ int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff, int32_t* QUERY_CHECK_CODE(code, lino, _end); } + qDebug("%s last scan range %d. %" PRId64 ",%" PRId64, __func__, __LINE__, pInfo->lastScanRange.skey, pInfo->lastScanRange.ekey); + *pLen = len; _end: @@ -3475,21 +3477,20 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) goto _end; } - if (pInfo->pUpdateInfo != NULL) { - void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); - if (!pUpInfo) { - lino = __LINE__; - goto _end; - } - code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo); - if (code == TSDB_CODE_SUCCESS) { - pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); - pInfo->pUpdateInfo = pUpInfo; - } else { - taosMemoryFree(pUpInfo); - lino = __LINE__; - goto _end; - } + void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); + if (!pUpInfo) { + lino = __LINE__; + goto _end; + } + code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo); + if (code == TSDB_CODE_SUCCESS) { + pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); + pInfo->pUpdateInfo = pUpInfo; + qDebug("%s line:%d. stream scan updateinfo deserialize success", __func__, __LINE__); + } else { + taosMemoryFree(pUpInfo); + code = TSDB_CODE_SUCCESS; + qDebug("%s line:%d. stream scan did not have updateinfo", __func__, __LINE__); } if (tDecodeIsEnd(pDeCoder)) { @@ -3509,6 +3510,7 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) lino = __LINE__; goto _end; } + qDebug("%s last scan range %d. %" PRId64 ",%" PRId64, __func__, __LINE__, pInfo->lastScanRange.skey, pInfo->lastScanRange.ekey); _end: if (pDeCoder != NULL) { diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index a3cfa00127..49d5041369 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -445,6 +445,11 @@ int32_t updateInfoSerialize(SEncoder* pEncoder, const SUpdateInfo* pInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (!pInfo) { + if (tEncodeI32(pEncoder, -1) < 0) { + code = TSDB_CODE_FAILED; + QUERY_CHECK_CODE(code, lino, _end); + } + uDebug("%s line:%d. it did not have updateinfo", __func__, __LINE__); return TSDB_CODE_SUCCESS; } @@ -550,6 +555,10 @@ int32_t updateInfoDeserialize(SDecoder* pDeCoder, SUpdateInfo* pInfo) { int32_t size = 0; if (tDecodeI32(pDeCoder, &size) < 0) return -1; + + if (size < 0) { + return -1; + } pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY)); QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno); diff --git a/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim b/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim new file mode 100644 index 0000000000..ed72d87e9a --- /dev/null +++ b/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim @@ -0,0 +1,67 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 4; +sql use test; + +sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a) from st partition by tbname interval(2s); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a) from st interval(2s); + +run tsim/stream/checkTaskStatus.sim + +sleep 70000 + + +print restart taosd 01 ...... + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now + 3000a,1,1,1); + +$loop_count = 0 +loop0: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print select * from streamt1; +sql select * from streamt1; + +print $data00 $data01 $data02 + +if $rows == 0 then + goto loop0 +endi + +print select * from streamt2; +sql select * from streamt2; + +print $data00 $data01 $data02 + +if $rows == 0 then + goto loop0 +endi + +print end + +system sh/exec.sh -n dnode1 -s stop -x SIGINT