diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 9b0e204f73..869623417d 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -46,7 +46,6 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index); void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup); -void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol); bool hasCurWindow(SStreamFillSupporter* pFillSup); bool hasPrevWindow(SStreamFillSupporter* pFillSup); bool hasNextWindow(SStreamFillSupporter* pFillSup); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index b602610b98..4cfacb03b4 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -259,7 +259,7 @@ static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKE pRowVal->key = ts; } -void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) { +static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) { for (int32_t i = 0; i < numOfCol; i++) { if (!pFillCol[i].notFillCol) { int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i); @@ -1271,30 +1271,26 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) { SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i); - SPoint value = {0}; - value.val = taosMemoryCalloc(1, pColData->info.bytes); - if (!value.val) { - code = TSDB_CODE_OUT_OF_MEMORY; - QUERY_CHECK_CODE(code, lino, _end); + if (pColData == NULL) { + SPoint dummy = {0}; + dummy.val = taosMemoryCalloc(1, 1); + taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &dummy); + dummy.val = taosMemoryCalloc(1, 1); + taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &dummy); + continue; } + SPoint value = {0}; + value.val = taosMemoryCalloc(1, pColData->info.bytes); + QUERY_CHECK_NULL(value.val, code, lino, _end, terrno); void* tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value); - if (!tmpRes) { - code = TSDB_CODE_OUT_OF_MEMORY; - QUERY_CHECK_CODE(code, lino, _end); - } + QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno); value.val = taosMemoryCalloc(1, pColData->info.bytes); - if (!value.val) { - code = TSDB_CODE_OUT_OF_MEMORY; - QUERY_CHECK_CODE(code, lino, _end); - } + QUERY_CHECK_NULL(value.val, code, lino, _end, terrno); tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value); - if (!tmpRes) { - code = TSDB_CODE_OUT_OF_MEMORY; - QUERY_CHECK_CODE(code, lino, _end); - } + QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno); } } pFillInfo->pLinearInfo->winIndex = 0; @@ -1318,6 +1314,11 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i); SResultCellData* pCell = getResultCell(pFillInfo->pResRow, i); + if (pColData == NULL) { + pCell->bytes = 1; + pCell->type = 4; + continue; + } pCell->bytes = pColData->info.bytes; pCell->type = pColData->info.type; } diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 94a856d4d5..ddc414d222 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -347,10 +347,7 @@ static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* p (*pRes) = false; goto _end; } - if (IS_INVALID_WIN_KEY(ts)) { - (*pRes) = true; - goto _end; - } + bool ckRes = true; code = checkResult(pFillSup, ts, pBlock->info.id.groupId, &ckRes); QUERY_CHECK_CODE(code, lino, _end); @@ -428,7 +425,7 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi bool isFilled = true; code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false); QUERY_CHECK_CODE(code, lino, _end); - } else { + } else if (isInterpFunc(pFillCol->pExpr)) { int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot); if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) { @@ -451,6 +448,11 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi QUERY_CHECK_CODE(code, lino, _end); destroySPoint(&cur); + } else { + int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; + SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot); + code = setRowCell(pDstCol, pBlock->info.rows, pCell); + QUERY_CHECK_CODE(code, lino, _end); } } pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, @@ -476,7 +478,7 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; bool res = true; - if (pFillInfo->needFill == false) { + if (pFillInfo->needFill == false && pFillInfo->pos != FILL_POS_INVALID) { code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false); QUERY_CHECK_CODE(code, lino, _end); return; @@ -847,6 +849,34 @@ _end: return code; } +static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) { + for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { + SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; + if (!isInterpFunc(pFillCol->pExpr) && !isIrowtsPseudoColumn(pFillCol->pExpr) && + !isIsfilledPseudoColumn(pFillCol->pExpr)) { + int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; + SResultCellData* pSrcCell = getResultCell(&pFillSup->cur, srcSlot); + SResultCellData* pDestCell = getResultCell(pFillInfo->pResRow, srcSlot); + pDestCell->isNull = pSrcCell->isNull; + if (!pDestCell->isNull) { + memcpy(pDestCell->pData, pSrcCell->pData, pSrcCell->bytes); + } + } + } +} + +static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) { + for (int32_t i = 0; i < numOfCol; i++) { + if (isInterpFunc(pFillCol[i].pExpr)) { + int32_t slotId = pFillCol[i].pExpr->base.pParam[0].pCol->slotId; + SResultCellData* pECell = getResultCell(pEndRow, slotId); + SPoint* pPoint = taosArrayGet(pEndPoins, slotId); + pPoint->key = pEndRow->key; + memcpy(pPoint->val, pECell->pData, pECell->bytes); + } + } +} + static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { if (!hasPrevWindow(pFillSup) && !hasNextWindow(pFillSup)) { pFillInfo->needFill = false; @@ -878,7 +908,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; } - copyNotFillExpData(pFillSup, pFillInfo); + copyNonFillValueInfo(pFillSup, pFillInfo); pFillInfo->pResRow->key = ts; } break; case TSDB_FILL_PREV: { @@ -920,7 +950,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillInfo->pos = FILL_POS_INVALID; SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); pFillSup->next.key = pFillSup->nextOriginKey; - calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, + copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, pFillSup->numOfAllCols); pFillSup->prev.key = pFillSup->prevOriginKey; pFillInfo->pResRow = &pFillSup->prev; @@ -930,7 +960,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); - calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, + copyCalcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, pFillSup->numOfAllCols); pFillSup->prev.key = pFillSup->prevOriginKey; pFillInfo->pResRow = &pFillSup->prev; @@ -942,7 +972,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillInfo->pos = FILL_POS_START; SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); pFillSup->next.key = pFillSup->nextOriginKey; - calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, + copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, pFillSup->numOfAllCols); pFillInfo->pResRow = &pFillSup->cur; pFillInfo->pLinearInfo->hasNext = false; @@ -955,7 +985,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo _end: if (ts != pFillSup->cur.key) { - SET_WIN_KEY_INVALID(pFillSup->cur.key); pFillInfo->pos = FILL_POS_INVALID; } } @@ -1481,7 +1510,7 @@ static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) { return pRes; } -static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) { +static void copyFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) { if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F) { int32_t valueIndex = 0; for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { @@ -1525,8 +1554,10 @@ int32_t getDownstreamRes(SOperatorInfo* downstream, SSDataBlock** ppRes) { SStreamScanInfo* pInfo = (SStreamScanInfo*)downstream->info; *ppRes = pInfo->pRes; return TSDB_CODE_SUCCESS; - } else if (downstream->pDownstream[0] != NULL) { - return getDownstreamRes(downstream->pDownstream[0], ppRes); + } else if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) { + SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)downstream->info; + *ppRes = pInfo->binfo.pRes; + return TSDB_CODE_SUCCESS; } qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_FAILED)); return TSDB_CODE_FAILED; @@ -1627,7 +1658,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pDownRes); - setValueForFillInfo(pInfo->pFillSup, pInfo->pFillInfo); + copyFillValueInfo(pInfo->pFillSup, pInfo->pFillInfo); if (pHandle) { pInfo->isHistoryOp = pHandle->fillHistory; diff --git a/tests/script/tsim/stream/streamInterpPartitionBy0.sim b/tests/script/tsim/stream/streamInterpPartitionBy0.sim index eff6706416..6b222de228 100644 --- a/tests/script/tsim/stream/streamInterpPartitionBy0.sim +++ b/tests/script/tsim/stream/streamInterpPartitionBy0.sim @@ -22,8 +22,28 @@ sql insert into t1 values(1648791212000,0,0,0,0.0) (1648791212001,1,0,0,1.0) (16 sql insert into t3 values(1648791212000,0,2,2,0.0) (1648791212001,1,2,2,1.0) (1648791217001,2,2,2,2.1); -print sql select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1; -#sql select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1; +print sql select _irowts, interp(a), _isfilled, b from st where b = 0 and c = 0 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1; +sql select _irowts, interp(a), _isfilled, b from st where b = 0 and c = 0 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +print sql select _irowts, interp(a), _isfilled, b from st where b = 1 and c =1 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1; +sql select _irowts, interp(a), _isfilled, b from st where b = 1 and c = 1 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +print sql select _irowts, interp(a), _isfilled, b from st where b = 2 and c = 2 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1; +sql select _irowts, interp(a), _isfilled, b from st where b = 2 and c = 2 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1; print $data00 $data01 $data02 $data03 $data04 $data05 print $data10 $data11 $data12 $data13 $data14 $data15 @@ -290,14 +310,32 @@ sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into run tsim/stream/checkTaskStatus.sim -sql insert into t1 values(1648791212000,0,0,0,0.0) (1648791212001,1,0,0,1.0) (1648791217001,2,0,0,2.1); - -sql insert into t2 values(1648791212000,0,1,1,0.0) (1648791212001,1,1,1,1.0) (1648791217001,2,1,1,2.1); +sql insert into t1 values(1648791212000,0,0,0,0.0) (1648791212001,1,0,0,1.0) (1648791217001,2,0,0,2.1) t2 values(1648791212000,0,1,1,0.0) (1648791212001,1,1,1,1.0) (1648791217001,2,1,1,2.1); sql insert into t3 values(1648791212000,0,2,2,0.0) (1648791212001,1,2,2,1.0) (1648791217001,2,2,2,2.1); -print sql select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1; -#sql select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1; +print sql select _irowts, interp(a), _isfilled, b from st where b = 0 and c = 0 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1; +sql select _irowts, interp(a), _isfilled, b from st where b = 0 and c = 0 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +print sql select _irowts, interp(a), _isfilled, b from st where b = 1 and c =1 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1; +sql select _irowts, interp(a), _isfilled, b from st where b = 1 and c = 1 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +print sql select _irowts, interp(a), _isfilled, b from st where b = 2 and c = 2 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1; +sql select _irowts, interp(a), _isfilled, b from st where b = 2 and c = 2 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1; print $data00 $data01 $data02 $data03 $data04 $data05 print $data10 $data11 $data12 $data13 $data14 $data15 diff --git a/tests/script/tsim/stream/streamInterpPartitionBy1.sim b/tests/script/tsim/stream/streamInterpPartitionBy1.sim new file mode 100644 index 0000000000..ecb5e0ee62 --- /dev/null +++ b/tests/script/tsim/stream/streamInterpPartitionBy1.sim @@ -0,0 +1,592 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step NULL +print =============== create database +sql create database test vgroups 1; +sql use test; +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); + +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(2,2,2); + +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c every(1s) fill(NULL); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791212000,0,0,0,0.0) (1648791212001,1,0,0,1.0) (1648791217001,2,0,0,2.1) t2 values(1648791212000,0,1,1,0.0) (1648791212001,1,1,1,1.0) (1648791217001,2,1,1,2.1); + +sql insert into t3 values(1648791212000,0,2,2,0.0) (1648791212001,1,2,2,1.0) (1648791217001,2,2,2,2.1); + +print sql select _irowts, interp(a), _isfilled, b from st where b = 0 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(NULL) order by b, 1; +sql select _irowts, interp(a), _isfilled, b from st where b = 0 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(NULL) order by b, 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +print sql select _irowts, interp(a), _isfilled, b from st where b = 1 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(NULL) order by b, 1; +sql select _irowts, interp(a), _isfilled, b from st where b = 1 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(NULL) order by b, 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +print sql select _irowts, interp(a), _isfilled, b from st where b = 2 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(NULL) order by b, 1; +sql select _irowts, interp(a), _isfilled, b from st where b = 2 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(NULL) order by b, 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +$loop_count = 0 +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt where b = 0 and c = 0 order by 1; +sql select * from streamt where b = 0 and c = 0 order by 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + + +# row 0 +if $rows != 6 then + print ======rows=$rows + goto loop0 +endi + +# row 0 +if $data01 != 0 then + print ======data01=$data01 + goto loop0 +endi + +if $data02 != 0 then + print ======data02=$data02 + goto loop0 +endi + +if $data11 != NULL then + print ======data11=$data11 + goto loop0 +endi + +if $data12 != 1 then + print ======data12=$data12 + goto loop0 +endi + +if $data21 != NULL then + print ======data21=$data21 + goto loop0 +endi + +if $data22 != 1 then + print ======data22=$data22 + goto loop0 +endi + +if $data31 != NULL then + print ======data31=$data31 + goto loop0 +endi + +if $data32 != 1 then + print ======data32=$data32 + goto loop0 +endi + +if $data41 != NULL then + print ======data41=$data41 + goto loop0 +endi + +if $data42 != 1 then + print ======data41=$data41 + goto loop0 +endi + +if $data51 != NULL then + print ======data51=$data51 + goto loop0 +endi + +if $data52 != 1 then + print ======data51=$data51 + goto loop0 +endi + +print 1 sql select * from streamt where b = 1 and c = 1 order by 1; +sql select * from streamt where b = 1 and c = 1 order by 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + + +# row 0 +if $rows != 6 then + print ======rows=$rows + goto loop0 +endi + +# row 0 +if $data01 != 0 then + print ======data01=$data01 + goto loop0 +endi + +if $data02 != 0 then + print ======data02=$data02 + goto loop0 +endi + +if $data11 != NULL then + print ======data11=$data11 + goto loop0 +endi + +if $data12 != 1 then + print ======data12=$data12 + goto loop0 +endi + +if $data21 != NULL then + print ======data21=$data21 + goto loop0 +endi + +if $data22 != 1 then + print ======data22=$data22 + goto loop0 +endi + +if $data31 != NULL then + print ======data31=$data31 + goto loop0 +endi + +if $data32 != 1 then + print ======data32=$data32 + goto loop0 +endi + +if $data41 != NULL then + print ======data41=$data41 + goto loop0 +endi + +if $data42 != 1 then + print ======data41=$data41 + goto loop0 +endi + +if $data51 != NULL then + print ======data51=$data51 + goto loop0 +endi + +if $data52 != 1 then + print ======data51=$data51 + goto loop0 +endi + +print 2 sql select * from streamt where b = 2 and c = 2 order by 1; +sql select * from streamt where b = 2 and c = 2 order by 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + + +# row 0 +if $rows != 6 then + print ======rows=$rows + goto loop0 +endi + +# row 0 +if $data01 != 0 then + print ======data01=$data01 + goto loop0 +endi + +if $data02 != 0 then + print ======data02=$data02 + goto loop0 +endi + +if $data11 != NULL then + print ======data11=$data11 + goto loop0 +endi + +if $data12 != 1 then + print ======data12=$data12 + goto loop0 +endi + +if $data21 != NULL then + print ======data21=$data21 + goto loop0 +endi + +if $data22 != 1 then + print ======data22=$data22 + goto loop0 +endi + +if $data31 != NULL then + print ======data31=$data31 + goto loop0 +endi + +if $data32 != 1 then + print ======data32=$data32 + goto loop0 +endi + +if $data41 != NULL then + print ======data41=$data41 + goto loop0 +endi + +if $data42 != 1 then + print ======data41=$data41 + goto loop0 +endi + +if $data51 != NULL then + print ======data51=$data51 + goto loop0 +endi + +if $data52 != 1 then + print ======data51=$data51 + goto loop0 +endi + +print step linear +print =============== create database +sql create database test2 vgroups 1; +sql use test2; +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); + +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(2,2,2); + +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c every(1s) fill(linear); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791212000,0,0,0,0.0) (1648791212001,10,0,0,1.0) (1648791217001,20,0,0,2.1) t2 values(1648791212000,0,1,1,0.0) (1648791212001,10,1,1,1.0) (1648791217001,20,1,1,2.1); + +sql insert into t3 values(1648791212000,0,2,2,0.0) (1648791212001,10,2,2,1.0) (1648791217001,20,2,2,2.1); + +print sql select _irowts, interp(a), _isfilled, b from st where b = 0 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(linear) order by b, 1; +sql select _irowts, interp(a), _isfilled, b from st where b = 0 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(linear) order by b, 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +$loop_count = 0 +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt2 where b = 0 and c = 0 order by 1; +sql select * from streamt2 where b = 0 and c = 0 order by 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + + +# row 0 +if $rows != 6 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 0 then + print ======data01=$data01 + goto loop1 +endi + +if $data02 != 0 then + print ======data02=$data02 + goto loop1 +endi + +if $data11 != 11 then + print ======data11=$data11 + goto loop1 +endi + +if $data12 != 1 then + print ======data12=$data12 + goto loop1 +endi + +if $data21 != 13 then + print ======data21=$data21 + goto loop1 +endi + +if $data22 != 1 then + print ======data22=$data22 + goto loop1 +endi + +if $data31 != 15 then + print ======data31=$data31 + goto loop1 +endi + +if $data32 != 1 then + print ======data32=$data32 + goto loop1 +endi + +if $data41 != 17 then + print ======data41=$data41 + goto loop1 +endi + +if $data42 != 1 then + print ======data41=$data41 + goto loop1 +endi + +if $data51 != 19 then + print ======data51=$data51 + goto loop1 +endi + +if $data52 != 1 then + print ======data51=$data51 + goto loop1 +endi + +print sql select _irowts, interp(a), _isfilled, b from st where b = 1 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(linear) order by b, 1; +sql select _irowts, interp(a), _isfilled, b from st where b = 1 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(linear) order by b, 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +print 1 sql select * from streamt2 where b = 1 and c = 1 order by 1; +sql select * from streamt2 where b = 1 and c = 1 order by 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + + +# row 0 +if $rows != 6 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 0 then + print ======data01=$data01 + goto loop1 +endi + +if $data02 != 0 then + print ======data02=$data02 + goto loop1 +endi + +if $data11 != 11 then + print ======data11=$data11 + goto loop1 +endi + +if $data12 != 1 then + print ======data12=$data12 + goto loop1 +endi + +if $data21 != 13 then + print ======data21=$data21 + goto loop1 +endi + +if $data22 != 1 then + print ======data22=$data22 + goto loop1 +endi + +if $data31 != 15 then + print ======data31=$data31 + goto loop1 +endi + +if $data32 != 1 then + print ======data32=$data32 + goto loop1 +endi + +if $data41 != 17 then + print ======data41=$data41 + goto loop1 +endi + +if $data42 != 1 then + print ======data41=$data41 + goto loop1 +endi + +if $data51 != 19 then + print ======data51=$data51 + goto loop1 +endi + +if $data52 != 1 then + print ======data51=$data51 + goto loop1 +endi + +print sql select _irowts, interp(a), _isfilled, b from st where b = 2 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(linear) order by b, 1; +sql select _irowts, interp(a), _isfilled, b from st where b = 2 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(linear) order by b, 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + +print 2 sql select * from streamt2 where b = 2 and c = 2 order by 1; +sql select * from streamt2 where b = 2 and c = 2 order by 1; + +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 +print $data50 $data51 $data52 $data53 $data54 $data55 + + +# row 0 +if $rows != 6 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 0 then + print ======data01=$data01 + goto loop1 +endi + +if $data02 != 0 then + print ======data02=$data02 + goto loop1 +endi + +if $data11 != 11 then + print ======data11=$data11 + goto loop1 +endi + +if $data12 != 1 then + print ======data12=$data12 + goto loop1 +endi + +if $data21 != 13 then + print ======data21=$data21 + goto loop1 +endi + +if $data22 != 1 then + print ======data22=$data22 + goto loop1 +endi + +if $data31 != 15 then + print ======data31=$data31 + goto loop1 +endi + +if $data32 != 1 then + print ======data32=$data32 + goto loop1 +endi + +if $data41 != 17 then + print ======data41=$data41 + goto loop1 +endi + +if $data42 != 1 then + print ======data41=$data41 + goto loop1 +endi + +if $data51 != 19 then + print ======data51=$data51 + goto loop1 +endi + +if $data52 != 1 then + print ======data51=$data51 + goto loop1 +endi + + +system sh/exec.sh -n dnode1 -s stop -x SIGINT