diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index db09478372..815994c128 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -546,9 +546,10 @@ typedef struct SStreamScanInfo { uint64_t numOfExec; // execution times STqReader* tqReader; - SHashObj* pVtableMergeHandles; // key: vtable uid, value: SStreamVtableMergeHandle - SDiskbasedBuf* pVtableMergeBuf; // page buffer used by vtable merge - SArray* pVtableReadyHandles; + SHashObj* pVtableMergeHandles; // key: vtable uid, value: SStreamVtableMergeHandle + SDiskbasedBuf* pVtableMergeBuf; // page buffer used by vtable merge + SArray* pVtableReadyHandles; + STableListInfo* pTableListInfo; uint64_t groupId; bool igCheckGroupId; diff --git a/source/libs/executor/inc/streamVtableMerge.h b/source/libs/executor/inc/streamVtableMerge.h index 5d1e401e07..3e27ae1ced 100644 --- a/source/libs/executor/inc/streamVtableMerge.h +++ b/source/libs/executor/inc/streamVtableMerge.h @@ -30,11 +30,14 @@ typedef enum { SVM_NEXT_FOUND = 1, } SVM_NEXT_RESULT; -int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle **ppHandle, int32_t nSrcTbls, int32_t numPageLimit, - SDiskbasedBuf *pBuf, SSDataBlock *pResBlock, const char *idstr); +int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle **ppHandle, int64_t vuid, int32_t nSrcTbls, + int32_t numPageLimit, int32_t primaryTsIndex, SDiskbasedBuf *pBuf, + SSDataBlock *pResBlock, const char *idstr); void streamVtableMergeDestroyHandle(void *ppHandle); +int64_t streamVtableMergeHandleGetVuid(SStreamVtableMergeHandle *pHandle); + int32_t streamVtableMergeAddBlock(SStreamVtableMergeHandle *pHandle, SSDataBlock *pDataBlock, const char *idstr); int32_t streamVtableMergeMoveNext(SStreamVtableMergeHandle *pHandle, SVM_NEXT_RESULT *pRes, const char *idstr); diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index f1d5a65809..7aea03699c 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -707,7 +707,6 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } code = createStreamVtableMergeOperatorInfo(ops[0], pHandle, pVirtualTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOptr); - tableListDestroy(pTableListInfo); } else { code = TSDB_CODE_INVALID_PARA; pTaskInfo->code = code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d4a3be125f..11390ba38b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3157,7 +3157,7 @@ int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STime SOperatorInfo* pOperator = pInfo->pStreamScanOp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; const char* id = GET_TASKID(pTaskInfo); - SSHashObj* pVtableInfos = pTaskInfo->pSubplan->pVTables; + bool isVtableSourceScan = (pTaskInfo->pSubplan->pVTables != NULL); code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); QUERY_CHECK_CODE(code, lino, _end); @@ -3168,7 +3168,7 @@ int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STime pBlockInfo->version = pBlock->info.version; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - if (pVtableInfos == NULL) { + if (!isVtableSourceScan) { pBlockInfo->id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); } else { // use original table uid as groupId for vtable @@ -3213,7 +3213,7 @@ int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STime } // currently only the tbname pseudo column - if (pInfo->numOfPseudoExpr > 0) { + if (pInfo->numOfPseudoExpr > 0 && !isVtableSourceScan) { code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, pBlockInfo->rows, pTaskInfo, &pTableScanInfo->base.metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. @@ -4242,7 +4242,7 @@ FETCH_NEXT_BLOCK: goto NEXT_SUBMIT_BLK; } else if (pInfo->blockType == STREAM_INPUT__CHECKPOINT) { - if (pInfo->validBlockIndex >= total) { + if (pInfo->validBlockIndex >= total || pVtableInfos != NULL) { doClearBufferedBlocks(pInfo); (*ppRes) = NULL; return code; @@ -4529,6 +4529,10 @@ void destroyStreamScanOperatorInfo(void* param) { taosArrayDestroy(pStreamScan->pVtableReadyHandles); pStreamScan->pVtableReadyHandles = NULL; } + if (pStreamScan->pTableListInfo) { + tableListDestroy(pStreamScan->pTableListInfo); + pStreamScan->pTableListInfo = NULL; + } if (pStreamScan->matchInfo.pList) { taosArrayDestroy(pStreamScan->matchInfo.pList); } @@ -4716,15 +4720,13 @@ _end: return code; } -static int32_t createStreamVtableBlock(SColMatchInfo *pMatchInfo, SSDataBlock **ppRes, const char *idstr) { +static SSDataBlock* createStreamVtableBlock(SColMatchInfo *pMatchInfo, const char *idstr) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SSDataBlock *pRes = NULL; QUERY_CHECK_NULL(pMatchInfo, code, lino, _end, TSDB_CODE_INVALID_PARA); - *ppRes = NULL; - code = createDataBlock(&pRes); QUERY_CHECK_CODE(code, lino, _end); int32_t numOfOutput = taosArrayGetSize(pMatchInfo->pList); @@ -4738,18 +4740,16 @@ static int32_t createStreamVtableBlock(SColMatchInfo *pMatchInfo, SSDataBlock ** QUERY_CHECK_CODE(code, lino, _end); } - *ppRes = pRes; - pRes = NULL; - - _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr); + if (pRes != NULL) { + blockDataDestroy(pRes); + } + pRes = NULL; + terrno = code; } - if (pRes != NULL) { - blockDataDestroy(pRes); - } - return code; + return pRes; } static int32_t createStreamNormalScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, @@ -4873,8 +4873,7 @@ static int32_t createStreamNormalScanOperatorInfo(SReadHandle* pHandle, STableSc if (pVtableInfos != NULL) { // save vtable info into tqReader for vtable source scan - SSDataBlock* pResBlock = NULL; - code = createStreamVtableBlock(&pInfo->matchInfo, &pResBlock, idstr); + SSDataBlock* pResBlock = createStreamVtableBlock(&pInfo->matchInfo, idstr); QUERY_CHECK_CODE(code, lino, _error); code = pAPI->tqReaderFn.tqReaderSetVtableInfo(pInfo->tqReader, pHandle->vnode, pAPI, pVtableInfos, &pResBlock, idstr); diff --git a/source/libs/executor/src/streamVtableMerge.c b/source/libs/executor/src/streamVtableMerge.c index fb338476bb..4e0f464132 100644 --- a/source/libs/executor/src/streamVtableMerge.c +++ b/source/libs/executor/src/streamVtableMerge.c @@ -28,6 +28,7 @@ typedef struct SVMBufPageInfo { typedef struct SStreamVtableMergeSource { SDiskbasedBuf* pBuf; // buffer for storing data int32_t* pTotalPages; // total pages of all sources in the buffer + int32_t primaryTsIndex; SSDataBlock* pInputDataBlock; // data block to be written to the buffer int64_t currentExpireTimeMs; // expire time of the input data block @@ -44,7 +45,9 @@ typedef struct SStreamVtableMergeHandle { SDiskbasedBuf* pBuf; int32_t numOfPages; int32_t numPageLimit; + int32_t primaryTsIndex; + int64_t vuid; int32_t nSrcTbls; SHashObj* pSources; SSDataBlock* datablock; // Does not store data, only used to save the schema of input/output data blocks @@ -133,11 +136,8 @@ static int32_t svmSourceAddBlock(SStreamVtableMergeSource* pSource, SSDataBlock* QUERY_CHECK_NULL(pSource, code, lino, _end, TSDB_CODE_INVALID_PARA); pInputDataBlock = pSource->pInputDataBlock; - if (pInputDataBlock == NULL) { - code = createOneDataBlock(pDataBlock, false, &pInputDataBlock); - QUERY_CHECK_CODE(code, lino, _end); - pSource->pInputDataBlock = pInputDataBlock; - } + QUERY_CHECK_CONDITION(taosArrayGetSize(pDataBlock->pDataBlock) >= taosArrayGetSize(pInputDataBlock->pDataBlock), code, + lino, _end, TSDB_CODE_INVALID_PARA); int32_t start = 0; int32_t nrows = blockDataGetNumOfRows(pDataBlock); @@ -241,8 +241,9 @@ static int32_t svmSourceCurrentTs(SStreamVtableMergeSource* pSource, const char* QUERY_CHECK_CONDITION(pSource->rowIndex >= 0 && pSource->rowIndex < blockDataGetNumOfRows(pSource->pOutputDataBlock), code, lino, _end, TSDB_CODE_INVALID_PARA); - tsCol = taosArrayGet(pSource->pOutputDataBlock->pDataBlock, 0); + tsCol = taosArrayGet(pSource->pOutputDataBlock->pDataBlock, pSource->primaryTsIndex); QUERY_CHECK_NULL(tsCol, code, lino, _end, terrno); + QUERY_CHECK_CONDITION(tsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP, code, lino, _end, TSDB_CODE_INVALID_PARA); *pTs = ((int64_t*)tsCol->pData)[pSource->rowIndex]; @@ -355,6 +356,9 @@ static SStreamVtableMergeSource* svmAddSource(SStreamVtableMergeHandle* pHandle, QUERY_CHECK_NULL(pSource, code, lino, _end, terrno); pSource->pBuf = pHandle->pBuf; pSource->pTotalPages = &pHandle->numOfPages; + pSource->primaryTsIndex = pHandle->primaryTsIndex; + code = createOneDataBlock(pHandle->datablock, false, &pSource->pInputDataBlock); + QUERY_CHECK_CODE(code, lino, _end); pSource->pageInfoList = tdListNew(sizeof(SVMBufPageInfo)); QUERY_CHECK_NULL(pSource->pageInfoList, code, lino, _end, terrno); code = createOneDataBlock(pHandle->datablock, false, &pSource->pOutputDataBlock); @@ -613,8 +617,9 @@ _end: return code; } -int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle** ppHandle, int32_t nSrcTbls, int32_t numPageLimit, - SDiskbasedBuf* pBuf, SSDataBlock* pResBlock, const char* idstr) { +int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle** ppHandle, int64_t vuid, int32_t nSrcTbls, + int32_t numPageLimit, int32_t primaryTsIndex, SDiskbasedBuf* pBuf, + SSDataBlock* pResBlock, const char* idstr) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamVtableMergeHandle* pHandle = NULL; @@ -629,6 +634,8 @@ int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle** ppHandle, int32 pHandle->pBuf = pBuf; pHandle->numPageLimit = numPageLimit; + pHandle->primaryTsIndex = primaryTsIndex; + pHandle->vuid = vuid; pHandle->nSrcTbls = nSrcTbls; pHandle->pSources = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); QUERY_CHECK_NULL(pHandle->pSources, code, lino, _end, terrno); @@ -666,3 +673,11 @@ void streamVtableMergeDestroyHandle(void* ptr) { taosMemoryFreeClear(*ppHandle); } + +int64_t streamVtableMergeHandleGetVuid(SStreamVtableMergeHandle* pHandle) { + if (pHandle != NULL) { + return pHandle->vuid; + } else { + return 0; + } +} diff --git a/source/libs/executor/src/virtualtablescanoperator.c b/source/libs/executor/src/virtualtablescanoperator.c index aca013474b..960cff5f17 100644 --- a/source/libs/executor/src/virtualtablescanoperator.c +++ b/source/libs/executor/src/virtualtablescanoperator.c @@ -795,13 +795,13 @@ static int32_t doStreamVtableMergeNext(SOperatorInfo* pOperator, SSDataBlock** p int32_t numPagePerTable = getNumOfInMemBufPages(pInfo->pVtableMergeBuf) / nTables; for (int32_t i = 0; i < nTables; ++i) { SVCTableMergeInfo* pTableInfo = taosArrayGet(pVTables, i); - if (pTableInfo == NULL) { + if (pTableInfo == NULL || pTableInfo->numOfSrcTbls == 0) { continue; } QUERY_CHECK_CONDITION(pTableInfo->numOfSrcTbls <= numPagePerTable, code, lino, _end, terrno); SStreamVtableMergeHandle* pMergeHandle = NULL; - code = streamVtableMergeCreateHandle(&pMergeHandle, pTableInfo->numOfSrcTbls, numPagePerTable, - pInfo->pVtableMergeBuf, pInfo->pRes, id); + code = streamVtableMergeCreateHandle(&pMergeHandle, pTableInfo->uid, pTableInfo->numOfSrcTbls, numPagePerTable, + pInfo->primaryTsIndex, pInfo->pVtableMergeBuf, pInfo->pRes, id); QUERY_CHECK_CODE(code, lino, _end); code = taosHashPut(pInfo->pVtableMergeHandles, &pTableInfo->uid, sizeof(pTableInfo->uid), &pMergeHandle, POINTER_BYTES); @@ -824,6 +824,30 @@ static int32_t doStreamVtableMergeNext(SOperatorInfo* pOperator, SSDataBlock** p break; } + int32_t inputNCols = taosArrayGetSize(pBlock->pDataBlock); + int32_t resNCols = taosArrayGetSize(pResBlock->pDataBlock); + QUERY_CHECK_CONDITION(inputNCols <= resNCols, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + for (int32_t i = 0; i < inputNCols; ++i) { + SColumnInfoData *p1 = taosArrayGet(pResBlock->pDataBlock, i); + QUERY_CHECK_NULL(p1, code, lino, _end, terrno); + SColumnInfoData *p2 = taosArrayGet(pBlock->pDataBlock, i); + QUERY_CHECK_CODE(code, lino, _end); + QUERY_CHECK_CONDITION(p1->info.type == p2->info.type, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + QUERY_CHECK_CONDITION(p1->info.bytes == p2->info.bytes, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + } + for (int32_t i = inputNCols; i < resNCols; ++i) { + SColumnInfoData *p = taosArrayGet(pResBlock->pDataBlock, i); + QUERY_CHECK_NULL(p, code, lino, _end, terrno); + SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; + code = blockDataAppendColInfo(pBlock, &colInfo); + QUERY_CHECK_CODE(code, lino, _end); + SColumnInfoData* pNewCol = taosArrayGet(pBlock->pDataBlock, i); + QUERY_CHECK_NULL(pNewCol, code, lino, _end, terrno); + code = colInfoDataEnsureCapacity(pNewCol, pBlock->info.rows, false); + QUERY_CHECK_CODE(code, lino, _end); + colDataSetNNULL(pNewCol, 0, pBlock->info.rows); + } + if (pBlock->info.type == STREAM_NORMAL) { SStreamVtableMergeHandle** ppHandle = taosHashGet(pInfo->pVtableMergeHandles, &pBlock->info.id.uid, sizeof(int64_t)); @@ -878,9 +902,9 @@ static int32_t doStreamVtableMergeNext(SOperatorInfo* pOperator, SSDataBlock** p bool newTuple = true; if (pResBlock->info.rows > 0) { - SColumnInfoData* pResTsCol = taosArrayGet(pResBlock->pDataBlock, 0); + SColumnInfoData* pResTsCol = taosArrayGet(pResBlock->pDataBlock, pInfo->primaryTsIndex); int64_t lastResTs = *(int64_t*)colDataGetNumData(pResTsCol, pResBlock->info.rows - 1); - SColumnInfoData* pMergeTsCol = taosArrayGet(pBlock->pDataBlock, 0); + SColumnInfoData* pMergeTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); int64_t mergeTs = *(int64_t*)colDataGetNumData(pMergeTsCol, idx); QUERY_CHECK_CONDITION(mergeTs >= lastResTs, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); newTuple = (mergeTs > lastResTs); @@ -913,6 +937,7 @@ static int32_t doStreamVtableMergeNext(SOperatorInfo* pOperator, SSDataBlock** p } if (pResBlock->info.rows > 0) { + pResBlock->info.id.uid = streamVtableMergeHandleGetVuid(pHandle); break; } } @@ -923,8 +948,18 @@ static int32_t doStreamVtableMergeNext(SOperatorInfo* pOperator, SSDataBlock** p pInfo->numOfExec++; if (pResBlock->info.rows > 0) { - (*ppRes) = pResBlock; - pOperator->resultInfo.totalRows += pResBlock->info.rows; + pResBlock->info.id.groupId = tableListGetTableGroupId(pInfo->pTableListInfo, pResBlock->info.id.uid); + code = blockDataUpdateTsWindow(pResBlock, 0); + QUERY_CHECK_CODE(code, lino, _end); + code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pResBlock, + pResBlock->info.rows, pTaskInfo, NULL); + QUERY_CHECK_CODE(code, lino, _end); + code = doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); + QUERY_CHECK_CODE(code, lino, _end); + if (pResBlock->info.rows > 0) { + (*ppRes) = pResBlock; + pOperator->resultInfo.totalRows += pResBlock->info.rows; + } } _end: @@ -1032,7 +1067,7 @@ int32_t createStreamVtableMergeOperatorInfo(SOperatorInfo* pDownstream, SReadHan pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup); QUERY_CHECK_NULL(pInfo->pCreateTbRes, code, lino, _error, terrno); - // create the pseduo columns info + // create the pseudo columns info if (pVirtualScanNode->scan.pScanPseudoCols != NULL) { code = createExprInfo(pVirtualScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr); QUERY_CHECK_CODE(code, lino, _error); @@ -1045,6 +1080,7 @@ int32_t createStreamVtableMergeOperatorInfo(SOperatorInfo* pDownstream, SReadHan QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); code = blockDataEnsureCapacity(pInfo->pRes, TMAX(pOperator->resultInfo.capacity, 4096)); QUERY_CHECK_CODE(code, lino, _error); + pInfo->pRes->info.type = STREAM_NORMAL; code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes); QUERY_CHECK_CODE(code, lino, _error); @@ -1054,6 +1090,8 @@ int32_t createStreamVtableMergeOperatorInfo(SOperatorInfo* pDownstream, SReadHan QUERY_CHECK_CODE(code, lino, _error); pInfo->pVtableReadyHandles = taosArrayInit(0, POINTER_BYTES); QUERY_CHECK_NULL(pInfo->pVtableReadyHandles, code, lino, _error, terrno); + pInfo->pTableListInfo = pTableListInfo; + pTableListInfo = NULL; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; @@ -1128,6 +1166,10 @@ _error: destroyStreamScanOperatorInfo(pInfo); } + if (pTableListInfo != NULL) { + tableListDestroy(pTableListInfo); + } + if (pOperator != NULL) { pOperator->info = NULL; destroyOperator(pOperator); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 800817b857..0a80ece8f8 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -816,6 +816,10 @@ static int32_t physiVirtualTableScanCopy(const SVirtualScanPhysiNode* pSrc, SVir COPY_SCALAR_FIELD(groupSort); COPY_SCALAR_FIELD(scanAllCols); CLONE_NODE_LIST_FIELD(pTargets); + CLONE_NODE_LIST_FIELD(pTags); + CLONE_NODE_FIELD(pSubtable); + COPY_SCALAR_FIELD(igExpired); + COPY_SCALAR_FIELD(igCheckUpdate); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index ddfec7085e..5fa6dbf69d 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2479,6 +2479,10 @@ static const char* jkVirtualTableScanPhysiPlanGroupTags = "GroupTags"; static const char* jkVirtualTableScanPhysiPlanGroupSort = "GroupSort"; static const char* jkVirtualTableScanPhysiPlanscanAllCols= "scanAllCols"; static const char* jkVirtualTableScanPhysiPlanTargets = "Targets"; +static const char* jkVirtualTableScanPhysiPlanTags = "Tags"; +static const char* jkVirtualTableScanPhysiPlanSubtable = "Subtable"; +static const char* jkVirtualTableScanPhysiPlanIgExpired = "IgExpired"; +static const char* jkVirtualTableScanPhysiPlanIgCheckUpdate = "IgCheckUpdate"; static int32_t physiVirtualTableScanNodeToJson(const void* pObj, SJson* pJson) { const SVirtualScanPhysiNode* pNode = (const SVirtualScanPhysiNode*)pObj; @@ -2486,11 +2490,11 @@ static int32_t physiVirtualTableScanNodeToJson(const void* pObj, SJson* pJson) { int32_t code = physiScanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { - code = nodeListToJson(pJson, jkVirtualTableScanPhysiPlanTargets, pNode->pGroupTags); + code = nodeListToJson(pJson, jkVirtualTableScanPhysiPlanGroupTags, pNode->pGroupTags); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddBoolToObject(pJson, jkVirtualTableScanPhysiPlanTargets, pNode->groupSort); + code = tjsonAddBoolToObject(pJson, jkVirtualTableScanPhysiPlanGroupSort, pNode->groupSort); } if (TSDB_CODE_SUCCESS == code) { @@ -2501,6 +2505,22 @@ static int32_t physiVirtualTableScanNodeToJson(const void* pObj, SJson* pJson) { code = nodeListToJson(pJson, jkVirtualTableScanPhysiPlanTargets, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkVirtualTableScanPhysiPlanTags, pNode->pTags); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkVirtualTableScanPhysiPlanSubtable, nodeToJson, pNode->pSubtable); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkVirtualTableScanPhysiPlanIgExpired, pNode->igExpired); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkVirtualTableScanPhysiPlanIgCheckUpdate, pNode->igCheckUpdate); + } + return code; } @@ -2521,6 +2541,22 @@ static int32_t jsonToPhysiVirtualTableScanNode(const SJson* pJson, void* pObj) { code = jsonToNodeList(pJson, jkVirtualTableScanPhysiPlanTargets, &pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkVirtualTableScanPhysiPlanTags, &pNode->pTags); + } + + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkVirtualTableScanPhysiPlanSubtable, &pNode->pSubtable); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkVirtualTableScanPhysiPlanIgExpired, &pNode->igExpired); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkVirtualTableScanPhysiPlanIgCheckUpdate, &pNode->igCheckUpdate); + } + return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index bdf5befca4..2dbf3025da 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2207,6 +2207,10 @@ enum { PHY_VIRTUAL_TABLE_SCAN_CODE_GROUP_SORT, PHY_VIRTUAL_TABLE_SCAN_CODE_ONLY_TS, PHY_VIRTUAL_TABLE_SCAN_CODE_TARGETS, + PHY_VIRTUAL_TABLE_SCAN_CODE_TAGS, + PHY_VIRTUAL_TABLE_SCAN_CODE_SUBTABLE, + PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_EXPIRED, + PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_CHECK_UPDATE, }; static int32_t physiVirtualTableScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2229,6 +2233,23 @@ static int32_t physiVirtualTableScanNodeToMsg(const void* pObj, STlvEncoder* pEn if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); } + + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_TAGS, nodeListToMsg, pNode->pTags); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_SUBTABLE, nodeToMsg, pNode->pSubtable); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_EXPIRED, pNode->igExpired); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_CHECK_UPDATE, pNode->igCheckUpdate); + } + return code; } @@ -2254,6 +2275,18 @@ static int32_t msgToPhysiVirtualTableScanNode(STlvDecoder* pDecoder, void* pObj) case PHY_VIRTUAL_TABLE_SCAN_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); break; + case PHY_VIRTUAL_TABLE_SCAN_CODE_TAGS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTags); + break; + case PHY_VIRTUAL_TABLE_SCAN_CODE_SUBTABLE: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pSubtable); + break; + case PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_EXPIRED: + code = tlvDecodeI8(pTlv, &pNode->igExpired); + break; + case PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_CHECK_UPDATE: + code = tlvDecodeI8(pTlv, &pNode->igCheckUpdate); + break; default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 0002d9ce9c..8bc03cd25e 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1908,6 +1908,8 @@ void nodesDestroyNode(SNode* pNode) { destroyScanPhysiNode((SScanPhysiNode*)pNode); nodesDestroyList(pPhyNode->pGroupTags); nodesDestroyList(pPhyNode->pTargets); + nodesDestroyList(pPhyNode->pTags); + nodesDestroyNode(pPhyNode->pSubtable); break; } case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3f370dd1b1..254ee90561 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -13476,14 +13476,14 @@ static int32_t buildQueryTableColIdList(SSelectStmt *pSelect, SArray** ppRes) { *ppRes = taosArrayInit(pColList->length, sizeof(int16_t)); if (NULL == *ppRes) { code = terrno; - parserError("taosArrayInit %d colId failed, errno:0x%x", *ppRes, code); + parserError("taosArrayInit 0x%p colId failed, errno:0x%x", *ppRes, code); goto _return; } FOREACH(pCol, pColList) { if (NULL == taosArrayPush(*ppRes, &((SColumnNode*)pCol)->colId)) { code = terrno; - parserError("taosArrayPush colId failed, errno:0x%x", *ppRes, code); + parserError("taosArrayPush 0x%p colId failed, errno:0x%x", *ppRes, code); goto _return; } } @@ -13532,7 +13532,7 @@ static int32_t modifyVtableSrcNumBasedOnQuery(SArray* pVSubTables, SNode* pStmt) pTbHash = tSimpleHashInit(colNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); if (NULL == pTbHash) { code = terrno; - parserError("tSimpleHashInit %d failed, errno:0x%x", code); + parserError("tSimpleHashInit failed, colNum:%d, errno:0x%x", colNum, code); PAR_ERR_JRET(code); } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 128730d335..33ac7364f3 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -25,7 +25,7 @@ typedef struct SBlockName { static void doMonitorDispatchData(void* param, void* tmrId); static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet); -static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq); +static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq, bool withUid); static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId, int64_t now); static int32_t streamMapAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, @@ -365,7 +365,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD return terrno; } - code = streamAddBlockIntoDispatchMsg(pDataBlock, pReqs); + code = streamAddBlockIntoDispatchMsg(pDataBlock, pReqs, false); if (code != TSDB_CODE_SUCCESS) { destroyDispatchMsg(pReqs, 1); return code; @@ -391,7 +391,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD if (type == STREAM_DELETE_RESULT || type == STREAM_CHECKPOINT || type == STREAM_TRANS_STATE || type == STREAM_RECALCULATE_START) { for (int32_t j = 0; j < numOfVgroups; j++) { - code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]); + code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j], false); if (code != 0) { destroyDispatchMsg(pReqs, numOfVgroups); return code; @@ -436,7 +436,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) { for (int32_t j = 0; j < numOfTasks; j++) { - code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]); + code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j], false); if (code != 0) { destroyDispatchMsg(pReqs, numOfTasks); return code; @@ -821,7 +821,7 @@ static int32_t doAddDispatchBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, stDebug("s-task:%s dst table hashVal:0x%x assign to vgId:%d range[0x%x, 0x%x]", pTask->id.idStr, hashValue, pVgInfo->vgId, pVgInfo->hashBegin, pVgInfo->hashEnd); - if ((code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j])) < 0) { + if ((code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j], false)) < 0) { stError("s-task:%s failed to add dispatch block, code:%s", pTask->id.idStr, tstrerror(terrno)); return code; } @@ -936,7 +936,7 @@ int32_t streamMapAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDa STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, *pIdx); QUERY_CHECK_NULL(pAddr, code, lino, _end, terrno); - code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[*pIdx]); + code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[*pIdx], true); QUERY_CHECK_CODE(code, lino, _end); if (pReqs[*pIdx].blockNum == 0) { @@ -1423,7 +1423,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { +int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq, bool withUid) { size_t dataEncodeSize = blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN; void* buf = taosMemoryCalloc(1, dataStrLen); @@ -1432,7 +1432,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch } SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; - pRetrieve->useconds = htobe64(pBlock->info.id.uid); + pRetrieve->useconds = withUid ? htobe64(pBlock->info.id.uid) : 0; pRetrieve->precision = TSDB_DEFAULT_PRECISION; pRetrieve->compressed = 0; pRetrieve->completed = 1; diff --git a/tests/army/stream/test_stream_vtable.py b/tests/army/stream/test_stream_vtable.py new file mode 100644 index 0000000000..c0c9d4bd1a --- /dev/null +++ b/tests/army/stream/test_stream_vtable.py @@ -0,0 +1,280 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +from frame import etool +from frame.etool import * +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame.common import * +import time + +class TDTestCase(TBase): + + def create_tables(self): + tdLog.info("create tables") + + tdSql.execute("drop database if exists test_stream_vtable;") + tdSql.execute("create database test_stream_vtable vgroups 8;") + tdSql.execute("use test_stream_vtable;") + + tdLog.info(f"create org super table.") + tdSql.execute("select database();") + tdSql.execute(f"CREATE STABLE `vtb_org_stb` (" + "ts timestamp, " + "u_tinyint_col tinyint unsigned, " + "u_smallint_col smallint unsigned, " + "u_int_col int unsigned, " + "u_bigint_col bigint unsigned, " + "tinyint_col tinyint, " + "smallint_col smallint, " + "int_col int, " + "bigint_col bigint, " + "float_col float, " + "double_col double, " + "bool_col bool, " + "binary_16_col binary(16)," + "binary_32_col binary(32)," + "nchar_16_col nchar(16)," + "nchar_32_col nchar(32)" + ") TAGS (" + "int_tag int," + "bool_tag bool," + "float_tag float," + "double_tag double)") + + tdLog.info(f"create org child table.") + for i in range(3): + tdSql.execute(f"CREATE TABLE `vtb_org_child_{i}` USING `vtb_org_stb` TAGS ({i}, false, {i}, {i});") + + tdLog.info(f"create virtual normal table.") + + tdSql.execute(f"CREATE VTABLE `vtb_virtual_ntb_full` (" + "ts timestamp, " + "u_tinyint_col tinyint unsigned from vtb_org_child_0.u_tinyint_col, " + "u_smallint_col smallint unsigned from vtb_org_child_1.u_smallint_col, " + "u_int_col int unsigned from vtb_org_child_2.u_int_col, " + "u_bigint_col bigint unsigned from vtb_org_child_0.u_bigint_col, " + "tinyint_col tinyint from vtb_org_child_1.tinyint_col, " + "smallint_col smallint from vtb_org_child_2.smallint_col, " + "int_col int from vtb_org_child_0.int_col, " + "bigint_col bigint from vtb_org_child_1.bigint_col, " + "float_col float from vtb_org_child_2.float_col, " + "double_col double from vtb_org_child_0.double_col, " + "bool_col bool from vtb_org_child_1.bool_col, " + "binary_16_col binary(16) from vtb_org_child_2.binary_16_col," + "binary_32_col binary(32) from vtb_org_child_0.binary_32_col," + "nchar_16_col nchar(16) from vtb_org_child_1.nchar_16_col," + "nchar_32_col nchar(32) from vtb_org_child_2.nchar_32_col)") + + tdSql.execute(f"CREATE VTABLE `vtb_virtual_ntb_half_full` (" + "ts timestamp, " + "u_tinyint_col tinyint unsigned from vtb_org_child_0.u_tinyint_col, " + "u_smallint_col smallint unsigned from vtb_org_child_1.u_smallint_col, " + "u_int_col int unsigned from vtb_org_child_2.u_int_col, " + "u_bigint_col bigint unsigned, " + "tinyint_col tinyint, " + "smallint_col smallint, " + "int_col int from vtb_org_child_0.int_col, " + "bigint_col bigint from vtb_org_child_1.bigint_col, " + "float_col float from vtb_org_child_2.float_col, " + "double_col double, " + "bool_col bool, " + "binary_16_col binary(16)," + "binary_32_col binary(32) from vtb_org_child_0.binary_32_col," + "nchar_16_col nchar(16) from vtb_org_child_1.nchar_16_col," + "nchar_32_col nchar(32) from vtb_org_child_2.nchar_32_col)") + + tdSql.execute(f"CREATE STABLE `vtb_virtual_stb` (" + "ts timestamp, " + "u_tinyint_col tinyint unsigned, " + "u_smallint_col smallint unsigned, " + "u_int_col int unsigned, " + "u_bigint_col bigint unsigned, " + "tinyint_col tinyint, " + "smallint_col smallint, " + "int_col int, " + "bigint_col bigint, " + "float_col float, " + "double_col double, " + "bool_col bool, " + "binary_16_col binary(16)," + "binary_32_col binary(32)," + "nchar_16_col nchar(16)," + "nchar_32_col nchar(32)" + ") TAGS (" + "int_tag int," + "bool_tag bool," + "float_tag float," + "double_tag double)" + "VIRTUAL 1") + + tdLog.info(f"create virtual child table.") + + tdSql.execute(f"CREATE VTABLE `vtb_virtual_ctb_full` (" + "u_tinyint_col from vtb_org_child_0.u_tinyint_col, " + "u_smallint_col from vtb_org_child_1.u_smallint_col, " + "u_int_col from vtb_org_child_2.u_int_col, " + "u_bigint_col from vtb_org_child_0.u_bigint_col, " + "tinyint_col from vtb_org_child_1.tinyint_col, " + "smallint_col from vtb_org_child_2.smallint_col, " + "int_col from vtb_org_child_0.int_col, " + "bigint_col from vtb_org_child_1.bigint_col, " + "float_col from vtb_org_child_2.float_col, " + "double_col from vtb_org_child_0.double_col, " + "bool_col from vtb_org_child_1.bool_col, " + "binary_16_col from vtb_org_child_2.binary_16_col," + "binary_32_col from vtb_org_child_0.binary_32_col," + "nchar_16_col from vtb_org_child_1.nchar_16_col," + "nchar_32_col from vtb_org_child_2.nchar_32_col)" + "USING `vtb_virtual_stb` TAGS (0, false, 0, 0)") + + tdSql.execute(f"CREATE VTABLE `vtb_virtual_ctb_half_full` (" + "u_tinyint_col from vtb_org_child_0.u_tinyint_col, " + "u_smallint_col from vtb_org_child_1.u_smallint_col, " + "u_int_col from vtb_org_child_2.u_int_col, " + "int_col from vtb_org_child_0.int_col, " + "bigint_col from vtb_org_child_1.bigint_col, " + "float_col from vtb_org_child_2.float_col, " + "binary_32_col from vtb_org_child_0.binary_32_col," + "nchar_16_col from vtb_org_child_1.nchar_16_col," + "nchar_32_col from vtb_org_child_2.nchar_32_col)" + "USING `vtb_virtual_stb` TAGS (1, false, 1, 1)") + + tdSql.execute(f"CREATE VTABLE `vtb_virtual_ctb_empty` " + "USING `vtb_virtual_stb` TAGS (2, false, 2, 2)") + + def create_proj_streams(self): + tdSql.execute(f"CREATE STREAM s_proj_1 TRIGGER AT_ONCE INTO dst_proj_1 AS " + "select * from test_stream_vtable.vtb_virtual_ntb_full;") + tdSql.execute(f"CREATE STREAM s_proj_2 TRIGGER AT_ONCE INTO dst_proj_2 AS " + "select * from test_stream_vtable.vtb_virtual_ntb_half_full;") + tdSql.execute(f"CREATE STREAM s_proj_3 TRIGGER AT_ONCE INTO dst_proj_3 AS " + "select * from test_stream_vtable.vtb_virtual_stb PARTITION BY tbname;") + tdSql.execute(f"CREATE STREAM s_proj_4 TRIGGER AT_ONCE INTO dst_proj_4 AS " + "select * from test_stream_vtable.vtb_virtual_ctb_full;") + tdSql.execute(f"CREATE STREAM s_proj_5 TRIGGER AT_ONCE INTO dst_proj_5 AS " + "select * from test_stream_vtable.vtb_virtual_ctb_half_full;") + + tdSql.execute(f"CREATE STREAM s_proj_6 TRIGGER AT_ONCE INTO dst_proj_6 AS " + "select * from test_stream_vtable.vtb_virtual_ntb_full WHERE u_tinyint_col = 1;") + tdSql.execute(f"CREATE STREAM s_proj_7 TRIGGER AT_ONCE INTO dst_proj_7 AS " + "select * from test_stream_vtable.vtb_virtual_ntb_half_full WHERE bool_col = true;") + tdSql.execute(f"CREATE STREAM s_proj_8 TRIGGER AT_ONCE INTO dst_proj_8 AS " + "select * from test_stream_vtable.vtb_virtual_stb WHERE bool_col = true PARTITION BY tbname;") + tdSql.execute(f"CREATE STREAM s_proj_9 TRIGGER AT_ONCE INTO dst_proj_9 AS " + "select * from test_stream_vtable.vtb_virtual_ctb_full WHERE u_tinyint_col = 1;") + tdSql.execute(f"CREATE STREAM s_proj_10 TRIGGER AT_ONCE INTO dst_proj_10 AS " + "select * from test_stream_vtable.vtb_virtual_ctb_half_full WHERE bool_col = true;") + + tdSql.execute(f"CREATE STREAM s_proj_11 TRIGGER AT_ONCE INTO dst_proj_11 AS " + "select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_ntb_full;") + tdSql.execute(f"CREATE STREAM s_proj_12 TRIGGER AT_ONCE INTO dst_proj_12 AS " + "select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_ntb_half_full;") + tdSql.execute(f"CREATE STREAM s_proj_13 TRIGGER AT_ONCE INTO dst_proj_13 AS " + "select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_stb PARTITION BY tbname;") + tdSql.execute(f"CREATE STREAM s_proj_14 TRIGGER AT_ONCE INTO dst_proj_14 AS " + "select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_ctb_full;") + tdSql.execute(f"CREATE STREAM s_proj_15 TRIGGER AT_ONCE INTO dst_proj_15 AS " + "select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_ctb_half_full;") + + def create_window_streams(self): + tdSql.execute(f"CREATE STREAM s_interval_1 INTO dst_interval_1 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full interval(1s);") + tdSql.execute(f"CREATE STREAM s_interval_2 INTO dst_interval_2 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full interval(1s) sliding(100a);") + tdSql.execute(f"CREATE STREAM s_interval_3 INTO dst_interval_3 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname interval(1s) sliding(200a);") + tdSql.execute(f"CREATE STREAM s_interval_4 INTO dst_interval_4 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full interval(1s) sliding(100a);") + tdSql.execute(f"CREATE STREAM s_interval_5 INTO dst_interval_5 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full interval(1s);") + + tdSql.execute(f"CREATE STREAM s_state_1 INTO dst_state_1 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full state_window(bool_col);") + tdSql.execute(f"CREATE STREAM s_state_2 INTO dst_state_2 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full state_window(bool_col);") + tdSql.execute(f"CREATE STREAM s_state_3 INTO dst_state_3 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname state_window(bool_col);") + tdSql.execute(f"CREATE STREAM s_state_4 INTO dst_state_4 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full state_window(bool_col);") + tdSql.execute(f"CREATE STREAM s_state_5 INTO dst_state_5 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full state_window(bool_col);") + + tdSql.execute(f"CREATE STREAM s_session_1 INTO dst_session_1 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full session(ts, 10a);") + tdSql.execute(f"CREATE STREAM s_session_2 INTO dst_session_2 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full session(ts, 10a);") + tdSql.execute(f"CREATE STREAM s_session_3 INTO dst_session_3 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname session(ts, 10a);") + tdSql.execute(f"CREATE STREAM s_session_4 INTO dst_session_4 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full session(ts, 10a);") + tdSql.execute(f"CREATE STREAM s_session_5 INTO dst_session_5 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full session(ts, 10a);") + + tdSql.execute(f"CREATE STREAM s_event_1 INTO dst_event_1 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;") + tdSql.execute(f"CREATE STREAM s_event_2 INTO dst_event_2 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;") + tdSql.execute(f"CREATE STREAM s_event_3 INTO dst_event_3 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;") + tdSql.execute(f"CREATE STREAM s_event_4 INTO dst_event_4 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;") + tdSql.execute(f"CREATE STREAM s_event_5 INTO dst_event_5 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;") + + # tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS " + # "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full count_window(20);") + # tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS " + # "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full count_window(20);") + # tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS " + # "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname count_window(20);") + # tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS " + # "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full count_window(20);") + # tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS " + # "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full count_window(20);") + + def wait_streams_ready(self): + for i in range(60): + tdLog.info(f"i={i} wait for stream tasks ready ...") + time.sleep(1) + rows = tdSql.query("select * from information_schema.ins_stream_tasks where status <> 'ready';") + if rows == 0: + break + + def wait_streams_done(self): + # The entire test runs for a while. Wait briefly, and if no exceptions occur, it's sufficient. + for i in range(30): + tdLog.info(f"i={i} wait for stream tasks done ...") + time.sleep(1) + rows = tdSql.query("select * from information_schema.ins_stream_tasks where status <> 'ready';") + if rows != 0: + raise Exception("stream task status is wrong, please check it!") + + + def run(self): + tdLog.debug(f"start to excute {__file__}") + + self.create_tables() + self.create_proj_streams() + self.wait_streams_ready() + json = etool.curFile(__file__, "vtable_insert.json") + etool.benchMark(json=json) + self.wait_streams_done() + + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/army/stream/vtable_insert.json b/tests/army/stream/vtable_insert.json new file mode 100644 index 0000000000..cacdc0b288 --- /dev/null +++ b/tests/army/stream/vtable_insert.json @@ -0,0 +1,76 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 3, + "create_table_thread_count": 4, + "result_file": "./insert_res.txt", + "confirm_parameter_prompt": "no", + "num_of_records_per_req": 10000, + "prepared_rand": 10000, + "chinese": "no", + "escape_character": "yes", + "continue_if_fail": "no", + "databases": [ + { + "dbinfo": { + "name": "test_stream_vtable", + "drop": "no", + "vgroups": 8, + "precision": "ms" + }, + "super_tables": [ + { + "name": "vtb_org_stb", + "child_table_exists": "yes", + "childtable_count": 3, + "childtable_prefix": "vtb_org_child_", + "auto_create_table": "no", + "batch_create_tbl_num": 5, + "data_source": "rand", + "insert_mode": "taosc", + "non_stop_mode": "no", + "line_protocol": "line", + "insert_rows": 10000, + "childtable_limit": 0, + "childtable_offset": 0, + "interlace_rows": 0, + "insert_interval": 10, + "partial_col_num": 0, + "timestamp_step": 500, + "start_timestamp": "2025-01-01 00:00:00.000", + "sample_format": "csv", + "sample_file": "./sample.csv", + "use_sample_ts": "no", + "tags_file": "", + "columns": [ + {"type": "UTINYINT", "name": "u_tinyint_col"}, + {"type": "USMALLINT", "name": "u_smallint_col"}, + {"type": "UINT", "name": "u_int_col"}, + {"type": "UBIGINT", "name": "u_bigint_col"}, + {"type": "TINYINT", "name": "tinyint_col"}, + {"type": "SMALLINT", "name": "smallint_col"}, + {"type": "INT", "name": "int_col"}, + {"type": "BIGINT", "name": "bigint_col"}, + {"type": "FLOAT", "name": "float_col"}, + {"type": "DOUBLE", "name": "double_col"}, + {"type": "BOOL", "name": "bool_col"}, + {"type": "BINARY", "name": "binary_16_col", "len": 16}, + {"type": "BINARY", "name": "binary_32_col", "len": 32}, + {"type": "NCHAR", "name": "nchar_16_col", "len": 16}, + {"type": "NCHAR", "name": "nchar_32_col", "len": 32} + ], + "tags": [ + {"type": "INT", "name": "int_tag"}, + {"type": "BOOL", "name": "bool_tag"}, + {"type": "FLOAT", "name": "float_tag"}, + {"type": "DOUBLE", "name": "double_tag"} + ] + } + ] + } + ] +} diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 8569a3cb6d..bc7dac10ba 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -90,6 +90,7 @@ ,,y,army,./pytest.sh python3 ./test.py -f create/test_stb_keep_compact.py ,,y,army,./pytest.sh python3 ./test.py -f create/test_stb_keep_compact.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f create/test_stb_keep_compact.py -N 3 -M 3 +,,y,army,./pytest.sh python3 ./test.py -f stream/test_stream_vtable.py # # army/tools