Merge pull request #13722 from taosdata/feature/TD-16435
feat(stream): state & session max delay
This commit is contained in:
commit
3be426f913
|
@ -2649,8 +2649,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
|
|||
SResultWindowInfo* getSessionWinInfo(void* pData) { return (SResultWindowInfo*)pData; }
|
||||
SResultWindowInfo* getStateWinInfo(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
|
||||
|
||||
int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pClosed, int8_t calTrigger,
|
||||
__get_win_info_ fn) {
|
||||
int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pClosed,
|
||||
__get_win_info_ fn) {
|
||||
// Todo(liuyao) save window to tdb
|
||||
int32_t size = taosArrayGetSize(pWins);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
|
@ -2658,19 +2658,9 @@ int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pC
|
|||
SResultWindowInfo* pSeWin = fn(pWin);
|
||||
if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) {
|
||||
if (!pSeWin->isClosed) {
|
||||
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
|
||||
if (pos == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pos->groupId = 0;
|
||||
pos->pos = pSeWin->pos;
|
||||
*(int64_t*)pos->key = pSeWin->win.ekey;
|
||||
if (!taosArrayPush(pClosed, &pos)) {
|
||||
taosMemoryFree(pos);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pSeWin->isClosed = true;
|
||||
if (calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed);
|
||||
pSeWin->isOutput = true;
|
||||
}
|
||||
}
|
||||
|
@ -2681,6 +2671,19 @@ int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pC
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getAllSessionWindow(SArray* pWins, SArray* pClosed, __get_win_info_ fn) {
|
||||
int32_t size = taosArrayGetSize(pWins);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
void* pWin = taosArrayGet(pWins, i);
|
||||
SResultWindowInfo* pSeWin = fn(pWin);
|
||||
if (!pSeWin->isClosed) {
|
||||
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed);
|
||||
pSeWin->isOutput = true;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -2703,6 +2706,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
SHashObj* pStUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
|
@ -2723,7 +2727,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
taosArrayDestroy(pWins);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL &&
|
||||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
|
||||
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getSessionWinInfo);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (isFinalSession(pInfo)) {
|
||||
int32_t childIndex = 0; // Todo(liuyao) get child id from SSDataBlock
|
||||
SOptrBasicInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
||||
|
@ -2735,15 +2744,10 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
// restore the value
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
SArray* pClosed = taosArrayInit(16, POINTER_BYTES);
|
||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pClosed, pInfo->twAggSup.calTrigger,
|
||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
|
||||
getSessionWinInfo);
|
||||
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
|
||||
copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId);
|
||||
taosHashCleanup(pStUpdated);
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
taosArrayAddAll(pUpdated, pClosed);
|
||||
}
|
||||
|
||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
|
||||
pInfo->binfo.rowCellInfoOffset);
|
||||
|
@ -3067,6 +3071,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
SHashObj* pSeUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
|
@ -3078,6 +3083,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
|
||||
pSeUpdated, pInfo->pSeDeleted);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL &&
|
||||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
|
||||
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getStateWinInfo);
|
||||
continue;
|
||||
}
|
||||
doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
|
@ -3085,15 +3094,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
// restore the value
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
SArray* pClosed = taosArrayInit(16, POINTER_BYTES);
|
||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pClosed, pInfo->twAggSup.calTrigger,
|
||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
|
||||
getStateWinInfo);
|
||||
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
|
||||
copyUpdateResult(pSeUpdated, pUpdated, pBInfo->pRes->info.groupId);
|
||||
taosHashCleanup(pSeUpdated);
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
taosArrayAddAll(pUpdated, pClosed);
|
||||
}
|
||||
|
||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
|
||||
pInfo->binfo.rowCellInfoOffset);
|
||||
|
|
|
@ -71,6 +71,7 @@
|
|||
./test.sh -f tsim/stream/basic0.sim
|
||||
./test.sh -f tsim/stream/basic1.sim
|
||||
./test.sh -f tsim/stream/basic2.sim
|
||||
# ./test.sh -f tsim/stream/distributeInterval0.sim
|
||||
# ./test.sh -f tsim/stream/session0.sim
|
||||
# ./test.sh -f tsim/stream/session1.sim
|
||||
# ./test.sh -f tsim/stream/state0.sim
|
||||
|
|
|
@ -0,0 +1,176 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
sql create dnode $hostname2 port 7200
|
||||
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
|
||||
sql create database test vgroups 4;
|
||||
sql use test;
|
||||
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table ts1 using st tags(1,1,1);
|
||||
sql create table ts2 using st tags(2,2,2);
|
||||
sql create table ts3 using st tags(3,2,2);
|
||||
sql create table ts4 using st tags(4,2,2);
|
||||
sql create stream stream_t1 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into ts1 values(1648791213001,1,12,3,1.0);
|
||||
sql insert into ts2 values(1648791213001,1,12,3,1.0);
|
||||
|
||||
sql insert into ts3 values(1648791213001,1,12,3,1.0);
|
||||
sql insert into ts4 values(1648791213001,1,12,3,1.0);
|
||||
|
||||
sql insert into ts1 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||
sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||
|
||||
sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||
sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||
|
||||
sql insert into ts1 values(1648791223002,2,2,3,1.1);
|
||||
sql insert into ts1 values(1648791233003,3,2,3,2.1);
|
||||
sql insert into ts2 values(1648791243004,4,2,43,73.1);
|
||||
sql insert into ts1 values(1648791213002,24,22,23,4.1);
|
||||
sql insert into ts1 values(1648791243005,4,20,3,3.1);
|
||||
sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
|
||||
sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
|
||||
sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
|
||||
sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||
sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
|
||||
sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||
|
||||
sql insert into ts3 values(1648791223002,2,2,3,1.1);
|
||||
sql insert into ts4 values(1648791233003,3,2,3,2.1);
|
||||
sql insert into ts3 values(1648791243004,4,2,43,73.1);
|
||||
sql insert into ts4 values(1648791213002,24,22,23,4.1);
|
||||
sql insert into ts3 values(1648791243005,4,20,3,3.1);
|
||||
sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
|
||||
sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
|
||||
sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
|
||||
sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||
sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
|
||||
sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
sql select * from streamtST1;
|
||||
|
||||
sleep 300
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 0
|
||||
if $data01 != 8 then
|
||||
print =====data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != 4 then
|
||||
print =====data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data03 != 4 then
|
||||
print ======$data03
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 52 then
|
||||
print ======$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data05 != 13 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 1
|
||||
if $data11 != 6 then
|
||||
print =====data11=$data11
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data12 != 6 then
|
||||
print =====data12=$data12
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data13 != 92 then
|
||||
print ======$data13
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data14 != 22 then
|
||||
print ======$data14
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data15 != 3 then
|
||||
print ======$data15
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 2
|
||||
if $data21 != 4 then
|
||||
print =====data21=$data21
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data22 != 4 then
|
||||
print =====data22=$data22
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data23 != 32 then
|
||||
print ======$data23
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data24 != 12 then
|
||||
print ======$data24
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data25 != 3 then
|
||||
print ======$data25
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 3
|
||||
if $data31 != 30 then
|
||||
print =====data31=$data31
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data32 != 30 then
|
||||
print =====data32=$data32
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data33 != 180 then
|
||||
print ======$data33
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data34 != 42 then
|
||||
print ======$data34
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data35 != 3 then
|
||||
print ======$data35
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue