diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index cfebddf9a1..c1fedfb906 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -757,6 +757,8 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pCurPoint->key, &pPrevPoint->key, (void**)&pPrevPoint->pResPos, &preVLen, &tmpRes); QUERY_CHECK_CODE(code, lino, _end); + qDebug("===stream=== set stream interp resutl prev buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d", pPrevPoint->key.ts, pPrevPoint->key.groupId, tmpRes); + if (tmpRes == TSDB_CODE_SUCCESS) { QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); setPointBuff(pPrevPoint, pFillSup); diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index e535c2e7a3..c92fac5968 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -233,10 +233,11 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW SArray* pWinStates = NULL; SSHashObj* pSearchBuff = getSearchBuff(pFileState); void* pState = getStateFileStore(pFileState); - void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); + void** ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); if (ppBuff) { pWinStates = (SArray*)(*ppBuff); } else { + qTrace("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId); SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); void* tmpVal = NULL; int32_t len = 0; @@ -257,7 +258,15 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW } int32_t size = taosArrayGetSize(pWinStates); int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); - if (index == -1 || index == 0) { + if (index >= 0) { + SWinKey* pCurKey = taosArrayGet(pWinStates, index); + if (winKeyCmprImpl(pCurKey, pKey) == 0) { + index--; + } else { + qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__, __LINE__); + } + } + if (index == -1) { SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); void* tmpVal = NULL; int32_t len = 0; @@ -276,15 +285,7 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW streamStateFreeCur(pCur); return code; } else { - SWinKey* pPrevKey = NULL; - SWinKey* pCurKey = taosArrayGet(pWinStates, index); - if (winKeyCmprImpl(pCurKey, pKey) == 0) { - pPrevKey = taosArrayGet(pWinStates, index - 1); - } else { - pPrevKey = taosArrayGet(pWinStates, index); - qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__, __LINE__); - } - + SWinKey* pPrevKey = taosArrayGet(pWinStates, index); *pResKey = *pPrevKey; return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode); } diff --git a/tests/script/tsim/stream/streamInterpForceWindowClose.sim b/tests/script/tsim/stream/streamInterpForceWindowClose.sim new file mode 100644 index 0000000000..4eb14abf00 --- /dev/null +++ b/tests/script/tsim/stream/streamInterpForceWindowClose.sim @@ -0,0 +1,139 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a) as a, interp(b) as b, now from t1 every(2s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(now,1,1,1,1.1) (now + 10s,2,2,2,2.1) (now + 20s,3,3,3,3.1); + +print sql select * from t1; +sql select * from t1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 +loop0: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where a == 1; +sql select * from streamt where a == 1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop0 +endi + +$loop_count = 0 +loop1: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where a == 2; +sql select * from streamt where a == 2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop1 +endi + +$loop_count = 0 +loop2: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where a == 3; +sql select * from streamt where a == 3; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop2 +endi + +sleep 4000 + +$loop_count = 0 +loop3: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where a == 3; +sql select * from streamt where a == 3; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 5 then + print ======rows=$rows + goto loop3 +endi + +print end + +system sh/exec.sh -n dnode1 -s stop -x SIGINT