From 9368244f43f4eb3bf6aca06c68f00fc40853e53f Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 4 Sep 2024 15:34:14 +0800 Subject: [PATCH] fix issue for fill history --- source/libs/stream/src/streamSliceState.c | 10 +- source/libs/stream/src/tstreamFileState.c | 1 + .../tsim/stream/streamInterpHistory.sim | 163 ++++++++++++++++++ 3 files changed, 172 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index c4d550e6f4..6f4eea06ee 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -70,9 +70,9 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo } int32_t size = taosArrayGetSize(pWinStates); - if (!isFlushedState(pFileState, pKey->ts, 0)) { + int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); + if (!isFlushedState(pFileState, pKey->ts, 0)|| index >= 0) { // find the first position which is smaller than the pKey - int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); if (index >= 0) { SWinKey* pTmpKey = taosArrayGet(pWinStates, index); if (winKeyCmprImpl(pTmpKey, pKey) == 0) { @@ -277,6 +277,12 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW return code; } else { SWinKey* pNext = taosArrayGet(pWinStates, index - 1); + if (qDebugFlag & DEBUG_DEBUG) { + SWinKey* pTmp = taosArrayGet(pWinStates, index); + if (winKeyCmprImpl(pTmp, pKey) != 0) { + qError("%s failed at line %d since do not find cur SWinKey", __func__, lino); + } + } *pResKey = *pNext; return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode); } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 60be901ac9..529ff78cae 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -1033,6 +1033,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi (*pos)->beUsed = true; (*pos)->beFlushed = false; (*pWinCode) = TSDB_CODE_SUCCESS; + goto _end; } TSKEY ts = pFileState->getTs(pKey); if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) { diff --git a/tests/script/tsim/stream/streamInterpHistory.sim b/tests/script/tsim/stream/streamInterpHistory.sim index beb81ec04b..b9685ebf05 100644 --- a/tests/script/tsim/stream/streamInterpHistory.sim +++ b/tests/script/tsim/stream/streamInterpHistory.sim @@ -489,4 +489,167 @@ if $rows != 8 then print ======rows=$rows goto loop3_1 endi + +print step3 + +print =============== create database +sql create database test3 vgroups 1; +sql use test3; + +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql insert into t1 values(1648791212000,1,1,1,1.0); +sql insert into t1 values(1648791215001,2,1,1,1.0); + +sql insert into t2 values(1648791212000,31,1,1,1.0); +sql insert into t2 values(1648791216001,41,1,1,1.0); + +sql create stream streams3 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791217000,5,1,1,1.0); +sql insert into t2 values(1648791217000,61,1,1,1.0); + +print sql select _irowts, _isfilled as a1, interp(a) as a2 from t1 partition by tbname range(1648791212000, 1648791217000) every(1s) fill(prev) order by 3, 1; +sql select _irowts, _isfilled as a1, interp(a) as a2 from t1 partition by tbname range(1648791212000, 1648791217000) every(1s) fill(prev) order by 3, 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + + +$loop_count = 0 + +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 0 sql select * from streamt where a2 <= 10 order by 1; +sql select * from streamt where a2 < 10 order by 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +# row 0 +if $rows != 6 then + print ======rows=$rows + goto loop4 +endi + +# row 0 +if $data02 != 1 then + print ======data02=$data02 + goto loop4 +endi + +if $data12 != 1 then + print ======data12=$data12 + goto loop4 +endi + +if $data22 != 1 then + print ======data22=$data22 + goto loop4 +endi + +if $data32 != 1 then + print ======data32=$data32 + goto loop4 +endi + +if $data42 != 2 then + print ======data42=$data42 + goto loop4 +endi + +if $data52 != 5 then + print ======data52=$data52 + goto loop4 +endi + + +print sql select _irowts, _isfilled as a1, interp(a) as a2 from t2 partition by tbname range(1648791212000, 1648791217000) every(1s) fill(prev) order by 3, 1; +sql select _irowts, _isfilled as a1, interp(a) as a2 from t2 partition by tbname range(1648791212000, 1648791217000) every(1s) fill(prev) order by 3, 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +$loop_count = 0 + +loop5: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 0 sql select * from streamt where a2 > 10 order by 1; +sql select * from streamt where a2 > 10 order by 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +# row 0 +if $rows != 6 then + print ======rows=$rows + goto loop5 +endi + +if $data02 != 31 then + print ======data02=$data02 + goto loop5 +endi + +if $data12 != 31 then + print ======data12=$data12 + goto loop5 +endi + +if $data22 != 31 then + print ======data22=$data22 + goto loop5 +endi + +if $data32 != 31 then + print ======data32=$data32 + goto loop5 +endi + +if $data42 != 31 then + print ======data42=$data42 + goto loop5 +endi + +if $data52 != 61 then + print ======data52=$data52 + goto loop5 +endi + +print end + system sh/exec.sh -n dnode1 -s stop -x SIGINT