This commit is contained in:
54liuyao 2024-08-08 09:40:13 +08:00
parent 77c0324e20
commit 3b9eda95fc
2 changed files with 570 additions and 10 deletions

View File

@ -340,7 +340,7 @@ static SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t ind
}
static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pResRow, TSKEY ts, SSDataBlock* pBlock,
bool* pRes) {
bool* pRes, bool isFilled) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pBlock->info.rows >= pBlock->info.capacity) {
@ -368,7 +368,6 @@ static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* p
code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&ts, false);
QUERY_CHECK_CODE(code, lino, _end);
} else if (isIsfilledPseudoColumn(pFillCol->pExpr)) {
bool isFilled = false;
code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false);
QUERY_CHECK_CODE(code, lino, _end);
} else {
@ -396,7 +395,7 @@ static void fillNormalRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
if (inWinRange(&pFillSup->winRange, &st)) {
bool res = true;
code = fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock, &res);
code = fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock, &res, true);
QUERY_CHECK_CODE(code, lino, _end);
}
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
@ -478,13 +477,13 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
int32_t lino = 0;
bool res = true;
if (pFillInfo->needFill == false) {
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res);
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false);
QUERY_CHECK_CODE(code, lino, _end);
return;
}
if (pFillInfo->pos == FILL_POS_START) {
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res);
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false);
QUERY_CHECK_CODE(code, lino, _end);
if (res) {
pFillInfo->pos = FILL_POS_INVALID;
@ -496,7 +495,7 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
fillLinearRange(pFillSup, pFillInfo, pRes);
if (pFillInfo->pos == FILL_POS_MID) {
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res);
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false);
QUERY_CHECK_CODE(code, lino, _end);
if (res) {
pFillInfo->pos = FILL_POS_INVALID;
@ -512,7 +511,7 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
}
}
if (pFillInfo->pos == FILL_POS_END) {
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res);
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false);
QUERY_CHECK_CODE(code, lino, _end);
if (res) {
pFillInfo->pos = FILL_POS_INVALID;
@ -820,7 +819,7 @@ static int32_t getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSu
int32_t nextVLen = 0;
pNextPoint->key.groupId = groupId;
if (pFillSup->type != TSDB_FILL_LINEAR) {
if (pFillSup->type != TSDB_FILL_LINEAR && pFillSup->type != TSDB_FILL_PREV) {
SET_WIN_KEY_INVALID(pNextPoint->key.ts);
code = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key,
(void**)&pNextPoint->pResPos, &nextVLen, &tmpRes);
@ -1180,7 +1179,6 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor
if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = pKey->groupId;
} else if (pBlock->info.id.groupId != pKey->groupId) {
pGroupResInfo->index--;
break;
}
SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId};
@ -1527,6 +1525,8 @@ int32_t getDownstreamRes(SOperatorInfo* downstream, SSDataBlock** ppRes) {
SStreamScanInfo* pInfo = (SStreamScanInfo*)downstream->info;
*ppRes = pInfo->pRes;
return TSDB_CODE_SUCCESS;
} else if (downstream->pDownstream[0] != NULL) {
return getDownstreamRes(downstream->pDownstream[0], ppRes);
}
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_FAILED));
return TSDB_CODE_FAILED;
@ -1664,10 +1664,16 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
return code;
_error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (pInfo != NULL) {
destroyStreamTimeSliceOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
(*ppOptInfo) = NULL;
return code;

View File

@ -0,0 +1,554 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step prev
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(2,2,2);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c every(1s) fill(prev);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791212000,0,0,0,0.0) (1648791212001,1,0,0,1.0) (1648791217001,2,0,0,2.1) t2 values(1648791212000,0,1,1,0.0) (1648791212001,1,1,1,1.0) (1648791217001,2,1,1,2.1);
sql insert into t3 values(1648791212000,0,2,2,0.0) (1648791212001,1,2,2,1.0) (1648791217001,2,2,2,2.1);
print sql select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1;
#sql select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
$loop_count = 0
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt where b = 0 and c = 0 order by 1;
sql select * from streamt where b = 0 and c = 0 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop0
endi
# row 0
if $data01 != 0 then
print ======data01=$data01
goto loop0
endi
if $data02 != 0 then
print ======data02=$data02
goto loop0
endi
if $data11 != 1 then
print ======data11=$data11
goto loop0
endi
if $data12 != 1 then
print ======data12=$data12
goto loop0
endi
if $data21 != 1 then
print ======data21=$data21
goto loop0
endi
if $data22 != 1 then
print ======data22=$data22
goto loop0
endi
if $data31 != 1 then
print ======data31=$data31
goto loop0
endi
if $data32 != 1 then
print ======data32=$data32
goto loop0
endi
if $data41 != 1 then
print ======data41=$data41
goto loop0
endi
if $data42 != 1 then
print ======data41=$data41
goto loop0
endi
if $data51 != 1 then
print ======data51=$data51
goto loop0
endi
if $data52 != 1 then
print ======data51=$data51
goto loop0
endi
print 1 sql select * from streamt where b = 1 and c = 1 order by 1;
sql select * from streamt where b = 1 and c = 1 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop0
endi
# row 0
if $data01 != 0 then
print ======data01=$data01
goto loop0
endi
if $data02 != 0 then
print ======data02=$data02
goto loop0
endi
if $data11 != 1 then
print ======data11=$data11
goto loop0
endi
if $data12 != 1 then
print ======data12=$data12
goto loop0
endi
if $data21 != 1 then
print ======data21=$data21
goto loop0
endi
if $data22 != 1 then
print ======data22=$data22
goto loop0
endi
if $data31 != 1 then
print ======data31=$data31
goto loop0
endi
if $data32 != 1 then
print ======data32=$data32
goto loop0
endi
if $data41 != 1 then
print ======data41=$data41
goto loop0
endi
if $data42 != 1 then
print ======data41=$data41
goto loop0
endi
if $data51 != 1 then
print ======data51=$data51
goto loop0
endi
if $data52 != 1 then
print ======data51=$data51
goto loop0
endi
print 2 sql select * from streamt where b = 2 and c = 2 order by 1;
sql select * from streamt where b = 2 and c = 2 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop0
endi
# row 0
if $data01 != 0 then
print ======data01=$data01
goto loop0
endi
if $data02 != 0 then
print ======data02=$data02
goto loop0
endi
if $data11 != 1 then
print ======data11=$data11
goto loop0
endi
if $data12 != 1 then
print ======data12=$data12
goto loop0
endi
if $data21 != 1 then
print ======data21=$data21
goto loop0
endi
if $data22 != 1 then
print ======data22=$data22
goto loop0
endi
if $data31 != 1 then
print ======data31=$data31
goto loop0
endi
if $data32 != 1 then
print ======data32=$data32
goto loop0
endi
if $data41 != 1 then
print ======data41=$data41
goto loop0
endi
if $data42 != 1 then
print ======data41=$data41
goto loop0
endi
if $data51 != 1 then
print ======data51=$data51
goto loop0
endi
if $data52 != 1 then
print ======data51=$data51
goto loop0
endi
print step next
print =============== create database
sql create database test2 vgroups 1;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(2,2,2);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c every(1s) fill(next);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791212000,0,0,0,0.0) (1648791212001,1,0,0,1.0) (1648791217001,2,0,0,2.1);
sql insert into t2 values(1648791212000,0,1,1,0.0) (1648791212001,1,1,1,1.0) (1648791217001,2,1,1,2.1);
sql insert into t3 values(1648791212000,0,2,2,0.0) (1648791212001,1,2,2,1.0) (1648791217001,2,2,2,2.1);
print sql select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1;
#sql select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
$loop_count = 0
loop1:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt2 where b = 0 and c = 0 order by 1;
sql select * from streamt2 where b = 0 and c = 0 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop1
endi
# row 0
if $data01 != 0 then
print ======data01=$data01
goto loop1
endi
if $data02 != 0 then
print ======data02=$data02
goto loop1
endi
if $data11 != 2 then
print ======data11=$data11
goto loop1
endi
if $data12 != 1 then
print ======data12=$data12
goto loop1
endi
if $data21 != 2 then
print ======data21=$data21
goto loop1
endi
if $data22 != 1 then
print ======data22=$data22
goto loop1
endi
if $data31 != 2 then
print ======data31=$data31
goto loop1
endi
if $data32 != 1 then
print ======data32=$data32
goto loop1
endi
if $data41 != 2 then
print ======data41=$data41
goto loop1
endi
if $data42 != 1 then
print ======data41=$data41
goto loop1
endi
if $data51 != 2 then
print ======data51=$data51
goto loop1
endi
if $data52 != 1 then
print ======data51=$data51
goto loop1
endi
print 1 sql select * from streamt2 where b = 1 and c = 1 order by 1;
sql select * from streamt2 where b = 1 and c = 1 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop1
endi
# row 0
if $data01 != 0 then
print ======data01=$data01
goto loop1
endi
if $data02 != 0 then
print ======data02=$data02
goto loop1
endi
if $data11 != 2 then
print ======data11=$data11
goto loop1
endi
if $data12 != 1 then
print ======data12=$data12
goto loop1
endi
if $data21 != 2 then
print ======data21=$data21
goto loop1
endi
if $data22 != 1 then
print ======data22=$data22
goto loop1
endi
if $data31 != 2 then
print ======data31=$data31
goto loop1
endi
if $data32 != 1 then
print ======data32=$data32
goto loop1
endi
if $data41 != 2 then
print ======data41=$data41
goto loop1
endi
if $data42 != 1 then
print ======data41=$data41
goto loop1
endi
if $data51 != 2 then
print ======data51=$data51
goto loop1
endi
if $data52 != 1 then
print ======data51=$data51
goto loop1
endi
print 2 sql select * from streamt2 where b = 2 and c = 2 order by 1;
sql select * from streamt2 where b = 2 and c = 2 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop1
endi
# row 0
if $data01 != 0 then
print ======data01=$data01
goto loop1
endi
if $data02 != 0 then
print ======data02=$data02
goto loop1
endi
if $data11 != 2 then
print ======data11=$data11
goto loop1
endi
if $data12 != 1 then
print ======data12=$data12
goto loop1
endi
if $data21 != 2 then
print ======data21=$data21
goto loop1
endi
if $data22 != 1 then
print ======data22=$data22
goto loop1
endi
if $data31 != 2 then
print ======data31=$data31
goto loop1
endi
if $data32 != 1 then
print ======data32=$data32
goto loop1
endi
if $data41 != 2 then
print ======data41=$data41
goto loop1
endi
if $data42 != 1 then
print ======data41=$data41
goto loop1
endi
if $data51 != 2 then
print ======data51=$data51
goto loop1
endi
if $data52 != 1 then
print ======data51=$data51
goto loop1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT