diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index e28a24e25c..c77f5456f3 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -473,6 +473,8 @@ typedef struct SStreamFillSupporter { bool hasDelete; SStorageAPI* pAPI; STimeWindow winRange; + int32_t pkColBytes; + __compar_fn_t comparePkColFn; } SStreamFillSupporter; typedef struct SStreamScanInfo { diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index ed4e6d4549..bcfe5d5479 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -45,6 +45,7 @@ typedef struct SSlicePoint { void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type); bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo); void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo); +void initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo); int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption); void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index e4ace6b83a..6c52dbaed0 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1555,7 +1555,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) { } int32_t initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr, - SExprSupp* pTbnameExpr) { + SExprSupp* pTbnameExpr, SExprSupp* pResExprSupp, int32_t* pPkColIndex) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStorageAPI* pAPI = &downstream->pTaskInfo->storageAPI; @@ -1568,6 +1568,11 @@ int32_t initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pPar pScanInfo->partitionSup = *pParSup; pScanInfo->pPartScalarSup = pExpr; pScanInfo->pPartTbnameSup = pTbnameExpr; + for (int32_t j = 0; j < pResExprSupp->numOfExprs; j++) { + if (pScanInfo->primaryKeyIndex == pResExprSupp->pExprInfo[j].base.pParam[0].pCol->slotId) { + *pPkColIndex = j; + } + } if (!pScanInfo->pUpdateInfo) { code = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo); @@ -1729,7 +1734,8 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState); - code = initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup); + pInfo->basic.primaryPkIndex = -1; + code = initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup, &pOperator->exprSupp, &pInfo->basic.primaryPkIndex); QUERY_CHECK_CODE(code, lino, _error); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ad1164d7e1..32967c4a56 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4275,6 +4275,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* SDataType pkType = {0}; pInfo->primaryKeyIndex = -1; + pInfo->basic.primaryPkIndex = -1; int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList); pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); QUERY_CHECK_NULL(pColIds, code, lino, _error, terrno); @@ -4292,6 +4293,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* } if (id->isPk) { pInfo->primaryKeyIndex = id->dstSlotId; + pInfo->basic.primaryPkIndex = id->dstSlotId; pkType = id->dataType; } } diff --git a/source/libs/executor/src/streamexecutorInt.c b/source/libs/executor/src/streamexecutorInt.c index 875ae00350..b94798934c 100644 --- a/source/libs/executor/src/streamexecutorInt.c +++ b/source/libs/executor/src/streamexecutorInt.c @@ -28,3 +28,8 @@ bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo) { void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) { pBasicInfo->updateOperatorInfo = false; } + +void initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) { + pBasicInfo->primaryPkIndex = -1; + pBasicInfo->updateOperatorInfo = false; +} diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index d6ae6849ad..20687964d4 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -275,7 +275,7 @@ static int32_t initTimeSliceResultBuf(SStreamFillSupporter* pFillSup, SExprSupp* return TSDB_CODE_SUCCESS; } -static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprSupp* pExprSup, int32_t numOfExprs, +static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprSupp* pExprSup, int32_t numOfExprs, SColumnInfo* pPkCol, SStreamFillSupporter** ppResFillSup) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -306,6 +306,14 @@ static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SE QUERY_CHECK_CODE(code, lino, _end); pFillSup->hasDelete = false; + if (pPkCol != NULL) { + pFillSup->pkColBytes = pPkCol->bytes; + pFillSup->comparePkColFn = getKeyComparFunc(pPkCol->type, TSDB_ORDER_ASC); + } else { + pFillSup->pkColBytes = 0; + pFillSup->comparePkColFn = NULL; + } + (*ppResFillSup) = pFillSup; _end: @@ -629,7 +637,7 @@ void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup) { pPoint->pLeftRow = pPoint->pRightRow; } else { setResultRowData(&pPoint->pLeftRow, pPoint->pResPos->pRowBuff); - void* pBuff = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pFillSup->rowSize); + void* pBuff = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pFillSup->rowSize + pFillSup->pkColBytes); setResultRowData(&pPoint->pRightRow, pBuff); } } @@ -1114,7 +1122,13 @@ _end: } } -static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t fillType) { +static int32_t comparePkVal(void* pLeft, void* pRight, SStreamFillSupporter* pFillSup) { + void* pTmpVal = POINTER_SHIFT(pLeft, pFillSup->rowSize); + return pFillSup->comparePkColFn(pTmpVal, pRight); +} + +static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, void* pPkVal, SStreamFillSupporter* pFillSup, bool isLeft, + int32_t fillType) { if (IS_INVALID_WIN_KEY(pPoint->key.ts)) { return false; } @@ -1124,29 +1138,83 @@ static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t case TSDB_FILL_NULL_F: case TSDB_FILL_SET_VALUE: case TSDB_FILL_SET_VALUE_F: { - if (!isLeft && (HAS_NON_ROW_DATA(pPoint->pRightRow) || pPoint->key.ts == ts) ) { - return true; + if (!isLeft) { + if (HAS_NON_ROW_DATA(pPoint->pRightRow)) { + return true; + } else { + if (pPoint->key.ts == ts) { + if (pFillSup->comparePkColFn == NULL || + comparePkVal(pPoint->pRightRow, pPkVal, pFillSup) >= 0) { + return true; + } + } + } } } break; case TSDB_FILL_PREV: { - if (isLeft && (HAS_NON_ROW_DATA(pPoint->pLeftRow) || pPoint->pLeftRow->key <= ts)) { - return true; + if (isLeft) { + if (HAS_NON_ROW_DATA(pPoint->pLeftRow)) { + return true; + } else { + if (pPoint->pLeftRow->key < ts) { + return true; + } else if (pPoint->pLeftRow->key == ts) { + if (pFillSup->comparePkColFn == NULL || comparePkVal(pPoint->pLeftRow, pPkVal, pFillSup) >= 0) { + return true; + } + } + } } if (!isLeft && pPoint->key.ts == ts) { - return true; + if (HAS_NON_ROW_DATA(pPoint->pLeftRow) || pFillSup->comparePkColFn == NULL || + comparePkVal(pPoint->pLeftRow, pPkVal, pFillSup) >= 0) { + return true; + } } } break; case TSDB_FILL_NEXT: { - if (!isLeft && (HAS_NON_ROW_DATA(pPoint->pRightRow) || pPoint->pRightRow->key >= ts)) { - return true; + if (!isLeft) { + if (HAS_NON_ROW_DATA(pPoint->pRightRow)) { + return true; + } else { + if (pPoint->pRightRow->key > ts) { + return true; + } else if (pPoint->pRightRow->key == ts) { + if (pFillSup->comparePkColFn == NULL || + comparePkVal(pPoint->pRightRow, pPkVal, pFillSup) >= 0) { + return true; + } + } + } } } break; case TSDB_FILL_LINEAR: { - if (isLeft && (HAS_NON_ROW_DATA(pPoint->pLeftRow) || pPoint->pLeftRow->key <= ts)) { - return true; - } else if (!isLeft && (HAS_NON_ROW_DATA(pPoint->pRightRow) || pPoint->pRightRow->key >= ts)) { - return true; + if (isLeft) { + if (HAS_NON_ROW_DATA(pPoint->pLeftRow)) { + return true; + } else { + if (pPoint->pLeftRow->key < ts) { + return true; + } else if (pPoint->pLeftRow->key == ts) { + if (pFillSup->comparePkColFn == NULL || comparePkVal(pPoint->pLeftRow, pPkVal, pFillSup) >= 0) { + return true; + } + } + } + } else { + if (HAS_NON_ROW_DATA(pPoint->pRightRow)) { + return true; + } else { + if (pPoint->pRightRow->key > ts) { + return true; + } else if (pPoint->pRightRow->key == ts) { + if (pFillSup->comparePkColFn == NULL || + comparePkVal(pPoint->pRightRow, pPkVal, pFillSup) >= 0) { + return true; + } + } + } } } break; default: @@ -1155,7 +1223,8 @@ static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t return false; } -static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal) { +static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal, + int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol) { int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); @@ -1175,6 +1244,14 @@ static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKE } } pRowVal->key = ts; + if (pPkData != NULL) { + void* pPkVal = POINTER_SHIFT(pRowVal, rowSize); + if (IS_VAR_DATA_TYPE(pPkCol->info.type)) { + memcpy(pPkVal, pPkData, varDataTLen(pPkData)); + } else { + memcpy(pPkVal, pPkData, pPkCol->info.bytes); + } + } } static int32_t saveTimeSliceWinResultInfo(SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup, SWinKey* pKey, @@ -1266,9 +1343,12 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) } QUERY_CHECK_CODE(code, lino, _end); - right = needAdjustValue(&curPoint, tsCols[startPos], false, pFillSup->type); + if (hasSrcPrimaryKeyCol(&pInfo->basic)) { + pPkVal = colDataGetData(pPkColDataInfo, startPos); + } + right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type); if (right) { - transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); + transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); @@ -1282,9 +1362,12 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) startPos += numOfWin; int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull); QUERY_CHECK_CONDITION((leftRowId >= 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); - left = needAdjustValue(&nextPoint, tsCols[leftRowId], true, pFillSup->type); + if (hasSrcPrimaryKeyCol(&pInfo->basic)) { + pPkVal = colDataGetData(pPkColDataInfo, leftRowId); + } + left = needAdjustValue(&nextPoint, tsCols[leftRowId], pPkVal, pFillSup, true, pFillSup->type); if (left) { - transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow); + transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &nextPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); @@ -1304,9 +1387,12 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) } QUERY_CHECK_CODE(code, lino, _end); - right = needAdjustValue(&curPoint, tsCols[startPos], false, pFillSup->type); + if (hasSrcPrimaryKeyCol(&pInfo->basic)) { + pPkVal = colDataGetData(pPkColDataInfo, startPos); + } + right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type); if (right) { - transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); + transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); @@ -1408,6 +1494,90 @@ _end: } } +static void doBuildTimeSliceDeleteResult(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, SArray* pWins, int32_t* index, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + blockDataCleanup(pBlock); + int32_t size = taosArrayGetSize(pWins); + if (*index == size) { + *index = 0; + taosArrayClear(pWins); + goto _end; + } + code = blockDataEnsureCapacity(pBlock, size - *index); + QUERY_CHECK_CODE(code, lino, _end); + + uint64_t uid = 0; + for (int32_t i = *index; i < size; i++) { + SWinKey* pKey = taosArrayGet(pWins, i); + SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId}; + SSlicePoint prevPoint = {0}; + SSlicePoint nextPoint = {0}; + STimeWindow tw = {0}; + if (pFillSup->type != TSDB_FILL_LINEAR) { + code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint); + } else { + code = + getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint); + } + QUERY_CHECK_CODE(code, lino, _end); + + if (pFillSup->type == TSDB_FILL_PREV && hasNextWindow(pFillSup)) { + tw.skey = pFillSup->cur.key; + tw.ekey = pFillSup->next.key; + } else if (pFillSup->type == TSDB_FILL_NEXT && hasPrevWindow(pFillSup)) { + tw.skey = pFillSup->prev.key; + tw.ekey = pFillSup->cur.key; + } else if (pFillSup->type == TSDB_FILL_LINEAR) { + if (hasPrevWindow(pFillSup)) { + tw.skey = pFillSup->prev.key; + } else { + tw.skey = pFillSup->cur.key; + } + if (hasNextWindow(pFillSup)) { + tw.ekey = pFillSup->next.key; + } else { + tw.ekey = pFillSup->cur.key; + } + } else { + tw.skey = pFillSup->cur.key; + tw.ekey = pFillSup->cur.key; + } + + if (tw.skey == INT64_MIN || tw.ekey == INT64_MIN) { + continue; + } + + releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); + releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore); + releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore); + + void* tbname = NULL; + int32_t winCode = TSDB_CODE_SUCCESS; + + code = pAggSup->stateStore.streamStateGetParName(pAggSup->pState, pKey->groupId, &tbname, false, &winCode); + QUERY_CHECK_CODE(code, lino, _end); + + if (winCode != TSDB_CODE_SUCCESS) { + code = appendDataToSpecialBlock(pBlock, &tw.skey, &tw.ekey, &uid, &pKey->groupId, NULL); + QUERY_CHECK_CODE(code, lino, _end); + } else { + QUERY_CHECK_CONDITION((tbname), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; + STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); + code = appendDataToSpecialBlock(pBlock, &tw.skey, &tw.ekey, &uid, &pKey->groupId, parTbName); + QUERY_CHECK_CODE(code, lino, _end); + } + pAggSup->stateStore.streamStateFreeVal(tbname); + (*index)++; + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } +} + static int32_t buildTimeSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1416,6 +1586,15 @@ static int32_t buildTimeSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRe uint16_t opType = pOperator->operatorType; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + + doBuildTimeSliceDeleteResult(pAggSup, pInfo->pFillSup, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows != 0) { + // process the rest of the data + printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); + (*ppRes) = pInfo->pDelRes; + goto _end; + } + doBuildTimeSlicePointResult(pAggSup, &pInfo->twAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo); if (pInfo->pRes->info.rows != 0) { printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); @@ -1463,8 +1642,7 @@ _end: return code; } -static int32_t doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pUpdatedMap, - SArray* pDelWins) { +static int32_t doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pUpdatedMap) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int32_t winCode = TSDB_CODE_SUCCESS; @@ -1590,7 +1768,7 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR switch (pBlock->info.type) { case STREAM_DELETE_RESULT: case STREAM_DELETE_DATA: { - code = doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap, pInfo->pDelWins); + code = doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap); QUERY_CHECK_CODE(code, lino, _end); code = copyDataBlock(pInfo->pDelRes, pBlock); QUERY_CHECK_CODE(code, lino, _end); @@ -1678,12 +1856,6 @@ _end: return code; } -static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - (void)doStreamTimeSliceNext(pOperator, &pRes); - return pRes; -} - static void copyFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) { if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F) { int32_t valueIndex = 0; @@ -1723,14 +1895,22 @@ static void copyFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* p } } -int32_t getDownstreamRes(SOperatorInfo* downstream, SSDataBlock** ppRes) { +int32_t getDownstreamRes(SOperatorInfo* downstream, SSDataBlock** ppRes, SColumnInfo** ppPkCol) { if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamScanInfo* pInfo = (SStreamScanInfo*)downstream->info; *ppRes = pInfo->pRes; + if (hasSrcPrimaryKeyCol(&pInfo->basic)) { + SColumnInfoData* pPkColInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->basic.primaryPkIndex); + (*ppPkCol) = &pPkColInfo->info; + } return TSDB_CODE_SUCCESS; } else if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) { SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)downstream->info; *ppRes = pInfo->binfo.pRes; + if (hasSrcPrimaryKeyCol(&pInfo->basic)) { + SColumnInfoData* pPkColInfo = taosArrayGet(pInfo->binfo.pRes->pDataBlock, pInfo->basic.primaryPkIndex); + (*ppPkCol) = &pPkColInfo->info; + } return TSDB_CODE_SUCCESS; } qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_FAILED)); @@ -1744,8 +1924,9 @@ int32_t initTimeSliceDownStream(SOperatorInfo* downstream, SStreamAggSupporter* int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) { - SStreamPartitionOperatorInfo* pScanInfo = downstream->info; - pScanInfo->tsColIndex = tsColIndex; + SStreamPartitionOperatorInfo* pPartionInfo = downstream->info; + pPartionInfo->tsColIndex = tsColIndex; + pBasic->primaryPkIndex = pPartionInfo->basic.primaryPkIndex; } if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { @@ -1766,7 +1947,9 @@ int32_t initTimeSliceDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pScanInfo->pFillSup = pFillSup; pScanInfo->interval = pFillSup->interval; pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo; - pBasic->primaryPkIndex = pScanInfo->primaryKeyIndex; + if (!hasSrcPrimaryKeyCol(pBasic)) { + pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex; + } _end: if (code != TSDB_CODE_SUCCESS) { @@ -1820,8 +2003,13 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->primaryTsIndex = ((SColumnNode*)pInterpPhyNode->pTimeSeries)->slotId; + SSDataBlock* pDownRes = NULL; + SColumnInfo* pPkCol = NULL; + code = getDownstreamRes(downstream, &pDownRes, &pPkCol); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pFillSup = NULL; - code = initTimeSliceFillSup(pInterpPhyNode, pExpSup, numOfExprs, &pInfo->pFillSup); + code = initTimeSliceFillSup(pInterpPhyNode, pExpSup, numOfExprs, pPkCol, &pInfo->pFillSup); QUERY_CHECK_CODE(code, lino, _error); int32_t ratio = 1; @@ -1829,10 +2017,13 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* ratio = 2; } - code = - initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfExprs, 0, pTaskInfo->streamInfo.pState, sizeof(TSKEY), - 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), - &pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_HASH_SORT, ratio); + int32_t keyBytes = sizeof(TSKEY); + if (pPkCol) { + keyBytes += pPkCol->bytes; + } + code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfExprs, 0, pTaskInfo->streamInfo.pState, keyBytes, 0, + &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), + &pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_HASH_SORT, ratio); QUERY_CHECK_CODE(code, lino, _error); code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); @@ -1866,10 +2057,6 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->destHasPrimaryKey = pInterpPhyNode->streamNodeOption.destHasPrimaryKey; pInfo->numOfDatapack = 0; - SSDataBlock* pDownRes = NULL; - code = getDownstreamRes(downstream, &pDownRes); - QUERY_CHECK_CODE(code, lino, _error); - pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pDownRes); copyFillValueInfo(pInfo->pFillSup, pInfo->pFillInfo); pInfo->ignoreNull = getIgoreNullRes(pExpSup); @@ -1899,6 +2086,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamTimeSliceReleaseState, streamTimeSliceReloadState); + initStreamBasicInfo(&pInfo->basic); if (downstream) { code = initTimeSliceDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic, pInfo->pFillSup); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 5200de0ab7..97e10d0d2f 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -11108,7 +11108,7 @@ static int32_t checkAndAdjStreamDestTableSchema(STranslateContext* pCxt, SCreate .bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes}; } int32_t code = checkTableSchemaImpl(pCxt, pStmt->pTags, pStmt->pCols, NULL); - if (TSDB_CODE_SUCCESS == code && NULL == pSelect->pWindow && + if (TSDB_CODE_SUCCESS == code && NULL == pSelect->pWindow && !pSelect->hasInterpFunc && ((SRealTableNode*)pSelect->pFromTable && hasPkInTable(((SRealTableNode*)pSelect->pFromTable)->pMeta))) { if (1 >= LIST_LENGTH(pStmt->pCols) || 1 >= LIST_LENGTH(pSelect->pProjectionList)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 9a1a30ac7b..e1472aa9dd 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -214,7 +214,6 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b if (pkLen != 0) { pInfo->comparePkRowFn = compareKeyTsAndPk; pInfo->comparePkCol = getKeyComparFunc(pkType, TSDB_ORDER_ASC); - ; } else { pInfo->comparePkRowFn = compareKeyTs; pInfo->comparePkCol = NULL; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index c432f67623..ed7a09f648 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1369,6 +1369,10 @@ ,,y,script,./test.sh -f tsim/stream/streamInterpPartitionBy0.sim ,,y,script,./test.sh -f tsim/stream/streamInterpPrev0.sim ,,y,script,./test.sh -f tsim/stream/streamInterpPrev1.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey0.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey1.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey2.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey3.sim ,,y,script,./test.sh -f tsim/stream/streamInterpUpdate.sim ,,y,script,./test.sh -f tsim/stream/streamInterpUpdate1.sim ,,y,script,./test.sh -f tsim/stream/streamInterpValue0.sim diff --git a/tests/script/tsim/stream/streamInterpPrimaryKey0.sim b/tests/script/tsim/stream/streamInterpPrimaryKey0.sim new file mode 100644 index 0000000000..9edddff6db --- /dev/null +++ b/tests/script/tsim/stream/streamInterpPrimaryKey0.sim @@ -0,0 +1,452 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create stable st(ts timestamp,a int primary key,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(b) from st partition by tbname every(1s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,9,9,9,9.0); + +$loop_count = 0 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop0 +endi + + +sql insert into t1 values(1648791213000,10,10,10,10.0); +sql insert into t1 values(1648791213009,30,30,30,30.0); + +$loop_count = 0 + +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop1 +endi + + +sql insert into t1 values(1648791213009,20,20,20,20.0) (1648791217001,4,4,4,4.1); + +print sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); +sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop2 +endi + +if $data11 != 20 then + print ======data11=$data11 + goto loop2 +endi + +if $data21 != 20 then + print ======data21=$data21 + goto loop2 +endi + +if $data31 != 20 then + print ======data31=$data31 + goto loop2 +endi + +if $data41 != 20 then + print ======data41=$data41 + goto loop2 +endi + +print step2 +print =============== create database +sql create database test2 vgroups 1; +sql use test2; + +sql create stable st(ts timestamp,a int ,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt(ts, b primary key) as select _irowts, interp(b) from st partition by tbname every(1s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,9,9,9,9.0); + +$loop_count = 0 + +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop3 +endi + +sql insert into t1 values(1648791213000,10,10,10,10.0); + +sql insert into t1 values(1648791213009,30,30,30,30.0); + +$loop_count = 0 + +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop4 +endi + +# row 0 +if $data01 != 10 then + print ======data01=$data01 + goto loop4 +endi + +sql insert into t1 values(1648791217001,4,4,10,4.1); + +sleep 2000 + +sql insert into t1 values(1648791213009,20,20,10,20.0); + +print sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); +sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop5: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop5 +endi + +# row 0 +if $data01 != 10 then + print ======data01=$data01 + goto loop5 +endi + +if $data11 != 20 then + print ======data11=$data11 + goto loop5 +endi + +if $data21 != 20 then + print ======data21=$data21 + goto loop5 +endi + +if $data31 != 20 then + print ======data31=$data31 + goto loop5 +endi + +if $data41 != 20 then + print ======data41=$data41 + goto loop5 +endi + +print step3 +print =============== create database +sql create database test3 vgroups 1; +sql use test3; + +sql create stable st(ts timestamp,a int primary key, b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt(ts, b primary key) as select _irowts, interp(b) from st partition by tbname, c every(1s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,9,9,10,9.0); + +$loop_count = 0 + +loop6: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop6 +endi + + +sql insert into t1 values(1648791213000,10,10,10,10.0); +sql insert into t1 values(1648791213009,30,30,10,30.0); + +$loop_count = 0 + +loop7: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop7 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop7 +endi + + +sql insert into t1 values(1648791217001,4,4,10,4.1); + +sleep 2000 + +sql insert into t1 values(1648791213009,20,20,10,20.0); + +print sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); +sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop8: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop8 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop8 +endi + +if $data11 != 20 then + print ======data11=$data11 + goto loop8 +endi + +if $data21 != 20 then + print ======data21=$data21 + goto loop8 +endi + +if $data31 != 20 then + print ======data31=$data31 + goto loop8 +endi + +if $data41 != 20 then + print ======data41=$data41 + goto loop8 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpPrimaryKey1.sim b/tests/script/tsim/stream/streamInterpPrimaryKey1.sim new file mode 100644 index 0000000000..25d1480917 --- /dev/null +++ b/tests/script/tsim/stream/streamInterpPrimaryKey1.sim @@ -0,0 +1,458 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create stable st(ts timestamp,a int primary key,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(b) from st partition by tbname every(1s) fill(next); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,10,10,10,10.0); + +$loop_count = 0 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop0 +endi + + +sql insert into t1 values(1648791213000,9,9,9,9.0); +sql insert into t1 values(1648791213009,30,30,30,30.0); + +$loop_count = 0 + +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop1 +endi + + +sql insert into t1 values(1648791213009,20,20,20,20.0) (1648791217001,40,40,40,40.1); + +sleep 2000 + +sql insert into t1 values(1648791217001,4,4,4,4.1); + +print sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); +sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop2 +endi + +if $data11 != 4 then + print ======data11=$data11 + goto loop2 +endi + +if $data21 != 4 then + print ======data21=$data21 + goto loop2 +endi + +if $data31 != 4 then + print ======data31=$data31 + goto loop2 +endi + +if $data41 != 4 then + print ======data41=$data41 + goto loop2 +endi + +print step2 +print =============== create database +sql create database test2 vgroups 1; +sql use test2; + +sql create stable st(ts timestamp,a int ,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt(ts, b primary key) as select _irowts, interp(b) from st partition by tbname every(1s) fill(next); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,10,10,10,10.0); + +$loop_count = 0 + +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop3 +endi + + +sql insert into t1 values(1648791213000,9,9,9,9.0); +sql insert into t1 values(1648791213009,30,30,30,30.0); + +$loop_count = 0 + +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop4 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop4 +endi + + +sql insert into t1 values(1648791213009,20,20,20,20.0) (1648791217001,40,40,40,40.1); + +sleep 2000 + +sql insert into t1 values(1648791217001,4,4,4,4.1); + +print sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); +sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop5: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop5 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop5 +endi + +if $data11 != 4 then + print ======data11=$data11 + goto loop5 +endi + +if $data21 != 4 then + print ======data21=$data21 + goto loop5 +endi + +if $data31 != 4 then + print ======data31=$data31 + goto loop5 +endi + +if $data41 != 4 then + print ======data41=$data41 + goto loop5 +endi + +print step3 +print =============== create database +sql create database test3 vgroups 1; +sql use test3; + +sql create stable st(ts timestamp,a int primary key, b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt(ts, b primary key) as select _irowts, interp(b) from st partition by tbname, c every(1s) fill(next); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791213000,10,10,10,10.0); + +$loop_count = 0 + +loop6: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop6 +endi + + +sql insert into t1 values(1648791213000,9,9,10,9.0); +sql insert into t1 values(1648791213009,30,30,10,30.0); + +$loop_count = 0 + +loop7: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop7 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop7 +endi + + +sql insert into t1 values(1648791213009,20,20,10,20.0) (1648791217001,40,40,10,40.1); + +sleep 1000 + +sql insert into t1 values(1648791217001,4,4,10,4.1); + +print sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); +sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 +print $data60 $data61 $data62 $data63 $data64 + + +$loop_count = 0 +loop8: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop8 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop8 +endi + +if $data11 != 4 then + print ======data11=$data11 + goto loop8 +endi + +if $data21 != 4 then + print ======data21=$data21 + goto loop8 +endi + +if $data31 != 4 then + print ======data31=$data31 + goto loop8 +endi + +if $data41 != 4 then + print ======data41=$data41 + goto loop8 +endi + + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpPrimaryKey2.sim b/tests/script/tsim/stream/streamInterpPrimaryKey2.sim new file mode 100644 index 0000000000..f06e1ecd03 --- /dev/null +++ b/tests/script/tsim/stream/streamInterpPrimaryKey2.sim @@ -0,0 +1,452 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create stable st(ts timestamp,a int primary key,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(b) from st partition by tbname every(1s) fill(linear); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,9,9,9,9.0); + +$loop_count = 0 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop0 +endi + + +sql insert into t1 values(1648791213000,10,10,10,10.0); +sql insert into t1 values(1648791213009,30,30,30,30.0); + +$loop_count = 0 + +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop1 +endi + + +sql insert into t1 values(1648791213009,20,20,20,20.0) (1648791217001,4,4,4,4.1); + +print sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear); +sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop2 +endi + +if $data11 != 16 then + print ======data11=$data11 + goto loop2 +endi + +if $data21 != 12 then + print ======data21=$data21 + goto loop2 +endi + +if $data31 != 8 then + print ======data31=$data31 + goto loop2 +endi + +if $data41 != 4 then + print ======data41=$data41 + goto loop2 +endi + +print step2 +print =============== create database +sql create database test2 vgroups 1; +sql use test2; + +sql create stable st(ts timestamp,a int ,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt(ts, b primary key) as select _irowts, interp(b) from st partition by tbname every(1s) fill(linear); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,9,9,9,9.0); + +$loop_count = 0 + +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop3 +endi + +sql insert into t1 values(1648791213000,10,10,10,10.0); + +sql insert into t1 values(1648791213009,30,30,30,30.0); + +$loop_count = 0 + +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop4 +endi + +# row 0 +if $data01 != 10 then + print ======data01=$data01 + goto loop4 +endi + +sql insert into t1 values(1648791217001,4,4,10,4.1); + +sleep 2000 + +sql insert into t1 values(1648791213009,20,20,10,20.0); + +print sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear); +sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop5: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop5 +endi + +# row 0 +if $data01 != 10 then + print ======data01=$data01 + goto loop5 +endi + +if $data11 != 16 then + print ======data11=$data11 + goto loop5 +endi + +if $data21 != 12 then + print ======data21=$data21 + goto loop5 +endi + +if $data31 != 8 then + print ======data31=$data31 + goto loop5 +endi + +if $data41 != 4 then + print ======data41=$data41 + goto loop5 +endi + +print step3 +print =============== create database +sql create database test3 vgroups 1; +sql use test3; + +sql create stable st(ts timestamp,a int primary key, b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt(ts, b primary key) as select _irowts, interp(b) from st partition by tbname, c every(1s) fill(linear); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,9,9,10,9.0); + +$loop_count = 0 + +loop6: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop6 +endi + + +sql insert into t1 values(1648791213000,10,10,10,10.0); +sql insert into t1 values(1648791213009,30,30,10,30.0); + +$loop_count = 0 + +loop7: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop7 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop7 +endi + + +sql insert into t1 values(1648791217001,4,4,10,4.1); + +sleep 2000 + +sql insert into t1 values(1648791213009,20,20,10,20.0); + +print sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear); +sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop8: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop8 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop8 +endi + +if $data11 != 16 then + print ======data11=$data11 + goto loop8 +endi + +if $data21 != 12 then + print ======data21=$data21 + goto loop8 +endi + +if $data31 != 8 then + print ======data31=$data31 + goto loop8 +endi + +if $data41 != 4 then + print ======data41=$data41 + goto loop8 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpPrimaryKey3.sim b/tests/script/tsim/stream/streamInterpPrimaryKey3.sim new file mode 100644 index 0000000000..725cf8d850 --- /dev/null +++ b/tests/script/tsim/stream/streamInterpPrimaryKey3.sim @@ -0,0 +1,452 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create stable st(ts timestamp,a int primary key,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(b) from st partition by tbname every(1s) fill(value,100); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,9,9,9,9.0); + +$loop_count = 0 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop0 +endi + + +sql insert into t1 values(1648791213000,10,10,10,10.0); +sql insert into t1 values(1648791213009,30,30,30,30.0); + +$loop_count = 0 + +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop1 +endi + + +sql insert into t1 values(1648791213009,20,20,20,20.0) (1648791217001,4,4,4,4.1); + +print sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100); +sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop2 +endi + +if $data11 != 100 then + print ======data11=$data11 + goto loop2 +endi + +if $data21 != 100 then + print ======data21=$data21 + goto loop2 +endi + +if $data31 != 100 then + print ======data31=$data31 + goto loop2 +endi + +if $data41 != 100 then + print ======data41=$data41 + goto loop2 +endi + +print step2 +print =============== create database +sql create database test2 vgroups 1; +sql use test2; + +sql create stable st(ts timestamp,a int ,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt(ts, b primary key) as select _irowts, interp(b) from st partition by tbname every(1s) fill(value,100); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,9,9,9,9.0); + +$loop_count = 0 + +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop3 +endi + +sql insert into t1 values(1648791213000,10,10,10,10.0); + +sql insert into t1 values(1648791213009,30,30,30,30.0); + +$loop_count = 0 + +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop4 +endi + +# row 0 +if $data01 != 10 then + print ======data01=$data01 + goto loop4 +endi + +sql insert into t1 values(1648791217001,4,4,10,4.1); + +sleep 2000 + +sql insert into t1 values(1648791213009,20,20,10,20.0); + +print sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100); +sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop5: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop5 +endi + +# row 0 +if $data01 != 10 then + print ======data01=$data01 + goto loop5 +endi + +if $data11 != 100 then + print ======data11=$data11 + goto loop5 +endi + +if $data21 != 100 then + print ======data21=$data21 + goto loop5 +endi + +if $data31 != 100 then + print ======data31=$data31 + goto loop5 +endi + +if $data41 != 100 then + print ======data41=$data41 + goto loop5 +endi + +print step3 +print =============== create database +sql create database test3 vgroups 1; +sql use test3; + +sql create stable st(ts timestamp,a int primary key, b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt(ts, b primary key) as select _irowts, interp(b) from st partition by tbname, c every(1s) fill(value,100); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,9,9,10,9.0); + +$loop_count = 0 + +loop6: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop6 +endi + + +sql insert into t1 values(1648791213000,10,10,10,10.0); +sql insert into t1 values(1648791213009,30,30,10,30.0); + +$loop_count = 0 + +loop7: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop7 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop7 +endi + + +sql insert into t1 values(1648791217001,4,4,10,4.1); + +sleep 2000 + +sql insert into t1 values(1648791213009,20,20,10,20.0); + +print sql select _irowts,interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100); +sql select _irowts, interp(b) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop8: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop8 +endi + +# row 0 +if $data01 != 9 then + print ======data01=$data01 + goto loop8 +endi + +if $data11 != 100 then + print ======data11=$data11 + goto loop8 +endi + +if $data21 != 100 then + print ======data21=$data21 + goto loop8 +endi + +if $data31 != 100 then + print ======data31=$data31 + goto loop8 +endi + +if $data41 != 100 then + print ======data41=$data41 + goto loop8 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT