event window and partition by
This commit is contained in:
parent
330b04a8bc
commit
52083cd694
|
@ -108,7 +108,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI
|
||||||
pCurWin->winInfo.sessionWin.win.skey = ts;
|
pCurWin->winInfo.sessionWin.win.skey = ts;
|
||||||
pCurWin->winInfo.sessionWin.win.ekey = ts;
|
pCurWin->winInfo.sessionWin.win.ekey = ts;
|
||||||
SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
||||||
SSessionKey leftWinKey = {0};
|
SSessionKey leftWinKey = {.groupId = groupId};
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &leftWinKey, &pVal, &len);
|
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &leftWinKey, &pVal, &len);
|
||||||
|
@ -122,7 +122,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI
|
||||||
}
|
}
|
||||||
pAggSup->stateStore.streamStateFreeCur(pCur);
|
pAggSup->stateStore.streamStateFreeCur(pCur);
|
||||||
pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
||||||
SSessionKey rightWinKey = {0};
|
SSessionKey rightWinKey = {.groupId = groupId};
|
||||||
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &rightWinKey, &pVal, &len);
|
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &rightWinKey, &pVal, &len);
|
||||||
bool inWin = isInTimeWindow(&rightWinKey.win, ts, 0);
|
bool inWin = isInTimeWindow(&rightWinKey.win, ts, 0);
|
||||||
if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &rightWinKey.win) && (inWin || (start && !end))) {
|
if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &rightWinKey.win) && (inWin || (start && !end))) {
|
||||||
|
|
|
@ -230,5 +230,61 @@ if $data01 != 2 then
|
||||||
goto loop6
|
goto loop6
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step3
|
||||||
|
print =============== create database test3
|
||||||
|
sql create database test3 vgroups 1;
|
||||||
|
sql use test3;
|
||||||
|
|
||||||
|
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
||||||
|
sql create table t1 using st tags(1,1,1);
|
||||||
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
|
sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname event_window start with a = 0 end with b = 9;
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791223000,0,3,3,3.0);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791233000,0,1,1,1.0);
|
||||||
|
sql insert into t1 values(1648791243000,1,9,2,2.0);
|
||||||
|
|
||||||
|
sql insert into t2 values(1648791223000,0,3,3,3.0);
|
||||||
|
|
||||||
|
sql insert into t2 values(1648791233000,0,1,1,1.0);
|
||||||
|
sql insert into t2 values(1648791243000,1,9,2,2.0);
|
||||||
|
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop7:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
print 1 sql select * from streamt3;
|
||||||
|
sql select * from streamt3;
|
||||||
|
|
||||||
|
print
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
print $data20 $data21 $data22 $data23
|
||||||
|
print
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 3 then
|
||||||
|
print ======data01=$data01
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 3 then
|
||||||
|
print ======data11=$data11
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
print event1 end
|
print event1 end
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
Loading…
Reference in New Issue