commit
e46a13c788
|
@ -119,6 +119,7 @@ typedef struct SRowBuffPos {
|
|||
bool beFlushed;
|
||||
bool beUsed;
|
||||
bool needFree;
|
||||
bool beUpdated;
|
||||
} SRowBuffPos;
|
||||
|
||||
// tq
|
||||
|
|
|
@ -279,7 +279,12 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SSHashObj* resWins) {
|
|||
SWinKey* pKey = tSimpleHashGetKey(pIte, NULL);
|
||||
uint64_t groupId = pKey->groupId;
|
||||
TSKEY ts = pKey->ts;
|
||||
int32_t code = saveWinResultInfo(ts, groupId, *(SRowBuffPos**)pIte, resWins);
|
||||
SRowBuffPos* pPos = *(SRowBuffPos**)pIte;
|
||||
if (!pPos->beUpdated) {
|
||||
continue;
|
||||
}
|
||||
pPos->beUpdated = false;
|
||||
int32_t code = saveWinResultInfo(ts, groupId, pPos, resWins);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -866,6 +871,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
|
|||
}
|
||||
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
pResPos->beUpdated = true;
|
||||
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES);
|
||||
}
|
||||
|
||||
|
|
|
@ -428,6 +428,7 @@ SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
|
|||
newPos->beUsed = true;
|
||||
newPos->beFlushed = false;
|
||||
newPos->needFree = false;
|
||||
newPos->beUpdated = true;
|
||||
return newPos;
|
||||
}
|
||||
|
||||
|
|
|
@ -64,7 +64,7 @@ endi
|
|||
sql select count(*) from streamt;
|
||||
|
||||
if $data00 != 9098 then
|
||||
print =====rows=$rows
|
||||
print =====data00=$data00
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
|
|
|
@ -134,6 +134,162 @@ if $rows != 2 then
|
|||
goto loop1
|
||||
endi
|
||||
|
||||
print step 1 max delay 2s
|
||||
sql create database test3 vgroups 4;
|
||||
sql use test3;
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
|
||||
sql create stream stream13 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 interval(10s);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223001,2,2,3,1.1);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop2:
|
||||
|
||||
sleep 1000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt13;
|
||||
|
||||
if $rows != 2 then
|
||||
print ======rows=$rows
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
$now02 = $data02
|
||||
$now12 = $data12
|
||||
|
||||
|
||||
print step1 max delay 2s......... sleep 3s
|
||||
sleep 3000
|
||||
|
||||
sql select * from streamt13;
|
||||
|
||||
|
||||
if $data02 != $now02 then
|
||||
print ======data02=$data02
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != $now12 then
|
||||
print ======data12=$data12
|
||||
return -1
|
||||
endi
|
||||
|
||||
print step 2 max delay 2s
|
||||
|
||||
sql create database test4 vgroups 4;
|
||||
sql use test4;
|
||||
|
||||
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream stream14 trigger max_delay 2s into streamt14 as select _wstart, sum(a), now from st partition by tbname interval(10s);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223000,2,2,3,1.1);
|
||||
|
||||
sql insert into t2 values(1648791213000,3,2,3,1.0);
|
||||
sql insert into t2 values(1648791223000,4,2,3,1.1);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop3:
|
||||
|
||||
sleep 1000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt14 order by 2;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
print $data20 $data21 $data22
|
||||
print $data30 $data31 $data32
|
||||
|
||||
if $rows != 4 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
$now02 = $data02
|
||||
$now12 = $data12
|
||||
$now22 = $data22
|
||||
$now32 = $data32
|
||||
|
||||
print step2 max delay 2s......... sleep 3s
|
||||
sleep 3000
|
||||
|
||||
sql select * from streamt14 order by 2;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
print $data20 $data21 $data22
|
||||
print $data30 $data31 $data32
|
||||
|
||||
|
||||
if $data02 != $now02 then
|
||||
print ======data02=$data02
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != $now12 then
|
||||
print ======data12=$data12
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data22 != $now22 then
|
||||
print ======data22=$data22
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data32 != $now32 then
|
||||
print ======data32=$data32
|
||||
return -1
|
||||
endi
|
||||
|
||||
print step2 max delay 2s......... sleep 3s
|
||||
sleep 3000
|
||||
|
||||
sql select * from streamt14 order by 2;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
print $data20 $data21 $data22
|
||||
print $data30 $data31 $data32
|
||||
|
||||
|
||||
if $data02 != $now02 then
|
||||
print ======data02=$data02
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != $now12 then
|
||||
print ======data12=$data12
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data22 != $now22 then
|
||||
print ======data22=$data22
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data32 != $now32 then
|
||||
print ======data32=$data32
|
||||
return -1
|
||||
endi
|
||||
|
||||
print ======over
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue