opt max delay

This commit is contained in:
liuyao 2023-10-25 20:29:25 +08:00
parent 533711a7f2
commit d0e4a5d0f7
4 changed files with 61 additions and 1 deletions

View File

@ -119,6 +119,7 @@ typedef struct SRowBuffPos {
bool beFlushed;
bool beUsed;
bool needFree;
bool beUpdated;
} SRowBuffPos;
// tq

View File

@ -279,7 +279,12 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SSHashObj* resWins) {
SWinKey* pKey = tSimpleHashGetKey(pIte, NULL);
uint64_t groupId = pKey->groupId;
TSKEY ts = pKey->ts;
int32_t code = saveWinResultInfo(ts, groupId, *(SRowBuffPos**)pIte, resWins);
SRowBuffPos* pPos = *(SRowBuffPos**)pIte;
if (!pPos->beUpdated) {
continue;
}
pPos->beUpdated = false;
int32_t code = saveWinResultInfo(ts, groupId, pPos, resWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -863,6 +868,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
pResPos->beUpdated = true;
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES);
}

View File

@ -407,6 +407,7 @@ SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
newPos->beUsed = true;
newPos->beFlushed = false;
newPos->needFree = false;
newPos->beUpdated = true;
return newPos;
}

View File

@ -134,6 +134,58 @@ if $rows != 2 then
goto loop1
endi
print max delay 2s
sql create database test3 vgroups 4;
sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream stream13 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 interval(10s);
sleep 1000
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1);
$loop_count = 0
loop2:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamt13;
if $rows != 2 then
print ======rows=$rows
goto loop2
endi
$now02 = $data02
$now12 = $data12
$loop_count = 0
print max delay 2s......... sleep 5s
sleep 5000
sql select * from streamt13;
if $data02 != $now02 then
print ======data02=$data02
return -1
endi
if $data12 != $now12 then
print ======data12=$data12
return -1
endi
print ======over
system sh/exec.sh -n dnode1 -s stop -x SIGINT