fix:number of ssdatablock rows exceeds the capacity
This commit is contained in:
parent
fcd1732514
commit
3f38d56c0d
|
@ -832,10 +832,13 @@ static bool checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t group
|
|||
return true;
|
||||
}
|
||||
|
||||
static void buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock) {
|
||||
static bool buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock) {
|
||||
if (pBlock->info.rows >= pBlock->info.capacity) {
|
||||
return false;
|
||||
}
|
||||
uint64_t groupId = pBlock->info.id.groupId;
|
||||
if (pFillSup->hasDelete && !checkResult(pFillSup, ts, groupId)) {
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
|
||||
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
|
||||
|
@ -853,6 +856,7 @@ static void buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFill
|
|||
}
|
||||
}
|
||||
pBlock->info.rows++;
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool hasRemainCalc(SStreamFillInfo* pFillInfo) {
|
||||
|
@ -932,7 +936,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
|
|||
}
|
||||
|
||||
if (pFillInfo->pos == FILL_POS_START) {
|
||||
buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes);
|
||||
if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) {
|
||||
pFillInfo->pos = FILL_POS_INVALID;
|
||||
}
|
||||
}
|
||||
if (pFillInfo->type != TSDB_FILL_LINEAR) {
|
||||
doStreamFillNormal(pFillSup, pFillInfo, pRes);
|
||||
|
@ -940,7 +946,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
|
|||
doStreamFillLinear(pFillSup, pFillInfo, pRes);
|
||||
|
||||
if (pFillInfo->pos == FILL_POS_MID) {
|
||||
buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes);
|
||||
if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) {
|
||||
pFillInfo->pos = FILL_POS_INVALID;
|
||||
}
|
||||
}
|
||||
|
||||
if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) {
|
||||
|
@ -954,7 +962,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
|
|||
}
|
||||
}
|
||||
if (pFillInfo->pos == FILL_POS_END) {
|
||||
buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes);
|
||||
if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) {
|
||||
pFillInfo->pos = FILL_POS_INVALID;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -989,10 +999,6 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
|
|||
uint64_t groupId = pBlock->info.id.groupId;
|
||||
SSDataBlock* pRes = pInfo->pRes;
|
||||
pRes->info.id.groupId = groupId;
|
||||
if (hasRemainCalc(pFillInfo)) {
|
||||
doStreamFillRange(pFillInfo, pFillSup, pRes);
|
||||
}
|
||||
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
|
||||
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
|
||||
|
||||
|
@ -1204,13 +1210,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
if (hasRemainCalc(pInfo->pFillInfo)) {
|
||||
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
return pInfo->pRes;
|
||||
}
|
||||
if (hasRemainCalc(pInfo->pFillInfo) || (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true )) {
|
||||
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pRes, "stream fill");
|
||||
return pInfo->pRes;
|
||||
}
|
||||
}
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doDeleteFillFinalize(pOperator);
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pRes, "stream fill");
|
||||
|
|
|
@ -0,0 +1,225 @@
|
|||
$loop_all = 0
|
||||
looptest:
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
|
||||
sleep 500
|
||||
sql connect
|
||||
|
||||
sql drop database if exists test;
|
||||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double, s varchar(20));;
|
||||
sql create stream streams1 trigger at_once into streamt as select _wstart ts, count(*) c1 from t1 interval(1s) fill(NULL);
|
||||
sql insert into t1 values(1648791211000,1,2,3,1.0,'aaa');
|
||||
sleep 100
|
||||
sql insert into t1 values(1648795308000,1,2,3,1.0,'aaa');
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop0:
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 500
|
||||
sql select * from streamt where c1 > 0;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
sql select count(*) from streamt;
|
||||
|
||||
if $data00 != 4098 then
|
||||
print =====data00=$data00
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648800308000,1,1,1,1.0,'aaa');
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop1:
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 500
|
||||
sql select * from streamt where c1 > 0;
|
||||
|
||||
if $rows != 3 then
|
||||
print =====rows=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select count(*) from streamt;
|
||||
|
||||
if $data00 != 9098 then
|
||||
print =====rows=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648786211000,1,1,1,1.0,'aaa');
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop2:
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 500
|
||||
sql select * from streamt where c1 > 0;
|
||||
|
||||
if $rows != 4 then
|
||||
print =====rows=$rows
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
sql select count(*) from streamt;
|
||||
|
||||
if $data00 != 14098 then
|
||||
print =====rows=$rows
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648801308000,1,1,1,1.0,'aaa') (1648802308000,1,1,1,1.0,'aaa') (1648803308000,1,1,1,1.0,'aaa') (1648804308000,1,1,1,1.0,'aaa') (1648805308000,1,1,1,1.0,'aaa');
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop21:
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 500
|
||||
sql select * from streamt where c1 > 0;
|
||||
|
||||
if $rows != 9 then
|
||||
print =====rows=$rows
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
sql select count(*) from streamt;
|
||||
|
||||
if $data00 != 19098 then
|
||||
print =====rows=$rows
|
||||
goto loop21
|
||||
endi
|
||||
|
||||
sql drop database if exists test;
|
||||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double, s varchar(20));
|
||||
print create stream streams1 trigger at_once into streamt as select _wstart ts, max(a) c1 from t1 interval(1s) fill(linear);
|
||||
sql create stream streams1 trigger at_once into streamt as select _wstart ts, max(a) c1 from t1 interval(1s) fill(linear);
|
||||
|
||||
print create stream streams2 trigger at_once into streamt2 as select _wstart ts, max(a) c1 from t1 interval(1s) fill(prev);
|
||||
sql create stream streams2 trigger at_once into streamt2 as select _wstart ts, max(a) c1 from t1 interval(1s) fill(prev);
|
||||
|
||||
sql insert into t1 values(1648791211000,1,2,3,1.0,'aaa');
|
||||
sleep 100
|
||||
sql insert into t1 values(1648795308000,1,2,3,1.0,'aaa');
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop3:
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 500
|
||||
|
||||
print select count(*) from streamt;
|
||||
sql select count(*) from streamt;
|
||||
|
||||
if $data00 != 4098 then
|
||||
print =====data00=$data00
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
print select count(*) from streamt2;
|
||||
sql select count(*) from streamt2;
|
||||
|
||||
if $data00 != 4098 then
|
||||
print =====data00=$data00
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648800308000,1,1,1,1.0,'aaa');
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop4:
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 500
|
||||
|
||||
print select count(*) from streamt;
|
||||
sql select count(*) from streamt;
|
||||
|
||||
if $data00 != 9098 then
|
||||
print =====rows=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
print select count(*) from streamt2;
|
||||
sql select count(*) from streamt2;
|
||||
|
||||
if $data00 != 9098 then
|
||||
print =====rows=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648786211000,1,1,1,1.0,'aaa');
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop5:
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 500
|
||||
|
||||
print select count(*) from streamt;
|
||||
sql select count(*) from streamt;
|
||||
|
||||
if $data00 != 14098 then
|
||||
print =====rows=$rows
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
print select count(*) from streamt2;
|
||||
sql select count(*) from streamt2;
|
||||
|
||||
if $data00 != 14098 then
|
||||
print =====rows=$rows
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
|
||||
#goto looptest
|
Loading…
Reference in New Issue