diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 869623417d..c02c3bcd4a 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -41,7 +41,6 @@ int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap); void doBuildDeleteResultImpl(SStateStore* pAPI, SStreamState* pState, SArray* pWins, int32_t* index, SSDataBlock* pBlock); -int32_t initResultBuf(SStreamFillSupporter* pFillSup); SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pRes); SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 8df5c004eb..7c7b800b71 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1166,7 +1166,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { return pRes; } -int32_t initResultBuf(SStreamFillSupporter* pFillSup) { +static int32_t initResultBuf(SStreamFillSupporter* pFillSup) { pFillSup->rowSize = sizeof(SResultCellData) * pFillSup->numOfAllCols; for (int i = 0; i < pFillSup->numOfAllCols; i++) { SFillColInfo* pCol = &pFillSup->pAllColInfo[i]; diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index ddc414d222..7a17b2d4b5 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -256,7 +256,21 @@ _end: return code; } -static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprInfo* pExprInfo, int32_t numOfExprs, +static int32_t initTimeSliceResultBuf(SStreamFillSupporter* pFillSup, SExprSupp* pExpSup) { + pFillSup->rowSize = sizeof(TSKEY) + getResultRowSize(pExpSup->pCtx, pFillSup->numOfAllCols); + pFillSup->next.key = INT64_MIN; + pFillSup->nextNext.key = INT64_MIN; + pFillSup->prev.key = INT64_MIN; + pFillSup->cur.key = INT64_MIN; + pFillSup->next.pRowVal = NULL; + pFillSup->nextNext.pRowVal = NULL; + pFillSup->prev.pRowVal = NULL; + pFillSup->cur.pRowVal = NULL; + + return TSDB_CODE_SUCCESS; +} + +static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprSupp* pExprSup, int32_t numOfExprs, SStreamFillSupporter** ppResFillSup) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -265,7 +279,7 @@ static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SE pFillSup->numOfFillCols = numOfExprs; int32_t numOfNotFillCols = 0; - pFillSup->pAllColInfo = createFillColInfo(pExprInfo, pFillSup->numOfFillCols, NULL, numOfNotFillCols, + pFillSup->pAllColInfo = createFillColInfo(pExprSup->pExprInfo, pFillSup->numOfFillCols, NULL, numOfNotFillCols, (const SNodeListNode*)(pPhyFillNode->pFillValues)); QUERY_CHECK_NULL(pFillSup->pAllColInfo, code, lino, _end, terrno); @@ -283,7 +297,7 @@ static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SE pFillSup->pResMap = tSimpleHashInit(16, hashFn); QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno); - code = initResultBuf(pFillSup); + code = initTimeSliceResultBuf(pFillSup, pExprSup); QUERY_CHECK_CODE(code, lino, _end); pFillSup->hasDelete = false; @@ -572,7 +586,7 @@ static void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup) { pPoint->pLeftRow = pPoint->pRightRow; } else { setResultRowData(&pPoint->pLeftRow, pPoint->pResPos->pRowBuff); - void* pBuff = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pFillSup->rowSize + sizeof(TSKEY)); + void* pBuff = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pFillSup->rowSize); setResultRowData(&pPoint->pRightRow, pBuff); } } @@ -1609,7 +1623,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->primaryTsIndex = ((SColumnNode*)pInterpPhyNode->pTimeSeries)->slotId; pInfo->pFillSup = NULL; - code = initTimeSliceFillSup(pInterpPhyNode, pExprInfo, numOfExprs, &pInfo->pFillSup); + code = initTimeSliceFillSup(pInterpPhyNode, pExpSup, numOfExprs, &pInfo->pFillSup); QUERY_CHECK_CODE(code, lino, _error); int32_t ratio = 1; @@ -1659,6 +1673,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pDownRes); copyFillValueInfo(pInfo->pFillSup, pInfo->pFillInfo); + pInfo->ignoreNull = getIgoreNullRes(pExpSup); if (pHandle) { pInfo->isHistoryOp = pHandle->fillHistory; diff --git a/tests/script/tsim/stream/streamInterpOther2.sim b/tests/script/tsim/stream/streamInterpOther2.sim new file mode 100644 index 0000000000..25d5171a5c --- /dev/null +++ b/tests/script/tsim/stream/streamInterpOther2.sim @@ -0,0 +1,525 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step prev + +sql create database test1 vgroups 4; +sql use test1; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname every(1s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791217000,0,0,3) (1648791212000,10,10,3) (1648791212001,11,NULL,3); + +sleep 500 + +sql insert into t1 values(1648791214001,20,NULL,3) (1648791213000,22,NULL,3); + +print sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(prev); +sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(prev); + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +$loop_count = 0 +loop0: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 + +print sql select * from streamt1; +sql select * from streamt1; + +if $rows != 6 then + print ======rows=$rows + goto loop0 +endi + +if $data01 != 0 then + print ======data01=$data01 + goto loop0 +endi + +if $data02 != 10 then + print ======data02=$data02 + goto loop0 +endi + +if $data11 != 1 then + print ======data11=$data11 + goto loop0 +endi + +if $data12 != 10 then + print ======data12=$data12 + goto loop0 +endi + +if $data21 != 1 then + print ======data21=$data21 + goto loop0 +endi + +if $data22 != 10 then + print ======data22=$data22 + goto loop0 +endi + +if $data31 != 1 then + print ======data31=$data31 + goto loop0 +endi + +if $data32 != 10 then + print ======data32=$data32 + goto loop0 +endi + +if $data41 != 1 then + print ======data41=$data41 + goto loop0 +endi + +if $data42 != 10 then + print ======data42=$data42 + goto loop0 +endi + +if $data51 != 0 then + print ======data51=$data51 + goto loop0 +endi + +if $data52 != 0 then + print ======data52=$data52 + goto loop0 +endi + +print step next + +sql create database test2 vgroups 4; +sql use test2; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname every(1s) fill(next); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791217000,11,11,3) (1648791212000,10,10,3) (1648791212001,11,NULL,3); + +sleep 500 + +sql insert into t1 values(1648791214001,20,NULL,3) (1648791213000,22,NULL,3); + +print sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(next); +sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(next); + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +$loop_count = 0 +loop1: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 + +print sql select * from streamt2; +sql select * from streamt2; + +if $rows != 6 then + print ======rows=$rows + goto loop1 +endi + +if $data01 != 0 then + print ======data01=$data01 + goto loop1 +endi + +if $data02 != 10 then + print ======data02=$data02 + goto loop1 +endi + +if $data11 != 1 then + print ======data11=$data11 + goto loop1 +endi + +if $data12 != 11 then + print ======data12=$data12 + goto loop1 +endi + +if $data21 != 1 then + print ======data21=$data21 + goto loop1 +endi + +if $data22 != 11 then + print ======data22=$data22 + goto loop1 +endi + +if $data31 != 1 then + print ======data31=$data31 + goto loop1 +endi + +if $data32 != 11 then + print ======data32=$data32 + goto loop1 +endi + +if $data41 != 1 then + print ======data41=$data41 + goto loop1 +endi + +if $data42 != 11 then + print ======data42=$data42 + goto loop1 +endi + +if $data51 != 0 then + print ======data51=$data51 + goto loop1 +endi + +if $data52 != 11 then + print ======data52=$data52 + goto loop1 +endi + +print step value + +sql create database test3 vgroups 4; +sql use test3; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams3_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_1 as select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname every(1s) fill(NULL); +sql create stream streams3_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_2 as select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname every(1s) fill(value, 110); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791217000,11,11,3) (1648791212000,10,10,3) (1648791212001,11,NULL,3); + +sleep 500 + +sql insert into t1 values(1648791214001,20,NULL,3) (1648791213000,22,NULL,3); + +print sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(NULL); +sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(NULL); + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +$loop_count = 0 +loop3: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 + +print sql select * from streamt3_1; +sql select * from streamt3_1; + +if $rows != 6 then + print ======rows=$rows + goto loop3 +endi + +if $data01 != 0 then + print ======data01=$data01 + goto loop3 +endi + +if $data02 != 10 then + print ======data02=$data02 + goto loop3 +endi + +if $data11 != 1 then + print ======data11=$data11 + goto loop3 +endi + +if $data12 != NULL then + print ======data12=$data12 + goto loop3 +endi + +if $data21 != 1 then + print ======data21=$data21 + goto loop3 +endi + +if $data22 != NULL then + print ======data22=$data22 + goto loop3 +endi + +if $data31 != 1 then + print ======data31=$data31 + goto loop3 +endi + +if $data32 != NULL then + print ======data32=$data32 + goto loop3 +endi + +if $data41 != 1 then + print ======data41=$data41 + goto loop3 +endi + +if $data42 != NULL then + print ======data42=$data42 + goto loop3 +endi + +if $data51 != 0 then + print ======data51=$data51 + goto loop3 +endi + +if $data52 != 11 then + print ======data52=$data52 + goto loop3 +endi + + +print sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(value, 110); +sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(value, 110); + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +$loop_count = 0 +loop3_2: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 + +print sql select * from streamt3_2; +sql select * from streamt3_2; + +if $rows != 6 then + print ======rows=$rows + goto loop3_2 +endi + +if $data01 != 0 then + print ======data01=$data01 + goto loop3_2 +endi + +if $data02 != 10 then + print ======data02=$data02 + goto loop3_2 +endi + +if $data11 != 1 then + print ======data11=$data11 + goto loop3_2 +endi + +if $data12 != 110 then + print ======data12=$data12 + goto loop3_2 +endi + +if $data21 != 1 then + print ======data21=$data21 + goto loop3_2 +endi + +if $data22 != 110 then + print ======data22=$data22 + goto loop3_2 +endi + +if $data31 != 1 then + print ======data31=$data31 + goto loop3_2 +endi + +if $data32 != 110 then + print ======data32=$data32 + goto loop3_2 +endi + +if $data41 != 1 then + print ======data41=$data41 + goto loop3_2 +endi + +if $data42 != 110 then + print ======data42=$data42 + goto loop3_2 +endi + +if $data51 != 0 then + print ======data51=$data51 + goto loop3_2 +endi + +if $data52 != 11 then + print ======data52=$data52 + goto loop3_2 +endi + +print step linear + +sql create database test4 vgroups 4; +sql use test4; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname every(1s) fill(linear); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791217000,11,55,3) (1648791212000,10,10,3) (1648791212001,11,NULL,3); + +sleep 500 + +sql insert into t1 values(1648791214001,20,NULL,3) (1648791213000,22,NULL,3); + +print sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(linear); +sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(linear); + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +$loop_count = 0 +loop4: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 + +print sql select * from streamt4; +sql select * from streamt4; + +if $rows != 6 then + print ======rows=$rows + goto loop4 +endi + +if $data01 != 0 then + print ======data01=$data01 + goto loop4 +endi + +if $data02 != 10 then + print ======data02=$data02 + goto loop4 +endi + +if $data11 != 1 then + print ======data11=$data11 + goto loop4 +endi + +if $data12 != 19 then + print ======data12=$data12 + goto loop4 +endi + +if $data21 != 1 then + print ======data21=$data21 + goto loop4 +endi + +if $data22 != 28 then + print ======data22=$data22 + goto loop4 +endi + +if $data31 != 1 then + print ======data31=$data31 + goto loop4 +endi + +if $data32 != 37 then + print ======data32=$data32 + goto loop4 +endi + +if $data41 != 1 then + print ======data41=$data41 + goto loop4 +endi + +if $data42 != 46 then + print ======data42=$data42 + goto loop4 +endi + +if $data51 != 0 then + print ======data51=$data51 + goto loop4 +endi + +if $data52 != 55 then + print ======data52=$data52 + goto loop4 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT