fix issue
This commit is contained in:
parent
b1d1e2c778
commit
e8945cedd8
|
@ -620,6 +620,7 @@ typedef struct SStreamEventAggOperatorInfo {
|
|||
bool ignoreExpiredDataSaved;
|
||||
SArray* pUpdated;
|
||||
SSHashObj* pSeUpdated;
|
||||
SSHashObj* pAllUpdated;
|
||||
int64_t dataVersion;
|
||||
bool isHistoryOp;
|
||||
SArray* historyWins;
|
||||
|
|
|
@ -58,6 +58,7 @@ void destroyStreamEventOperatorInfo(void* param) {
|
|||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||
blockDataDestroy(pInfo->pDelRes);
|
||||
tSimpleHashCleanup(pInfo->pSeUpdated);
|
||||
tSimpleHashCleanup(pInfo->pAllUpdated);
|
||||
tSimpleHashCleanup(pInfo->pSeDeleted);
|
||||
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||
|
@ -326,6 +327,10 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
compactEventWindow(pOperator, &curWin, pInfo->pSeUpdated, pInfo->pSeDeleted, false);
|
||||
saveSessionOutputBuf(pAggSup, &curWin.winInfo);
|
||||
|
||||
if (pInfo->isHistoryOp) {
|
||||
saveResult(curWin.winInfo, pInfo->pAllUpdated);
|
||||
}
|
||||
|
||||
if (isWindowIncomplete(&curWin)) {
|
||||
continue;
|
||||
}
|
||||
|
@ -527,7 +532,10 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
|
|||
removeSessionDeleteResults(pInfo->pSeDeleted, pInfo->pUpdated);
|
||||
|
||||
if (pInfo->isHistoryOp) {
|
||||
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
||||
SArray* pHisWins = taosArrayInit(16, sizeof(SEventWindowInfo));
|
||||
copyUpdateResult(&pInfo->pAllUpdated, pHisWins, sessionKeyCompareAsc);
|
||||
getMaxTsWins(pHisWins, pInfo->historyWins);
|
||||
taosArrayDestroy(pHisWins);
|
||||
}
|
||||
|
||||
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||
|
@ -594,16 +602,13 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
|
|||
pSeKeyBuf[i].groupId, i);
|
||||
getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo);
|
||||
setEventWindowFlag(pAggSup, &curInfo);
|
||||
if (!curInfo.pWinFlag->startFlag || curInfo.pWinFlag->endFlag) {
|
||||
continue;
|
||||
}
|
||||
|
||||
compactEventWindow(pOperator, &curInfo, pInfo->pSeUpdated, pInfo->pSeDeleted, false);
|
||||
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey,
|
||||
curInfo.winInfo.sessionWin.groupId);
|
||||
if (isWindowIncomplete(&curInfo)) {
|
||||
if (curInfo.winInfo.isOutput) {
|
||||
ASSERT(0);
|
||||
saveDeleteRes(pInfo->pSeDeleted, curInfo.winInfo.sessionWin);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||
saveResult(curInfo.winInfo, pInfo->pSeUpdated);
|
||||
|
@ -694,6 +699,13 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pInfo->isHistoryOp = pHandle->fillHistory;
|
||||
}
|
||||
|
||||
if (pInfo->isHistoryOp) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pAllUpdated = tSimpleHashInit(64, hashFn);
|
||||
} else {
|
||||
pInfo->pAllUpdated = NULL;
|
||||
}
|
||||
|
||||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||
pInfo->reCkBlock = false;
|
||||
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print step1
|
||||
print =============== create database test
|
||||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
|
||||
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create table t3 using st tags(3,3,3);
|
||||
sql create table t4 using st tags(3,3,3);
|
||||
|
||||
sql insert into t1 values(1648791223000,0,1,1,1.0);
|
||||
sql insert into t1 values(1648791233000,0,2,2,2.0);
|
||||
sql insert into t1 values(1648791243000,1,3,3,3.0);
|
||||
|
||||
sql insert into t2 values(1648791223000,0,1,4,3.0);
|
||||
sql insert into t2 values(1648791233000,0,2,5,1.0);
|
||||
sql insert into t2 values(1648791243000,1,3,6,2.0);
|
||||
|
||||
sql insert into t3 values(1648791223000,1,1,7,3.0);
|
||||
sql insert into t3 values(1648791233000,1,2,8,1.0);
|
||||
sql insert into t3 values(1648791243000,1,3,9,2.0);
|
||||
|
||||
sql insert into t4 values(1648791223000,1,1,10,3.0);
|
||||
sql insert into t4 values(1648791233000,0,2,11,1.0);
|
||||
sql insert into t4 values(1648791243000,1,9,12,2.0);
|
||||
|
||||
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 fill_history 1 into streamt0 as select _wstart as s, count(*) c1, sum(b), max(c), _wend as e from st partition by tbname event_window start with a = 0 end with b = 9;
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 values(1648791253000,1,9,13,2.0);
|
||||
sql insert into t2 values(1648791253000,1,9,14,2.0);
|
||||
sql insert into t3 values(1648791253000,1,9,15,2.0);
|
||||
sql insert into t4 values(1648791253000,1,9,16,2.0);
|
||||
|
||||
$loop_count = 0
|
||||
loop0:
|
||||
|
||||
sleep 300
|
||||
print 1 sql select * from streamt0 order by 1, 2, 3, 4;
|
||||
sql select * from streamt0 order by 1, 2, 3, 4;
|
||||
|
||||
print
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 3 then
|
||||
print ======rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data01 != 4 then
|
||||
print ======data01=$data01
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data11 != 4 then
|
||||
print ======data11=$data11
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data21 != 2 then
|
||||
print ======data21=$data21
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
|
||||
print event1 end
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,61 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print step1
|
||||
print =============== create database test
|
||||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
|
||||
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create table t3 using st tags(3,3,3);
|
||||
sql create table t4 using st tags(3,3,3);
|
||||
|
||||
|
||||
sql insert into t4 values(1648791223000,1,1,10,3.0);
|
||||
sql insert into t4 values(1648791233000,0,2,11,1.0);
|
||||
sql insert into t4 values(1648791243000,1,9,12,2.0);
|
||||
|
||||
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 fill_history 1 into streamt0 as select _wstart as s, count(*) c1, sum(b), max(c), _wend as e from st partition by tbname event_window start with a = 0 end with b = 9;
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into t4 values(1648791253000,1,9,16,2.0);
|
||||
|
||||
$loop_count = 0
|
||||
loop0:
|
||||
|
||||
sleep 300
|
||||
print 1 sql select * from streamt0 order by 1, 2, 3, 4;
|
||||
sql select * from streamt0 order by 1, 2, 3, 4;
|
||||
|
||||
print
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 1 then
|
||||
print ======rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print ======data01=$data01
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
|
||||
|
||||
print event1 end
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue