Merge pull request #26494 from taosdata/fix/TD-30924
set key for session window state
This commit is contained in:
commit
d77fec10fc
|
@ -3121,6 +3121,7 @@ _error:
|
||||||
static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
|
static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
|
||||||
tSimpleHashClear(pInfo->streamAggSup.pResultRows);
|
tSimpleHashClear(pInfo->streamAggSup.pResultRows);
|
||||||
pInfo->streamAggSup.stateStore.streamStateSessionClear(pInfo->streamAggSup.pState);
|
pInfo->streamAggSup.stateStore.streamStateSessionClear(pInfo->streamAggSup.pState);
|
||||||
|
pInfo->clearState = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
|
void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
|
||||||
|
@ -3170,7 +3171,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
// semi session operator clear disk buffer
|
// semi session operator clear disk buffer
|
||||||
clearStreamSessionOperator(pInfo);
|
clearStreamSessionOperator(pInfo);
|
||||||
setStreamOperatorCompleted(pOperator);
|
setStreamOperatorCompleted(pOperator);
|
||||||
pInfo->clearState = false;
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -242,7 +242,6 @@ _end:
|
||||||
|
|
||||||
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
|
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
|
||||||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
||||||
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
|
||||||
pNewPos->needFree = true;
|
pNewPos->needFree = true;
|
||||||
pNewPos->beFlushed = true;
|
pNewPos->beFlushed = true;
|
||||||
void* pBuff = NULL;
|
void* pBuff = NULL;
|
||||||
|
@ -250,6 +249,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||||
memcpy(pNewPos->pRowBuff, pBuff, *pVLen);
|
memcpy(pNewPos->pRowBuff, pBuff, *pVLen);
|
||||||
taosMemoryFreeClear(pBuff);
|
taosMemoryFreeClear(pBuff);
|
||||||
(*pVal) = pNewPos;
|
(*pVal) = pNewPos;
|
||||||
|
|
|
@ -21,8 +21,37 @@ sql create table ts3 using st tags(3,2,2);
|
||||||
sql create table ts4 using st tags(4,2,2);
|
sql create table ts4 using st tags(4,2,2);
|
||||||
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4 from st interval(10s);
|
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4 from st interval(10s);
|
||||||
|
|
||||||
|
print ====check task status start
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loopCheck:
|
||||||
|
|
||||||
sleep 1000
|
sleep 1000
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 30 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 1 select * from information_schema.ins_stream_tasks;
|
||||||
|
sql select * from information_schema.ins_stream_tasks;
|
||||||
|
|
||||||
|
if $rows == 0 then
|
||||||
|
print rows=$rows
|
||||||
|
goto loopCheck
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 1 select * from information_schema.ins_stream_tasks where status != "ready";
|
||||||
|
sql select * from information_schema.ins_stream_tasks where status != "ready";
|
||||||
|
|
||||||
|
if $rows != 0 then
|
||||||
|
print rows=$rows
|
||||||
|
goto loopCheck
|
||||||
|
endi
|
||||||
|
|
||||||
|
print ====check task status end
|
||||||
|
|
||||||
sql insert into ts1 values(1648791213000,1,1,3,4.1);
|
sql insert into ts1 values(1648791213000,1,1,3,4.1);
|
||||||
sql insert into ts1 values(1648791223000,2,2,3,1.1);
|
sql insert into ts1 values(1648791223000,2,2,3,1.1);
|
||||||
sql insert into ts1 values(1648791233000,3,3,3,2.1);
|
sql insert into ts1 values(1648791233000,3,3,3,2.1);
|
||||||
|
@ -123,6 +152,8 @@ if $data31 != 2 then
|
||||||
goto loop1
|
goto loop1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sleep 5000
|
||||||
|
|
||||||
sql delete from ts2 where ts = 1648791243000 ;
|
sql delete from ts2 where ts = 1648791243000 ;
|
||||||
|
|
||||||
$loop_count = 0
|
$loop_count = 0
|
||||||
|
|
|
@ -41,8 +41,37 @@ sql create table ts1 using st tags(1,1,1);
|
||||||
sql create table ts2 using st tags(2,2,2);
|
sql create table ts2 using st tags(2,2,2);
|
||||||
sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST as select _wstart, count(*) c1, sum(a) c2 , max(b) c3 from st session(ts, 10s) ;
|
sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST as select _wstart, count(*) c1, sum(a) c2 , max(b) c3 from st session(ts, 10s) ;
|
||||||
|
|
||||||
|
print ====check task status start
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loopCheck:
|
||||||
|
|
||||||
sleep 1000
|
sleep 1000
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 30 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 1 select * from information_schema.ins_stream_tasks;
|
||||||
|
sql select * from information_schema.ins_stream_tasks;
|
||||||
|
|
||||||
|
if $rows == 0 then
|
||||||
|
print rows=$rows
|
||||||
|
goto loopCheck
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 1 select * from information_schema.ins_stream_tasks where status != "ready";
|
||||||
|
sql select * from information_schema.ins_stream_tasks where status != "ready";
|
||||||
|
|
||||||
|
if $rows != 0 then
|
||||||
|
print rows=$rows
|
||||||
|
goto loopCheck
|
||||||
|
endi
|
||||||
|
|
||||||
|
print ====check task status end
|
||||||
|
|
||||||
sql insert into ts1 values(1648791211000,1,1,1) (1648791211005,1,1,1);
|
sql insert into ts1 values(1648791211000,1,1,1) (1648791211005,1,1,1);
|
||||||
sql insert into ts2 values(1648791221004,1,2,3) (1648791221008,2,2,3);
|
sql insert into ts2 values(1648791221004,1,2,3) (1648791221008,2,2,3);
|
||||||
sql insert into ts1 values(1648791211005,1,1,1);
|
sql insert into ts1 values(1648791211005,1,1,1);
|
||||||
|
@ -79,4 +108,73 @@ if $data03 != 7 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print ===== step3
|
||||||
|
|
||||||
|
sql create database test1 vgroups 4;
|
||||||
|
sql use test1;
|
||||||
|
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
|
sql create table ts1 using st tags(1,1,1);
|
||||||
|
sql create table ts2 using st tags(2,2,2);
|
||||||
|
sql create stream stream_t2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST2 as select _wstart, count(*) c1, sum(a) c2 , max(b) c3 from st partition by a session(ts, 10s) ;
|
||||||
|
|
||||||
|
print ====check task status start
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loopCheck1:
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 30 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 1 select * from information_schema.ins_stream_tasks;
|
||||||
|
sql select * from information_schema.ins_stream_tasks;
|
||||||
|
|
||||||
|
if $rows == 0 then
|
||||||
|
print rows=$rows
|
||||||
|
goto loopCheck1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 1 select * from information_schema.ins_stream_tasks where status != "ready";
|
||||||
|
sql select * from information_schema.ins_stream_tasks where status != "ready";
|
||||||
|
|
||||||
|
if $rows != 0 then
|
||||||
|
print rows=$rows
|
||||||
|
goto loopCheck1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print ====check task status end
|
||||||
|
|
||||||
|
sql insert into ts1 values(1648791201000,1,1,1) (1648791210000,1,1,1);
|
||||||
|
sql insert into ts1 values(1648791211000,2,1,1) (1648791212000,2,1,1);
|
||||||
|
sql insert into ts2 values(1648791211000,3,1,1) (1648791212000,3,1,1);
|
||||||
|
|
||||||
|
sql delete from st where ts = 1648791211000;
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop2:
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
print 2 select * from streamtST2;
|
||||||
|
sql select * from streamtST2;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
print $data20 $data21 $data22 $data23
|
||||||
|
print $data30 $data31 $data32 $data33
|
||||||
|
print $data40 $data41 $data42 $data43
|
||||||
|
|
||||||
|
if $rows != 3 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
system sh/stop_dnodes.sh
|
system sh/stop_dnodes.sh
|
||||||
|
|
Loading…
Reference in New Issue