update data
This commit is contained in:
parent
5b74bf87b1
commit
b9e05ab1ec
|
@ -115,7 +115,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI
|
||||||
if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &leftWinKey.win) ) {
|
if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &leftWinKey.win) ) {
|
||||||
bool inWin = isInTimeWindow(&leftWinKey.win, ts, 0);
|
bool inWin = isInTimeWindow(&leftWinKey.win, ts, 0);
|
||||||
setEventWindowInfo(pAggSup, &leftWinKey, pVal, pCurWin);
|
setEventWindowInfo(pAggSup, &leftWinKey, pVal, pCurWin);
|
||||||
if(inWin || !pCurWin->pWinFlag->endFlag) {
|
if(inWin || (pCurWin->pWinFlag->startFlag && !pCurWin->pWinFlag->endFlag) ) {
|
||||||
pCurWin->winInfo.isOutput = true;
|
pCurWin->winInfo.isOutput = true;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
@ -124,7 +124,8 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI
|
||||||
pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
||||||
SSessionKey rightWinKey = {0};
|
SSessionKey rightWinKey = {0};
|
||||||
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &rightWinKey, &pVal, &len);
|
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &rightWinKey, &pVal, &len);
|
||||||
if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &rightWinKey.win) && start && !end) {
|
bool inWin = isInTimeWindow(&rightWinKey.win, ts, 0);
|
||||||
|
if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &rightWinKey.win) && (inWin || (start && !end))) {
|
||||||
int32_t endi = getEndCondIndex(pEnd, index, rows);
|
int32_t endi = getEndCondIndex(pEnd, index, rows);
|
||||||
if (endi < 0 || pTs[endi] >= rightWinKey.win.skey) {
|
if (endi < 0 || pTs[endi] >= rightWinKey.win.skey) {
|
||||||
setEventWindowInfo(pAggSup, &rightWinKey, pVal, pCurWin);
|
setEventWindowInfo(pAggSup, &rightWinKey, pVal, pCurWin);
|
||||||
|
@ -158,6 +159,10 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW
|
||||||
TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start, SSHashObj* pResultRows,
|
TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start, SSHashObj* pResultRows,
|
||||||
SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) {
|
SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) {
|
||||||
*pRebuild = false;
|
*pRebuild = false;
|
||||||
|
if (!pWinInfo->pWinFlag->startFlag) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
TSKEY maxTs = INT64_MAX;
|
TSKEY maxTs = INT64_MAX;
|
||||||
STimeWindow* pWin = &pWinInfo->winInfo.sessionWin.win;
|
STimeWindow* pWin = &pWinInfo->winInfo.sessionWin.win;
|
||||||
if (pWinInfo->pWinFlag->endFlag) {
|
if (pWinInfo->pWinFlag->endFlag) {
|
||||||
|
|
|
@ -15,6 +15,8 @@ sql create table t1 using st tags(1,1,1);
|
||||||
sql create table t2 using st tags(2,2,2);
|
sql create table t2 using st tags(2,2,2);
|
||||||
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from st interval(10s);
|
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from st interval(10s);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
|
|
||||||
sql insert into t2 values(1648791213001,2,2,3,1.1);
|
sql insert into t2 values(1648791213001,2,2,3,1.1);
|
||||||
|
|
|
@ -26,8 +26,10 @@ sleep 300
|
||||||
print 1 sql select * from streamt1;
|
print 1 sql select * from streamt1;
|
||||||
sql select * from streamt1;
|
sql select * from streamt1;
|
||||||
|
|
||||||
|
print ===============================
|
||||||
print $data00 $data01 $data02 $data03
|
print $data00 $data01 $data02 $data03
|
||||||
print $data10 $data11 $data12 $data13
|
print $data10 $data11 $data12 $data13
|
||||||
|
print ===============================
|
||||||
|
|
||||||
$loop_count = $loop_count + 1
|
$loop_count = $loop_count + 1
|
||||||
if $loop_count == 10 then
|
if $loop_count == 10 then
|
||||||
|
@ -45,5 +47,80 @@ if $data01 != 2 then
|
||||||
goto loop0
|
goto loop0
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step2
|
||||||
|
print =============== create database test2
|
||||||
|
sql create database test2 vgroups 1;
|
||||||
|
sql use test2;
|
||||||
|
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart as s, count(*) c1, sum(b), max(c) from t1 event_window start with a = 0 end with b = 9;
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791223000,0,3,3,3.0);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791233000,0,1,1,1.0);
|
||||||
|
sql insert into t1 values(1648791243000,1,9,2,2.0);
|
||||||
|
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop1:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
print 1 sql select * from streamt2;
|
||||||
|
sql select * from streamt2;
|
||||||
|
|
||||||
|
print ===============================
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
print ===============================
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 3 then
|
||||||
|
print ======data01=$data01
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791223000,1,1,4,4.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop2:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
print 1 sql select * from streamt2;
|
||||||
|
sql select * from streamt2;
|
||||||
|
|
||||||
|
print ===============================
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
print ===============================
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 2 then
|
||||||
|
print ======data01=$data01
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
print event1 end
|
print event1 end
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
Loading…
Reference in New Issue