fix(stream): add test cases for vtable stream processing

This commit is contained in:
Jinqing Kuang 2025-03-20 15:56:35 +08:00
parent 2c8bc8b097
commit 4d97d691cf
15 changed files with 543 additions and 52 deletions

View File

@ -549,6 +549,7 @@ typedef struct SStreamScanInfo {
SHashObj* pVtableMergeHandles; // key: vtable uid, value: SStreamVtableMergeHandle
SDiskbasedBuf* pVtableMergeBuf; // page buffer used by vtable merge
SArray* pVtableReadyHandles;
STableListInfo* pTableListInfo;
uint64_t groupId;
bool igCheckGroupId;

View File

@ -30,11 +30,14 @@ typedef enum {
SVM_NEXT_FOUND = 1,
} SVM_NEXT_RESULT;
int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle **ppHandle, int32_t nSrcTbls, int32_t numPageLimit,
SDiskbasedBuf *pBuf, SSDataBlock *pResBlock, const char *idstr);
int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle **ppHandle, int64_t vuid, int32_t nSrcTbls,
int32_t numPageLimit, int32_t primaryTsIndex, SDiskbasedBuf *pBuf,
SSDataBlock *pResBlock, const char *idstr);
void streamVtableMergeDestroyHandle(void *ppHandle);
int64_t streamVtableMergeHandleGetVuid(SStreamVtableMergeHandle *pHandle);
int32_t streamVtableMergeAddBlock(SStreamVtableMergeHandle *pHandle, SSDataBlock *pDataBlock, const char *idstr);
int32_t streamVtableMergeMoveNext(SStreamVtableMergeHandle *pHandle, SVM_NEXT_RESULT *pRes, const char *idstr);

View File

@ -707,7 +707,6 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
}
code = createStreamVtableMergeOperatorInfo(ops[0], pHandle, pVirtualTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOptr);
tableListDestroy(pTableListInfo);
} else {
code = TSDB_CODE_INVALID_PARA;
pTaskInfo->code = code;

View File

@ -3157,7 +3157,7 @@ int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STime
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
const char* id = GET_TASKID(pTaskInfo);
SSHashObj* pVtableInfos = pTaskInfo->pSubplan->pVTables;
bool isVtableSourceScan = (pTaskInfo->pSubplan->pVTables != NULL);
code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
QUERY_CHECK_CODE(code, lino, _end);
@ -3168,7 +3168,7 @@ int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STime
pBlockInfo->version = pBlock->info.version;
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
if (pVtableInfos == NULL) {
if (!isVtableSourceScan) {
pBlockInfo->id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
} else {
// use original table uid as groupId for vtable
@ -3213,7 +3213,7 @@ int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STime
}
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
if (pInfo->numOfPseudoExpr > 0 && !isVtableSourceScan) {
code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
pBlockInfo->rows, pTaskInfo, &pTableScanInfo->base.metaCache);
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
@ -4242,7 +4242,7 @@ FETCH_NEXT_BLOCK:
goto NEXT_SUBMIT_BLK;
} else if (pInfo->blockType == STREAM_INPUT__CHECKPOINT) {
if (pInfo->validBlockIndex >= total) {
if (pInfo->validBlockIndex >= total || pVtableInfos != NULL) {
doClearBufferedBlocks(pInfo);
(*ppRes) = NULL;
return code;
@ -4529,6 +4529,10 @@ void destroyStreamScanOperatorInfo(void* param) {
taosArrayDestroy(pStreamScan->pVtableReadyHandles);
pStreamScan->pVtableReadyHandles = NULL;
}
if (pStreamScan->pTableListInfo) {
tableListDestroy(pStreamScan->pTableListInfo);
pStreamScan->pTableListInfo = NULL;
}
if (pStreamScan->matchInfo.pList) {
taosArrayDestroy(pStreamScan->matchInfo.pList);
}
@ -4716,15 +4720,13 @@ _end:
return code;
}
static int32_t createStreamVtableBlock(SColMatchInfo *pMatchInfo, SSDataBlock **ppRes, const char *idstr) {
static SSDataBlock* createStreamVtableBlock(SColMatchInfo *pMatchInfo, const char *idstr) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSDataBlock *pRes = NULL;
QUERY_CHECK_NULL(pMatchInfo, code, lino, _end, TSDB_CODE_INVALID_PARA);
*ppRes = NULL;
code = createDataBlock(&pRes);
QUERY_CHECK_CODE(code, lino, _end);
int32_t numOfOutput = taosArrayGetSize(pMatchInfo->pList);
@ -4738,18 +4740,16 @@ static int32_t createStreamVtableBlock(SColMatchInfo *pMatchInfo, SSDataBlock **
QUERY_CHECK_CODE(code, lino, _end);
}
*ppRes = pRes;
pRes = NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
}
if (pRes != NULL) {
blockDataDestroy(pRes);
}
return code;
pRes = NULL;
terrno = code;
}
return pRes;
}
static int32_t createStreamNormalScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
@ -4873,8 +4873,7 @@ static int32_t createStreamNormalScanOperatorInfo(SReadHandle* pHandle, STableSc
if (pVtableInfos != NULL) {
// save vtable info into tqReader for vtable source scan
SSDataBlock* pResBlock = NULL;
code = createStreamVtableBlock(&pInfo->matchInfo, &pResBlock, idstr);
SSDataBlock* pResBlock = createStreamVtableBlock(&pInfo->matchInfo, idstr);
QUERY_CHECK_CODE(code, lino, _error);
code = pAPI->tqReaderFn.tqReaderSetVtableInfo(pInfo->tqReader, pHandle->vnode, pAPI, pVtableInfos, &pResBlock,
idstr);

View File

@ -28,6 +28,7 @@ typedef struct SVMBufPageInfo {
typedef struct SStreamVtableMergeSource {
SDiskbasedBuf* pBuf; // buffer for storing data
int32_t* pTotalPages; // total pages of all sources in the buffer
int32_t primaryTsIndex;
SSDataBlock* pInputDataBlock; // data block to be written to the buffer
int64_t currentExpireTimeMs; // expire time of the input data block
@ -44,7 +45,9 @@ typedef struct SStreamVtableMergeHandle {
SDiskbasedBuf* pBuf;
int32_t numOfPages;
int32_t numPageLimit;
int32_t primaryTsIndex;
int64_t vuid;
int32_t nSrcTbls;
SHashObj* pSources;
SSDataBlock* datablock; // Does not store data, only used to save the schema of input/output data blocks
@ -133,11 +136,8 @@ static int32_t svmSourceAddBlock(SStreamVtableMergeSource* pSource, SSDataBlock*
QUERY_CHECK_NULL(pSource, code, lino, _end, TSDB_CODE_INVALID_PARA);
pInputDataBlock = pSource->pInputDataBlock;
if (pInputDataBlock == NULL) {
code = createOneDataBlock(pDataBlock, false, &pInputDataBlock);
QUERY_CHECK_CODE(code, lino, _end);
pSource->pInputDataBlock = pInputDataBlock;
}
QUERY_CHECK_CONDITION(taosArrayGetSize(pDataBlock->pDataBlock) >= taosArrayGetSize(pInputDataBlock->pDataBlock), code,
lino, _end, TSDB_CODE_INVALID_PARA);
int32_t start = 0;
int32_t nrows = blockDataGetNumOfRows(pDataBlock);
@ -241,8 +241,9 @@ static int32_t svmSourceCurrentTs(SStreamVtableMergeSource* pSource, const char*
QUERY_CHECK_CONDITION(pSource->rowIndex >= 0 && pSource->rowIndex < blockDataGetNumOfRows(pSource->pOutputDataBlock),
code, lino, _end, TSDB_CODE_INVALID_PARA);
tsCol = taosArrayGet(pSource->pOutputDataBlock->pDataBlock, 0);
tsCol = taosArrayGet(pSource->pOutputDataBlock->pDataBlock, pSource->primaryTsIndex);
QUERY_CHECK_NULL(tsCol, code, lino, _end, terrno);
QUERY_CHECK_CONDITION(tsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP, code, lino, _end, TSDB_CODE_INVALID_PARA);
*pTs = ((int64_t*)tsCol->pData)[pSource->rowIndex];
@ -355,6 +356,9 @@ static SStreamVtableMergeSource* svmAddSource(SStreamVtableMergeHandle* pHandle,
QUERY_CHECK_NULL(pSource, code, lino, _end, terrno);
pSource->pBuf = pHandle->pBuf;
pSource->pTotalPages = &pHandle->numOfPages;
pSource->primaryTsIndex = pHandle->primaryTsIndex;
code = createOneDataBlock(pHandle->datablock, false, &pSource->pInputDataBlock);
QUERY_CHECK_CODE(code, lino, _end);
pSource->pageInfoList = tdListNew(sizeof(SVMBufPageInfo));
QUERY_CHECK_NULL(pSource->pageInfoList, code, lino, _end, terrno);
code = createOneDataBlock(pHandle->datablock, false, &pSource->pOutputDataBlock);
@ -613,8 +617,9 @@ _end:
return code;
}
int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle** ppHandle, int32_t nSrcTbls, int32_t numPageLimit,
SDiskbasedBuf* pBuf, SSDataBlock* pResBlock, const char* idstr) {
int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle** ppHandle, int64_t vuid, int32_t nSrcTbls,
int32_t numPageLimit, int32_t primaryTsIndex, SDiskbasedBuf* pBuf,
SSDataBlock* pResBlock, const char* idstr) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamVtableMergeHandle* pHandle = NULL;
@ -629,6 +634,8 @@ int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle** ppHandle, int32
pHandle->pBuf = pBuf;
pHandle->numPageLimit = numPageLimit;
pHandle->primaryTsIndex = primaryTsIndex;
pHandle->vuid = vuid;
pHandle->nSrcTbls = nSrcTbls;
pHandle->pSources = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
QUERY_CHECK_NULL(pHandle->pSources, code, lino, _end, terrno);
@ -666,3 +673,11 @@ void streamVtableMergeDestroyHandle(void* ptr) {
taosMemoryFreeClear(*ppHandle);
}
int64_t streamVtableMergeHandleGetVuid(SStreamVtableMergeHandle* pHandle) {
if (pHandle != NULL) {
return pHandle->vuid;
} else {
return 0;
}
}

View File

@ -795,13 +795,13 @@ static int32_t doStreamVtableMergeNext(SOperatorInfo* pOperator, SSDataBlock** p
int32_t numPagePerTable = getNumOfInMemBufPages(pInfo->pVtableMergeBuf) / nTables;
for (int32_t i = 0; i < nTables; ++i) {
SVCTableMergeInfo* pTableInfo = taosArrayGet(pVTables, i);
if (pTableInfo == NULL) {
if (pTableInfo == NULL || pTableInfo->numOfSrcTbls == 0) {
continue;
}
QUERY_CHECK_CONDITION(pTableInfo->numOfSrcTbls <= numPagePerTable, code, lino, _end, terrno);
SStreamVtableMergeHandle* pMergeHandle = NULL;
code = streamVtableMergeCreateHandle(&pMergeHandle, pTableInfo->numOfSrcTbls, numPagePerTable,
pInfo->pVtableMergeBuf, pInfo->pRes, id);
code = streamVtableMergeCreateHandle(&pMergeHandle, pTableInfo->uid, pTableInfo->numOfSrcTbls, numPagePerTable,
pInfo->primaryTsIndex, pInfo->pVtableMergeBuf, pInfo->pRes, id);
QUERY_CHECK_CODE(code, lino, _end);
code = taosHashPut(pInfo->pVtableMergeHandles, &pTableInfo->uid, sizeof(pTableInfo->uid), &pMergeHandle,
POINTER_BYTES);
@ -824,6 +824,30 @@ static int32_t doStreamVtableMergeNext(SOperatorInfo* pOperator, SSDataBlock** p
break;
}
int32_t inputNCols = taosArrayGetSize(pBlock->pDataBlock);
int32_t resNCols = taosArrayGetSize(pResBlock->pDataBlock);
QUERY_CHECK_CONDITION(inputNCols <= resNCols, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
for (int32_t i = 0; i < inputNCols; ++i) {
SColumnInfoData *p1 = taosArrayGet(pResBlock->pDataBlock, i);
QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
SColumnInfoData *p2 = taosArrayGet(pBlock->pDataBlock, i);
QUERY_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CONDITION(p1->info.type == p2->info.type, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
QUERY_CHECK_CONDITION(p1->info.bytes == p2->info.bytes, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
}
for (int32_t i = inputNCols; i < resNCols; ++i) {
SColumnInfoData *p = taosArrayGet(pResBlock->pDataBlock, i);
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
code = blockDataAppendColInfo(pBlock, &colInfo);
QUERY_CHECK_CODE(code, lino, _end);
SColumnInfoData* pNewCol = taosArrayGet(pBlock->pDataBlock, i);
QUERY_CHECK_NULL(pNewCol, code, lino, _end, terrno);
code = colInfoDataEnsureCapacity(pNewCol, pBlock->info.rows, false);
QUERY_CHECK_CODE(code, lino, _end);
colDataSetNNULL(pNewCol, 0, pBlock->info.rows);
}
if (pBlock->info.type == STREAM_NORMAL) {
SStreamVtableMergeHandle** ppHandle =
taosHashGet(pInfo->pVtableMergeHandles, &pBlock->info.id.uid, sizeof(int64_t));
@ -878,9 +902,9 @@ static int32_t doStreamVtableMergeNext(SOperatorInfo* pOperator, SSDataBlock** p
bool newTuple = true;
if (pResBlock->info.rows > 0) {
SColumnInfoData* pResTsCol = taosArrayGet(pResBlock->pDataBlock, 0);
SColumnInfoData* pResTsCol = taosArrayGet(pResBlock->pDataBlock, pInfo->primaryTsIndex);
int64_t lastResTs = *(int64_t*)colDataGetNumData(pResTsCol, pResBlock->info.rows - 1);
SColumnInfoData* pMergeTsCol = taosArrayGet(pBlock->pDataBlock, 0);
SColumnInfoData* pMergeTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
int64_t mergeTs = *(int64_t*)colDataGetNumData(pMergeTsCol, idx);
QUERY_CHECK_CONDITION(mergeTs >= lastResTs, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
newTuple = (mergeTs > lastResTs);
@ -913,6 +937,7 @@ static int32_t doStreamVtableMergeNext(SOperatorInfo* pOperator, SSDataBlock** p
}
if (pResBlock->info.rows > 0) {
pResBlock->info.id.uid = streamVtableMergeHandleGetVuid(pHandle);
break;
}
}
@ -922,10 +947,20 @@ static int32_t doStreamVtableMergeNext(SOperatorInfo* pOperator, SSDataBlock** p
}
pInfo->numOfExec++;
if (pResBlock->info.rows > 0) {
pResBlock->info.id.groupId = tableListGetTableGroupId(pInfo->pTableListInfo, pResBlock->info.id.uid);
code = blockDataUpdateTsWindow(pResBlock, 0);
QUERY_CHECK_CODE(code, lino, _end);
code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pResBlock,
pResBlock->info.rows, pTaskInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
code = doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
if (pResBlock->info.rows > 0) {
(*ppRes) = pResBlock;
pOperator->resultInfo.totalRows += pResBlock->info.rows;
}
}
_end:
if (pIter != NULL) {
@ -1032,7 +1067,7 @@ int32_t createStreamVtableMergeOperatorInfo(SOperatorInfo* pDownstream, SReadHan
pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
QUERY_CHECK_NULL(pInfo->pCreateTbRes, code, lino, _error, terrno);
// create the pseduo columns info
// create the pseudo columns info
if (pVirtualScanNode->scan.pScanPseudoCols != NULL) {
code = createExprInfo(pVirtualScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr);
QUERY_CHECK_CODE(code, lino, _error);
@ -1045,6 +1080,7 @@ int32_t createStreamVtableMergeOperatorInfo(SOperatorInfo* pDownstream, SReadHan
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
code = blockDataEnsureCapacity(pInfo->pRes, TMAX(pOperator->resultInfo.capacity, 4096));
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pRes->info.type = STREAM_NORMAL;
code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes);
QUERY_CHECK_CODE(code, lino, _error);
@ -1054,6 +1090,8 @@ int32_t createStreamVtableMergeOperatorInfo(SOperatorInfo* pDownstream, SReadHan
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pVtableReadyHandles = taosArrayInit(0, POINTER_BYTES);
QUERY_CHECK_NULL(pInfo->pVtableReadyHandles, code, lino, _error, terrno);
pInfo->pTableListInfo = pTableListInfo;
pTableListInfo = NULL;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
@ -1128,6 +1166,10 @@ _error:
destroyStreamScanOperatorInfo(pInfo);
}
if (pTableListInfo != NULL) {
tableListDestroy(pTableListInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);

View File

@ -816,6 +816,10 @@ static int32_t physiVirtualTableScanCopy(const SVirtualScanPhysiNode* pSrc, SVir
COPY_SCALAR_FIELD(groupSort);
COPY_SCALAR_FIELD(scanAllCols);
CLONE_NODE_LIST_FIELD(pTargets);
CLONE_NODE_LIST_FIELD(pTags);
CLONE_NODE_FIELD(pSubtable);
COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(igCheckUpdate);
return TSDB_CODE_SUCCESS;
}

View File

@ -2479,6 +2479,10 @@ static const char* jkVirtualTableScanPhysiPlanGroupTags = "GroupTags";
static const char* jkVirtualTableScanPhysiPlanGroupSort = "GroupSort";
static const char* jkVirtualTableScanPhysiPlanscanAllCols= "scanAllCols";
static const char* jkVirtualTableScanPhysiPlanTargets = "Targets";
static const char* jkVirtualTableScanPhysiPlanTags = "Tags";
static const char* jkVirtualTableScanPhysiPlanSubtable = "Subtable";
static const char* jkVirtualTableScanPhysiPlanIgExpired = "IgExpired";
static const char* jkVirtualTableScanPhysiPlanIgCheckUpdate = "IgCheckUpdate";
static int32_t physiVirtualTableScanNodeToJson(const void* pObj, SJson* pJson) {
const SVirtualScanPhysiNode* pNode = (const SVirtualScanPhysiNode*)pObj;
@ -2486,11 +2490,11 @@ static int32_t physiVirtualTableScanNodeToJson(const void* pObj, SJson* pJson) {
int32_t code = physiScanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkVirtualTableScanPhysiPlanTargets, pNode->pGroupTags);
code = nodeListToJson(pJson, jkVirtualTableScanPhysiPlanGroupTags, pNode->pGroupTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkVirtualTableScanPhysiPlanTargets, pNode->groupSort);
code = tjsonAddBoolToObject(pJson, jkVirtualTableScanPhysiPlanGroupSort, pNode->groupSort);
}
if (TSDB_CODE_SUCCESS == code) {
@ -2501,6 +2505,22 @@ static int32_t physiVirtualTableScanNodeToJson(const void* pObj, SJson* pJson) {
code = nodeListToJson(pJson, jkVirtualTableScanPhysiPlanTargets, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkVirtualTableScanPhysiPlanTags, pNode->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkVirtualTableScanPhysiPlanSubtable, nodeToJson, pNode->pSubtable);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkVirtualTableScanPhysiPlanIgExpired, pNode->igExpired);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkVirtualTableScanPhysiPlanIgCheckUpdate, pNode->igCheckUpdate);
}
return code;
}
@ -2521,6 +2541,22 @@ static int32_t jsonToPhysiVirtualTableScanNode(const SJson* pJson, void* pObj) {
code = jsonToNodeList(pJson, jkVirtualTableScanPhysiPlanTargets, &pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkVirtualTableScanPhysiPlanTags, &pNode->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkVirtualTableScanPhysiPlanSubtable, &pNode->pSubtable);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkVirtualTableScanPhysiPlanIgExpired, &pNode->igExpired);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkVirtualTableScanPhysiPlanIgCheckUpdate, &pNode->igCheckUpdate);
}
return code;
}

View File

@ -2207,6 +2207,10 @@ enum {
PHY_VIRTUAL_TABLE_SCAN_CODE_GROUP_SORT,
PHY_VIRTUAL_TABLE_SCAN_CODE_ONLY_TS,
PHY_VIRTUAL_TABLE_SCAN_CODE_TARGETS,
PHY_VIRTUAL_TABLE_SCAN_CODE_TAGS,
PHY_VIRTUAL_TABLE_SCAN_CODE_SUBTABLE,
PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_EXPIRED,
PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_CHECK_UPDATE,
};
static int32_t physiVirtualTableScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -2229,6 +2233,23 @@ static int32_t physiVirtualTableScanNodeToMsg(const void* pObj, STlvEncoder* pEn
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_TAGS, nodeListToMsg, pNode->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_SUBTABLE, nodeToMsg, pNode->pSubtable);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_EXPIRED, pNode->igExpired);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_CHECK_UPDATE, pNode->igCheckUpdate);
}
return code;
}
@ -2254,6 +2275,18 @@ static int32_t msgToPhysiVirtualTableScanNode(STlvDecoder* pDecoder, void* pObj)
case PHY_VIRTUAL_TABLE_SCAN_CODE_TARGETS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
break;
case PHY_VIRTUAL_TABLE_SCAN_CODE_TAGS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTags);
break;
case PHY_VIRTUAL_TABLE_SCAN_CODE_SUBTABLE:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pSubtable);
break;
case PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_EXPIRED:
code = tlvDecodeI8(pTlv, &pNode->igExpired);
break;
case PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_CHECK_UPDATE:
code = tlvDecodeI8(pTlv, &pNode->igCheckUpdate);
break;
default:
break;
}

View File

@ -1908,6 +1908,8 @@ void nodesDestroyNode(SNode* pNode) {
destroyScanPhysiNode((SScanPhysiNode*)pNode);
nodesDestroyList(pPhyNode->pGroupTags);
nodesDestroyList(pPhyNode->pTargets);
nodesDestroyList(pPhyNode->pTags);
nodesDestroyNode(pPhyNode->pSubtable);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:

View File

@ -13476,14 +13476,14 @@ static int32_t buildQueryTableColIdList(SSelectStmt *pSelect, SArray** ppRes) {
*ppRes = taosArrayInit(pColList->length, sizeof(int16_t));
if (NULL == *ppRes) {
code = terrno;
parserError("taosArrayInit %d colId failed, errno:0x%x", *ppRes, code);
parserError("taosArrayInit 0x%p colId failed, errno:0x%x", *ppRes, code);
goto _return;
}
FOREACH(pCol, pColList) {
if (NULL == taosArrayPush(*ppRes, &((SColumnNode*)pCol)->colId)) {
code = terrno;
parserError("taosArrayPush colId failed, errno:0x%x", *ppRes, code);
parserError("taosArrayPush 0x%p colId failed, errno:0x%x", *ppRes, code);
goto _return;
}
}
@ -13532,7 +13532,7 @@ static int32_t modifyVtableSrcNumBasedOnQuery(SArray* pVSubTables, SNode* pStmt)
pTbHash = tSimpleHashInit(colNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
if (NULL == pTbHash) {
code = terrno;
parserError("tSimpleHashInit %d failed, errno:0x%x", code);
parserError("tSimpleHashInit failed, colNum:%d, errno:0x%x", colNum, code);
PAR_ERR_JRET(code);
}
}

View File

@ -25,7 +25,7 @@ typedef struct SBlockName {
static void doMonitorDispatchData(void* param, void* tmrId);
static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet);
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq);
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq, bool withUid);
static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
int64_t groupId, int64_t now);
static int32_t streamMapAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
@ -365,7 +365,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
return terrno;
}
code = streamAddBlockIntoDispatchMsg(pDataBlock, pReqs);
code = streamAddBlockIntoDispatchMsg(pDataBlock, pReqs, false);
if (code != TSDB_CODE_SUCCESS) {
destroyDispatchMsg(pReqs, 1);
return code;
@ -391,7 +391,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
if (type == STREAM_DELETE_RESULT || type == STREAM_CHECKPOINT ||
type == STREAM_TRANS_STATE || type == STREAM_RECALCULATE_START) {
for (int32_t j = 0; j < numOfVgroups; j++) {
code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]);
code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j], false);
if (code != 0) {
destroyDispatchMsg(pReqs, numOfVgroups);
return code;
@ -436,7 +436,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT ||
pDataBlock->info.type == STREAM_TRANS_STATE) {
for (int32_t j = 0; j < numOfTasks; j++) {
code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]);
code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j], false);
if (code != 0) {
destroyDispatchMsg(pReqs, numOfTasks);
return code;
@ -821,7 +821,7 @@ static int32_t doAddDispatchBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs,
stDebug("s-task:%s dst table hashVal:0x%x assign to vgId:%d range[0x%x, 0x%x]", pTask->id.idStr, hashValue,
pVgInfo->vgId, pVgInfo->hashBegin, pVgInfo->hashEnd);
if ((code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j])) < 0) {
if ((code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j], false)) < 0) {
stError("s-task:%s failed to add dispatch block, code:%s", pTask->id.idStr, tstrerror(terrno));
return code;
}
@ -936,7 +936,7 @@ int32_t streamMapAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDa
STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, *pIdx);
QUERY_CHECK_NULL(pAddr, code, lino, _end, terrno);
code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[*pIdx]);
code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[*pIdx], true);
QUERY_CHECK_CODE(code, lino, _end);
if (pReqs[*pIdx].blockNum == 0) {
@ -1423,7 +1423,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq, bool withUid) {
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN;
void* buf = taosMemoryCalloc(1, dataStrLen);
@ -1432,7 +1432,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
}
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = htobe64(pBlock->info.id.uid);
pRetrieve->useconds = withUid ? htobe64(pBlock->info.id.uid) : 0;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;

View File

@ -0,0 +1,280 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from frame import etool
from frame.etool import *
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame.common import *
import time
class TDTestCase(TBase):
def create_tables(self):
tdLog.info("create tables")
tdSql.execute("drop database if exists test_stream_vtable;")
tdSql.execute("create database test_stream_vtable vgroups 8;")
tdSql.execute("use test_stream_vtable;")
tdLog.info(f"create org super table.")
tdSql.execute("select database();")
tdSql.execute(f"CREATE STABLE `vtb_org_stb` ("
"ts timestamp, "
"u_tinyint_col tinyint unsigned, "
"u_smallint_col smallint unsigned, "
"u_int_col int unsigned, "
"u_bigint_col bigint unsigned, "
"tinyint_col tinyint, "
"smallint_col smallint, "
"int_col int, "
"bigint_col bigint, "
"float_col float, "
"double_col double, "
"bool_col bool, "
"binary_16_col binary(16),"
"binary_32_col binary(32),"
"nchar_16_col nchar(16),"
"nchar_32_col nchar(32)"
") TAGS ("
"int_tag int,"
"bool_tag bool,"
"float_tag float,"
"double_tag double)")
tdLog.info(f"create org child table.")
for i in range(3):
tdSql.execute(f"CREATE TABLE `vtb_org_child_{i}` USING `vtb_org_stb` TAGS ({i}, false, {i}, {i});")
tdLog.info(f"create virtual normal table.")
tdSql.execute(f"CREATE VTABLE `vtb_virtual_ntb_full` ("
"ts timestamp, "
"u_tinyint_col tinyint unsigned from vtb_org_child_0.u_tinyint_col, "
"u_smallint_col smallint unsigned from vtb_org_child_1.u_smallint_col, "
"u_int_col int unsigned from vtb_org_child_2.u_int_col, "
"u_bigint_col bigint unsigned from vtb_org_child_0.u_bigint_col, "
"tinyint_col tinyint from vtb_org_child_1.tinyint_col, "
"smallint_col smallint from vtb_org_child_2.smallint_col, "
"int_col int from vtb_org_child_0.int_col, "
"bigint_col bigint from vtb_org_child_1.bigint_col, "
"float_col float from vtb_org_child_2.float_col, "
"double_col double from vtb_org_child_0.double_col, "
"bool_col bool from vtb_org_child_1.bool_col, "
"binary_16_col binary(16) from vtb_org_child_2.binary_16_col,"
"binary_32_col binary(32) from vtb_org_child_0.binary_32_col,"
"nchar_16_col nchar(16) from vtb_org_child_1.nchar_16_col,"
"nchar_32_col nchar(32) from vtb_org_child_2.nchar_32_col)")
tdSql.execute(f"CREATE VTABLE `vtb_virtual_ntb_half_full` ("
"ts timestamp, "
"u_tinyint_col tinyint unsigned from vtb_org_child_0.u_tinyint_col, "
"u_smallint_col smallint unsigned from vtb_org_child_1.u_smallint_col, "
"u_int_col int unsigned from vtb_org_child_2.u_int_col, "
"u_bigint_col bigint unsigned, "
"tinyint_col tinyint, "
"smallint_col smallint, "
"int_col int from vtb_org_child_0.int_col, "
"bigint_col bigint from vtb_org_child_1.bigint_col, "
"float_col float from vtb_org_child_2.float_col, "
"double_col double, "
"bool_col bool, "
"binary_16_col binary(16),"
"binary_32_col binary(32) from vtb_org_child_0.binary_32_col,"
"nchar_16_col nchar(16) from vtb_org_child_1.nchar_16_col,"
"nchar_32_col nchar(32) from vtb_org_child_2.nchar_32_col)")
tdSql.execute(f"CREATE STABLE `vtb_virtual_stb` ("
"ts timestamp, "
"u_tinyint_col tinyint unsigned, "
"u_smallint_col smallint unsigned, "
"u_int_col int unsigned, "
"u_bigint_col bigint unsigned, "
"tinyint_col tinyint, "
"smallint_col smallint, "
"int_col int, "
"bigint_col bigint, "
"float_col float, "
"double_col double, "
"bool_col bool, "
"binary_16_col binary(16),"
"binary_32_col binary(32),"
"nchar_16_col nchar(16),"
"nchar_32_col nchar(32)"
") TAGS ("
"int_tag int,"
"bool_tag bool,"
"float_tag float,"
"double_tag double)"
"VIRTUAL 1")
tdLog.info(f"create virtual child table.")
tdSql.execute(f"CREATE VTABLE `vtb_virtual_ctb_full` ("
"u_tinyint_col from vtb_org_child_0.u_tinyint_col, "
"u_smallint_col from vtb_org_child_1.u_smallint_col, "
"u_int_col from vtb_org_child_2.u_int_col, "
"u_bigint_col from vtb_org_child_0.u_bigint_col, "
"tinyint_col from vtb_org_child_1.tinyint_col, "
"smallint_col from vtb_org_child_2.smallint_col, "
"int_col from vtb_org_child_0.int_col, "
"bigint_col from vtb_org_child_1.bigint_col, "
"float_col from vtb_org_child_2.float_col, "
"double_col from vtb_org_child_0.double_col, "
"bool_col from vtb_org_child_1.bool_col, "
"binary_16_col from vtb_org_child_2.binary_16_col,"
"binary_32_col from vtb_org_child_0.binary_32_col,"
"nchar_16_col from vtb_org_child_1.nchar_16_col,"
"nchar_32_col from vtb_org_child_2.nchar_32_col)"
"USING `vtb_virtual_stb` TAGS (0, false, 0, 0)")
tdSql.execute(f"CREATE VTABLE `vtb_virtual_ctb_half_full` ("
"u_tinyint_col from vtb_org_child_0.u_tinyint_col, "
"u_smallint_col from vtb_org_child_1.u_smallint_col, "
"u_int_col from vtb_org_child_2.u_int_col, "
"int_col from vtb_org_child_0.int_col, "
"bigint_col from vtb_org_child_1.bigint_col, "
"float_col from vtb_org_child_2.float_col, "
"binary_32_col from vtb_org_child_0.binary_32_col,"
"nchar_16_col from vtb_org_child_1.nchar_16_col,"
"nchar_32_col from vtb_org_child_2.nchar_32_col)"
"USING `vtb_virtual_stb` TAGS (1, false, 1, 1)")
tdSql.execute(f"CREATE VTABLE `vtb_virtual_ctb_empty` "
"USING `vtb_virtual_stb` TAGS (2, false, 2, 2)")
def create_proj_streams(self):
tdSql.execute(f"CREATE STREAM s_proj_1 TRIGGER AT_ONCE INTO dst_proj_1 AS "
"select * from test_stream_vtable.vtb_virtual_ntb_full;")
tdSql.execute(f"CREATE STREAM s_proj_2 TRIGGER AT_ONCE INTO dst_proj_2 AS "
"select * from test_stream_vtable.vtb_virtual_ntb_half_full;")
tdSql.execute(f"CREATE STREAM s_proj_3 TRIGGER AT_ONCE INTO dst_proj_3 AS "
"select * from test_stream_vtable.vtb_virtual_stb PARTITION BY tbname;")
tdSql.execute(f"CREATE STREAM s_proj_4 TRIGGER AT_ONCE INTO dst_proj_4 AS "
"select * from test_stream_vtable.vtb_virtual_ctb_full;")
tdSql.execute(f"CREATE STREAM s_proj_5 TRIGGER AT_ONCE INTO dst_proj_5 AS "
"select * from test_stream_vtable.vtb_virtual_ctb_half_full;")
tdSql.execute(f"CREATE STREAM s_proj_6 TRIGGER AT_ONCE INTO dst_proj_6 AS "
"select * from test_stream_vtable.vtb_virtual_ntb_full WHERE u_tinyint_col = 1;")
tdSql.execute(f"CREATE STREAM s_proj_7 TRIGGER AT_ONCE INTO dst_proj_7 AS "
"select * from test_stream_vtable.vtb_virtual_ntb_half_full WHERE bool_col = true;")
tdSql.execute(f"CREATE STREAM s_proj_8 TRIGGER AT_ONCE INTO dst_proj_8 AS "
"select * from test_stream_vtable.vtb_virtual_stb WHERE bool_col = true PARTITION BY tbname;")
tdSql.execute(f"CREATE STREAM s_proj_9 TRIGGER AT_ONCE INTO dst_proj_9 AS "
"select * from test_stream_vtable.vtb_virtual_ctb_full WHERE u_tinyint_col = 1;")
tdSql.execute(f"CREATE STREAM s_proj_10 TRIGGER AT_ONCE INTO dst_proj_10 AS "
"select * from test_stream_vtable.vtb_virtual_ctb_half_full WHERE bool_col = true;")
tdSql.execute(f"CREATE STREAM s_proj_11 TRIGGER AT_ONCE INTO dst_proj_11 AS "
"select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_ntb_full;")
tdSql.execute(f"CREATE STREAM s_proj_12 TRIGGER AT_ONCE INTO dst_proj_12 AS "
"select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_ntb_half_full;")
tdSql.execute(f"CREATE STREAM s_proj_13 TRIGGER AT_ONCE INTO dst_proj_13 AS "
"select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_stb PARTITION BY tbname;")
tdSql.execute(f"CREATE STREAM s_proj_14 TRIGGER AT_ONCE INTO dst_proj_14 AS "
"select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_ctb_full;")
tdSql.execute(f"CREATE STREAM s_proj_15 TRIGGER AT_ONCE INTO dst_proj_15 AS "
"select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_ctb_half_full;")
def create_window_streams(self):
tdSql.execute(f"CREATE STREAM s_interval_1 INTO dst_interval_1 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full interval(1s);")
tdSql.execute(f"CREATE STREAM s_interval_2 INTO dst_interval_2 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full interval(1s) sliding(100a);")
tdSql.execute(f"CREATE STREAM s_interval_3 INTO dst_interval_3 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname interval(1s) sliding(200a);")
tdSql.execute(f"CREATE STREAM s_interval_4 INTO dst_interval_4 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full interval(1s) sliding(100a);")
tdSql.execute(f"CREATE STREAM s_interval_5 INTO dst_interval_5 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full interval(1s);")
tdSql.execute(f"CREATE STREAM s_state_1 INTO dst_state_1 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full state_window(bool_col);")
tdSql.execute(f"CREATE STREAM s_state_2 INTO dst_state_2 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full state_window(bool_col);")
tdSql.execute(f"CREATE STREAM s_state_3 INTO dst_state_3 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname state_window(bool_col);")
tdSql.execute(f"CREATE STREAM s_state_4 INTO dst_state_4 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full state_window(bool_col);")
tdSql.execute(f"CREATE STREAM s_state_5 INTO dst_state_5 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full state_window(bool_col);")
tdSql.execute(f"CREATE STREAM s_session_1 INTO dst_session_1 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full session(ts, 10a);")
tdSql.execute(f"CREATE STREAM s_session_2 INTO dst_session_2 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full session(ts, 10a);")
tdSql.execute(f"CREATE STREAM s_session_3 INTO dst_session_3 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname session(ts, 10a);")
tdSql.execute(f"CREATE STREAM s_session_4 INTO dst_session_4 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full session(ts, 10a);")
tdSql.execute(f"CREATE STREAM s_session_5 INTO dst_session_5 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full session(ts, 10a);")
tdSql.execute(f"CREATE STREAM s_event_1 INTO dst_event_1 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;")
tdSql.execute(f"CREATE STREAM s_event_2 INTO dst_event_2 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;")
tdSql.execute(f"CREATE STREAM s_event_3 INTO dst_event_3 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;")
tdSql.execute(f"CREATE STREAM s_event_4 INTO dst_event_4 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;")
tdSql.execute(f"CREATE STREAM s_event_5 INTO dst_event_5 AS "
"select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;")
# tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS "
# "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full count_window(20);")
# tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS "
# "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full count_window(20);")
# tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS "
# "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname count_window(20);")
# tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS "
# "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full count_window(20);")
# tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS "
# "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full count_window(20);")
def wait_streams_ready(self):
for i in range(60):
tdLog.info(f"i={i} wait for stream tasks ready ...")
time.sleep(1)
rows = tdSql.query("select * from information_schema.ins_stream_tasks where status <> 'ready';")
if rows == 0:
break
def wait_streams_done(self):
# The entire test runs for a while. Wait briefly, and if no exceptions occur, it's sufficient.
for i in range(30):
tdLog.info(f"i={i} wait for stream tasks done ...")
time.sleep(1)
rows = tdSql.query("select * from information_schema.ins_stream_tasks where status <> 'ready';")
if rows != 0:
raise Exception("stream task status is wrong, please check it!")
def run(self):
tdLog.debug(f"start to excute {__file__}")
self.create_tables()
self.create_proj_streams()
self.wait_streams_ready()
json = etool.curFile(__file__, "vtable_insert.json")
etool.benchMark(json=json)
self.wait_streams_done()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,76 @@
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 3,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"num_of_records_per_req": 10000,
"prepared_rand": 10000,
"chinese": "no",
"escape_character": "yes",
"continue_if_fail": "no",
"databases": [
{
"dbinfo": {
"name": "test_stream_vtable",
"drop": "no",
"vgroups": 8,
"precision": "ms"
},
"super_tables": [
{
"name": "vtb_org_stb",
"child_table_exists": "yes",
"childtable_count": 3,
"childtable_prefix": "vtb_org_child_",
"auto_create_table": "no",
"batch_create_tbl_num": 5,
"data_source": "rand",
"insert_mode": "taosc",
"non_stop_mode": "no",
"line_protocol": "line",
"insert_rows": 10000,
"childtable_limit": 0,
"childtable_offset": 0,
"interlace_rows": 0,
"insert_interval": 10,
"partial_col_num": 0,
"timestamp_step": 500,
"start_timestamp": "2025-01-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "./sample.csv",
"use_sample_ts": "no",
"tags_file": "",
"columns": [
{"type": "UTINYINT", "name": "u_tinyint_col"},
{"type": "USMALLINT", "name": "u_smallint_col"},
{"type": "UINT", "name": "u_int_col"},
{"type": "UBIGINT", "name": "u_bigint_col"},
{"type": "TINYINT", "name": "tinyint_col"},
{"type": "SMALLINT", "name": "smallint_col"},
{"type": "INT", "name": "int_col"},
{"type": "BIGINT", "name": "bigint_col"},
{"type": "FLOAT", "name": "float_col"},
{"type": "DOUBLE", "name": "double_col"},
{"type": "BOOL", "name": "bool_col"},
{"type": "BINARY", "name": "binary_16_col", "len": 16},
{"type": "BINARY", "name": "binary_32_col", "len": 32},
{"type": "NCHAR", "name": "nchar_16_col", "len": 16},
{"type": "NCHAR", "name": "nchar_32_col", "len": 32}
],
"tags": [
{"type": "INT", "name": "int_tag"},
{"type": "BOOL", "name": "bool_tag"},
{"type": "FLOAT", "name": "float_tag"},
{"type": "DOUBLE", "name": "double_tag"}
]
}
]
}
]
}

View File

@ -90,6 +90,7 @@
,,y,army,./pytest.sh python3 ./test.py -f create/test_stb_keep_compact.py
,,y,army,./pytest.sh python3 ./test.py -f create/test_stb_keep_compact.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f create/test_stb_keep_compact.py -N 3 -M 3
,,y,army,./pytest.sh python3 ./test.py -f stream/test_stream_vtable.py
#
# army/tools