fix issue for fill history

This commit is contained in:
54liuyao 2024-09-04 15:34:14 +08:00
parent f5b6e800f3
commit 9368244f43
3 changed files with 172 additions and 2 deletions

View File

@ -70,9 +70,9 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo
}
int32_t size = taosArrayGetSize(pWinStates);
if (!isFlushedState(pFileState, pKey->ts, 0)) {
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (!isFlushedState(pFileState, pKey->ts, 0)|| index >= 0) {
// find the first position which is smaller than the pKey
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (index >= 0) {
SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
@ -277,6 +277,12 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
return code;
} else {
SWinKey* pNext = taosArrayGet(pWinStates, index - 1);
if (qDebugFlag & DEBUG_DEBUG) {
SWinKey* pTmp = taosArrayGet(pWinStates, index);
if (winKeyCmprImpl(pTmp, pKey) != 0) {
qError("%s failed at line %d since do not find cur SWinKey", __func__, lino);
}
}
*pResKey = *pNext;
return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode);
}

View File

@ -1033,6 +1033,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
(*pos)->beUsed = true;
(*pos)->beFlushed = false;
(*pWinCode) = TSDB_CODE_SUCCESS;
goto _end;
}
TSKEY ts = pFileState->getTs(pKey);
if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {

View File

@ -489,4 +489,167 @@ if $rows != 8 then
print ======rows=$rows
goto loop3_1
endi
print step3
print =============== create database
sql create database test3 vgroups 1;
sql use test3;
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql insert into t1 values(1648791212000,1,1,1,1.0);
sql insert into t1 values(1648791215001,2,1,1,1.0);
sql insert into t2 values(1648791212000,31,1,1,1.0);
sql insert into t2 values(1648791216001,41,1,1,1.0);
sql create stream streams3 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791217000,5,1,1,1.0);
sql insert into t2 values(1648791217000,61,1,1,1.0);
print sql select _irowts, _isfilled as a1, interp(a) as a2 from t1 partition by tbname range(1648791212000, 1648791217000) every(1s) fill(prev) order by 3, 1;
sql select _irowts, _isfilled as a1, interp(a) as a2 from t1 partition by tbname range(1648791212000, 1648791217000) every(1s) fill(prev) order by 3, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
$loop_count = 0
loop4:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 0 sql select * from streamt where a2 <= 10 order by 1;
sql select * from streamt where a2 < 10 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop4
endi
# row 0
if $data02 != 1 then
print ======data02=$data02
goto loop4
endi
if $data12 != 1 then
print ======data12=$data12
goto loop4
endi
if $data22 != 1 then
print ======data22=$data22
goto loop4
endi
if $data32 != 1 then
print ======data32=$data32
goto loop4
endi
if $data42 != 2 then
print ======data42=$data42
goto loop4
endi
if $data52 != 5 then
print ======data52=$data52
goto loop4
endi
print sql select _irowts, _isfilled as a1, interp(a) as a2 from t2 partition by tbname range(1648791212000, 1648791217000) every(1s) fill(prev) order by 3, 1;
sql select _irowts, _isfilled as a1, interp(a) as a2 from t2 partition by tbname range(1648791212000, 1648791217000) every(1s) fill(prev) order by 3, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
$loop_count = 0
loop5:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 0 sql select * from streamt where a2 > 10 order by 1;
sql select * from streamt where a2 > 10 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop5
endi
if $data02 != 31 then
print ======data02=$data02
goto loop5
endi
if $data12 != 31 then
print ======data12=$data12
goto loop5
endi
if $data22 != 31 then
print ======data22=$data22
goto loop5
endi
if $data32 != 31 then
print ======data32=$data32
goto loop5
endi
if $data42 != 31 then
print ======data42=$data42
goto loop5
endi
if $data52 != 61 then
print ======data52=$data52
goto loop5
endi
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT