This commit is contained in:
liuyao 2023-07-18 14:51:32 +08:00
parent ad0b0c3821
commit 14e591c802
2 changed files with 86 additions and 11 deletions

View File

@ -2594,8 +2594,6 @@ int32_t doStreamIntervalEncodeOpState(void** buf, int32_t len, SOperatorInfo* pO
while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
tlen += encodeSWinKey(buf, key);
SRowBuffPos* pPos = *(void**)pIte;
tlen += encodeSRowBuffPos(buf, pPos);
}
// 2.twAggSup
@ -2655,10 +2653,10 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
buf = taosDecodeFixedI32(buf, &mapSize);
for (int32_t i = 0; i < mapSize; i++) {
SWinKey key = {0};
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
pPos->pKey = taosMemoryCalloc(1, sizeof(SWinKey));
buf = decodeSWinKey(buf, &key);
buf = decodeSRowBuffPos(buf, pPos);
SRowBuffPos* pPos = NULL;
int32_t resSize = pInfo->aggSup.resultRowSize;
pInfo->stateStore.streamStateAddIfNotExist(pInfo->pState, &key, (void**)&pPos, &resSize);
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES);
}

View File

@ -14,6 +14,7 @@ sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s);
sql create stream streams1 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, sum(a) from t1 interval(10s);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.1);
@ -45,6 +46,23 @@ if $data02 != 3 then
goto loop0
endi
$loop_count = 0
loop01:
sleep 1000
sql select * from streamt1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 0 then
print =====rows=$rows expect 1
goto loop01
endi
print waiting for checkpoint generation 1 ......
sleep 25000
@ -126,6 +144,36 @@ if $data12 != 4 then
goto loop2
endi
$loop_count = 0
loop3:
sleep 1000
print select * from streamt1;
sql select * from streamt1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 2
goto loop3
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop3
endi
if $data02 != 6 then
print =====data02=$data02
goto loop3
endi
print step 2
print restart taosd 02 ......
@ -136,7 +184,7 @@ system sh/exec.sh -n dnode1 -s start
sql insert into t1 values(1648791223004,5,2,3,1.1);
loop20:
loop4:
sleep 1000
sql select * from streamt;
@ -148,29 +196,58 @@ endi
if $rows != 2 then
print =====rows=$rows expect 2
goto loop20
goto loop4
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop20
goto loop4
endi
if $data02 != 6 then
print =====data02=$data02
goto loop20
goto loop4
endi
# row 1
if $data11 != 2 then
print =====data11=$data11
goto loop20
goto loop4
endi
if $data12 != 9 then
print =====data12=$data12
goto loop20
goto loop4
endi
$loop_count = 0
loop5:
sleep 1000
print select * from streamt1;
sql select * from streamt1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 2
goto loop5
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop5
endi
if $data02 != 6 then
print =====data02=$data02
goto loop5
endi
print end---------------------------------