commit
6478e167af
|
@ -349,6 +349,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||||
|
curWin.winInfo.pStatePos->beUpdated = true;
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
|
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
|
||||||
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
||||||
|
|
|
@ -2083,6 +2083,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||||
|
winInfo.pStatePos->beUpdated = true;
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
getSessionHashKey(&winInfo.sessionWin, &key);
|
getSessionHashKey(&winInfo.sessionWin, &key);
|
||||||
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
|
||||||
|
@ -2286,6 +2287,10 @@ int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) {
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
||||||
SResultWindowInfo* pWinInfo = pIte;
|
SResultWindowInfo* pWinInfo = pIte;
|
||||||
|
if (!pWinInfo->pStatePos->beUpdated) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
pWinInfo->pStatePos->beUpdated = false;
|
||||||
saveResult(*pWinInfo, pStUpdated);
|
saveResult(*pWinInfo, pStUpdated);
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3425,6 +3430,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||||
|
curWin.winInfo.pStatePos->beUpdated = true;
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
|
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
|
||||||
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
||||||
|
|
|
@ -290,6 +290,213 @@ if $data32 != $now32 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step 2 max delay 2s
|
||||||
|
sql create database test15 vgroups 4;
|
||||||
|
sql use test15;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
|
||||||
|
sql create stream stream15 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 session(ts, 10s);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791233001,2,2,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop4:
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
$now02 = $data02
|
||||||
|
$now12 = $data12
|
||||||
|
|
||||||
|
|
||||||
|
print step1 max delay 2s......... sleep 3s
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
|
||||||
|
if $data02 != $now02 then
|
||||||
|
print ======data02=$data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != $now12 then
|
||||||
|
print ======data12=$data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print step1 max delay 2s......... sleep 3s
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
|
||||||
|
if $data02 != $now02 then
|
||||||
|
print ======data02=$data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != $now12 then
|
||||||
|
print ======data12=$data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print session max delay over
|
||||||
|
|
||||||
|
print step 3 max delay 2s
|
||||||
|
sql create database test16 vgroups 4;
|
||||||
|
sql use test16;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
|
||||||
|
sql create stream stream16 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 state_window(a);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791233001,2,2,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop5:
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
$now02 = $data02
|
||||||
|
$now12 = $data12
|
||||||
|
|
||||||
|
|
||||||
|
print step1 max delay 2s......... sleep 3s
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
|
||||||
|
if $data02 != $now02 then
|
||||||
|
print ======data02=$data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != $now12 then
|
||||||
|
print ======data12=$data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print step1 max delay 2s......... sleep 3s
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
|
||||||
|
if $data02 != $now02 then
|
||||||
|
print ======data02=$data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != $now12 then
|
||||||
|
print ======data12=$data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print state max delay over
|
||||||
|
|
||||||
|
print step 4 max delay 2s
|
||||||
|
sql create database test17 vgroups 4;
|
||||||
|
sql use test17;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
|
||||||
|
sql create stream stream17 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 event_window start with a = 1 end with a = 9;
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213001,9,2,3,1.0);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791233001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791233009,9,2,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop6:
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
$now02 = $data02
|
||||||
|
$now12 = $data12
|
||||||
|
|
||||||
|
|
||||||
|
print step1 max delay 2s......... sleep 3s
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
|
||||||
|
if $data02 != $now02 then
|
||||||
|
print ======data02=$data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != $now12 then
|
||||||
|
print ======data12=$data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print step1 max delay 2s......... sleep 3s
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
|
||||||
|
if $data02 != $now02 then
|
||||||
|
print ======data02=$data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != $now12 then
|
||||||
|
print ======data12=$data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print event max delay over
|
||||||
|
|
||||||
print ======over
|
print ======over
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
Loading…
Reference in New Issue