Merge pull request #19949 from taosdata/fix/TS-2538
fix:crash at generate session scan range
This commit is contained in:
commit
a4c66dd6bb
|
@ -1175,6 +1175,20 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
|
||||
SSessionKey* pKey) {
|
||||
pKey->win.skey = startTs;
|
||||
pKey->win.ekey = endTs;
|
||||
pKey->groupId = groupId;
|
||||
|
||||
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey);
|
||||
int32_t code = streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
SET_SESSION_WIN_KEY_INVALID(pKey);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
|
||||
blockDataCleanup(pDestBlock);
|
||||
if (pSrcBlock->info.rows == 0) {
|
||||
|
@ -1210,7 +1224,14 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
|||
}
|
||||
SSessionKey endWin = {0};
|
||||
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
|
||||
ASSERT(!IS_INVALID_SESSION_WIN_KEY(endWin));
|
||||
if(IS_INVALID_SESSION_WIN_KEY(endWin)) {
|
||||
getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
|
||||
}
|
||||
if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
|
||||
// window has been closed.
|
||||
qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64 , startData[i], endData[i]);
|
||||
continue;
|
||||
}
|
||||
colDataAppend(pDestStartCol, i, (const char*)&startWin.win.skey, false);
|
||||
colDataAppend(pDestEndCol, i, (const char*)&endWin.win.ekey, false);
|
||||
|
||||
|
|
|
@ -524,6 +524,112 @@ if $data42 != 14 then
|
|||
goto loop20
|
||||
endi
|
||||
|
||||
sql drop database if exists test4;
|
||||
sql drop stream if exists streams4;
|
||||
sql create database test4 vgroups 1;
|
||||
sql use test4;
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
print create stream streams4 trigger at_once into streamt4 as select _wstart, count(*) c1 from st partition by tbname session(ts, 2s);
|
||||
sql create stream streams4 trigger at_once into streamt4 as select _wstart, count(*) c1 from st partition by tbname session(ts, 2s);
|
||||
|
||||
sql insert into t1 values(1648791210000,1,2,3);
|
||||
sql insert into t1 values(1648791220000,2,2,3);
|
||||
sql insert into t1 values(1648791221000,2,2,3);
|
||||
sql insert into t1 values(1648791222000,2,2,3);
|
||||
sql insert into t1 values(1648791223000,2,2,3);
|
||||
sql insert into t1 values(1648791231000,2,2,3);
|
||||
|
||||
sql insert into t2 values(1648791210000,1,2,3);
|
||||
sql insert into t2 values(1648791220000,2,2,3);
|
||||
sql insert into t2 values(1648791221000,2,2,3);
|
||||
sql insert into t2 values(1648791231000,2,2,3);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop21:
|
||||
sleep 200
|
||||
sql select * from streamt4 order by c1 desc;;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 6 then
|
||||
print =====rows=$rows
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
if $data01 != 4 then
|
||||
print =====data01=$data01
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
if $data11 != 2 then
|
||||
print =====data11=$data11
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
if $data21 != 1 then
|
||||
print =====data21=$data21
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
if $data31 != 1 then
|
||||
print =====data31=$data31
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
if $data41 != 1 then
|
||||
print =====data41=$data41
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
if $data51 != 1 then
|
||||
print =====data51=$data51
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
print delete from st where ts >= 1648791220000 and ts <=1648791223000;
|
||||
sql delete from st where ts >= 1648791220000 and ts <=1648791223000;
|
||||
|
||||
loop22:
|
||||
sleep 200
|
||||
sql select * from streamt4 order by c1 desc;;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 4 then
|
||||
print =====rows=$rows
|
||||
goto loop22
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop22
|
||||
endi
|
||||
|
||||
if $data11 != 1 then
|
||||
print =====data11=$data11
|
||||
goto loop22
|
||||
endi
|
||||
|
||||
if $data21 != 1 then
|
||||
print =====data21=$data21
|
||||
goto loop22
|
||||
endi
|
||||
|
||||
if $data31 != 1 then
|
||||
print =====data31=$data31
|
||||
goto loop22
|
||||
endi
|
||||
|
||||
$loop_all = $loop_all + 1
|
||||
print ============loop_all=$loop_all
|
||||
|
||||
|
|
|
@ -189,6 +189,112 @@ if $data12 != 4 then
|
|||
goto loop6
|
||||
endi
|
||||
|
||||
sql drop database if exists test4;
|
||||
sql drop stream if exists streams4;
|
||||
sql create database test4 vgroups 1;
|
||||
sql use test4;
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
print create stream streams4 trigger at_once into streamt4 as select _wstart, count(*) c1 from st partition by tbname state_window(c);
|
||||
sql create stream streams4 trigger at_once into streamt4 as select _wstart, count(*) c1 from st partition by tbname state_window(c);
|
||||
|
||||
sql insert into t1 values(1648791210000,1,2,1);
|
||||
sql insert into t1 values(1648791220000,2,2,2);
|
||||
sql insert into t1 values(1648791221000,2,2,2);
|
||||
sql insert into t1 values(1648791222000,2,2,2);
|
||||
sql insert into t1 values(1648791223000,2,2,2);
|
||||
sql insert into t1 values(1648791231000,2,2,3);
|
||||
|
||||
sql insert into t2 values(1648791210000,1,2,1);
|
||||
sql insert into t2 values(1648791220000,2,2,2);
|
||||
sql insert into t2 values(1648791221000,2,2,2);
|
||||
sql insert into t2 values(1648791231000,2,2,3);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop21:
|
||||
sleep 200
|
||||
sql select * from streamt4 order by c1 desc;;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 6 then
|
||||
print =====rows=$rows
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
if $data01 != 4 then
|
||||
print =====data01=$data01
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
if $data11 != 2 then
|
||||
print =====data11=$data11
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
if $data21 != 1 then
|
||||
print =====data21=$data21
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
if $data31 != 1 then
|
||||
print =====data31=$data31
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
if $data41 != 1 then
|
||||
print =====data41=$data41
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
if $data51 != 1 then
|
||||
print =====data51=$data51
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
print delete from st where ts >= 1648791220000 and ts <=1648791223000;
|
||||
sql delete from st where ts >= 1648791220000 and ts <=1648791223000;
|
||||
|
||||
loop22:
|
||||
sleep 200
|
||||
sql select * from streamt4 order by c1 desc;;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 4 then
|
||||
print =====rows=$rows
|
||||
goto loop22
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop22
|
||||
endi
|
||||
|
||||
if $data11 != 1 then
|
||||
print =====data11=$data11
|
||||
goto loop22
|
||||
endi
|
||||
|
||||
if $data21 != 1 then
|
||||
print =====data21=$data21
|
||||
goto loop22
|
||||
endi
|
||||
|
||||
if $data31 != 1 then
|
||||
print =====data31=$data31
|
||||
goto loop22
|
||||
endi
|
||||
|
||||
|
||||
$loop_all = $loop_all + 1
|
||||
print ============loop_all=$loop_all
|
||||
|
|
Loading…
Reference in New Issue