diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 4d35c8db7d..45737b9273 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -102,6 +102,8 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper, const char* stbFullName, bool newSubTableRule, STaskNotifyEventStat* pNotifyEventStat); +void qSetStreamMergeInfo(qTaskInfo_t tinfo, SArray* pVTables); + /** * Set multiple input data blocks for the stream scan. * @param tinfo diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 50b4cace02..1f5f808a65 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1073,12 +1073,6 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { if (IS_VAR_DATA_TYPE(pCol->info.type)) { size_t metaSize = pBlock->info.rows * sizeof(int32_t); - char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize); // preview calloc is too small - if (tmp == NULL) { - return terrno; - } - - pCol->varmeta.offset = (int32_t*)tmp; memcpy(pCol->varmeta.offset, pStart, metaSize); pStart += metaSize; } else { diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 61f4c6f6ec..3cb6521e00 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -375,6 +375,7 @@ static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, EStreamTaskT uint64_t uid = 0; SArray** pTaskList = NULL; if (pSourceTaskList) { + uid = pStream->uid; pTaskList = &pSourceTaskList; } else { streamGetUidTaskList(pStream, type, &uid, &pTaskList); @@ -454,6 +455,7 @@ static int32_t addSourceTaskVTableOutput(SStreamTask* pTask, SSHashObj* pVgTasks TSDB_CHECK_NULL(pTaskMap, code, lino, _end, terrno); pTask->outputInfo.type = TASK_OUTPUT__VTABLE_MAP; + pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; STaskDispatcherVtableMap *pDispatcher = &pTask->outputInfo.vtableMapDispatcher; pDispatcher->taskInfos = taosArrayInit(taskNum, sizeof(STaskDispatcherFixed)); TSDB_CHECK_NULL(pDispatcher->taskInfos, code, lino, _end, terrno); @@ -462,26 +464,32 @@ static int32_t addSourceTaskVTableOutput(SStreamTask* pTask, SSHashObj* pVgTasks int32_t iter = 0, vgId = 0; uint64_t uid = 0; - STaskDispatcherFixed* pAddr = NULL; void* p = NULL; while (NULL != (p = tSimpleHashIterate(pVtables, p, &iter))) { char* vgUid = tSimpleHashGetKey(p, NULL); vgId = *(int32_t*)vgUid; uid = *(uint64_t*)((int32_t*)vgUid + 1); - pAddr = tSimpleHashGet(pVgTasks, &vgId, sizeof(vgId)); - if (NULL == pAddr) { + void *px = tSimpleHashGet(pVgTasks, &vgId, sizeof(vgId)); + if (NULL == px) { mError("tSimpleHashGet vgId %d not found", vgId); return code; } + SStreamTask* pMergeTask = *(SStreamTask**)px; + if (pMergeTask == NULL) { + mError("tSimpleHashGet pMergeTask %d not found", vgId); + return code; + } - void* px = tSimpleHashGet(pTaskMap, &pAddr->taskId, sizeof(int32_t)); + px = tSimpleHashGet(pTaskMap, &pMergeTask->id.taskId, sizeof(pMergeTask->id.taskId)); int32_t idx = 0; if (px == NULL) { - px = taosArrayPush(pDispatcher->taskInfos, pAddr); + STaskDispatcherFixed addr = { + .taskId = pMergeTask->id.taskId, .nodeId = pMergeTask->info.nodeId, .epSet = pMergeTask->info.epSet}; + px = taosArrayPush(pDispatcher->taskInfos, &addr); TSDB_CHECK_NULL(px, code, lino, _end, terrno); idx = taosArrayGetSize(pDispatcher->taskInfos) - 1; - code = tSimpleHashPut(pTaskMap, &pAddr->taskId, sizeof(int32_t), &idx, sizeof(int32_t)); + code = tSimpleHashPut(pTaskMap, &pMergeTask->id.taskId, sizeof(pMergeTask->id.taskId), &idx, sizeof(idx)); if (code) { mError("tSimpleHashPut uid to task idx failed, error:%d", code); return code; @@ -495,9 +503,15 @@ static int32_t addSourceTaskVTableOutput(SStreamTask* pTask, SSHashObj* pVgTasks mError("tSimpleHashPut uid to STaskDispatcherFixed failed, error:%d", code); return code; } - - mDebug("source task[%s,vg:%d] add vtable output map, vuid %" PRIu64 " => [%d, vg:%d]", - pTask->id.idStr, pTask->info.nodeId, uid, pAddr->taskId, pAddr->nodeId); + + code = streamTaskSetUpstreamInfo(pMergeTask, pTask); + if (code != TSDB_CODE_SUCCESS) { + mError("failed to set upstream info of merge task, error:%d", code); + return code; + } + + mDebug("source task[%s,vg:%d] add vtable output map, vuid %" PRIu64 " => [%d, vg:%d]", pTask->id.idStr, + pTask->info.nodeId, uid, pMergeTask->id.taskId, pMergeTask->info.nodeId); } _end: @@ -662,7 +676,6 @@ static int32_t addVTableMergeTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pS } static int32_t buildMergeTaskHash(SArray* pMergeTaskList, SSHashObj** ppVgTasks) { - STaskDispatcherFixed addr; int32_t code = 0; int32_t taskNum = taosArrayGetSize(pMergeTaskList); @@ -676,11 +689,7 @@ static int32_t buildMergeTaskHash(SArray* pMergeTaskList, SSHashObj** ppVgTasks) for (int32_t i = 0; i < taskNum; ++i) { SStreamTask* pTask = taosArrayGetP(pMergeTaskList, i); - addr.taskId = pTask->id.taskId; - addr.nodeId = pTask->info.nodeId; - addr.epSet = pTask->info.epSet; - - code = tSimpleHashPut(*ppVgTasks, &addr.nodeId, sizeof(addr.nodeId), &addr, sizeof(addr)); + code = tSimpleHashPut(*ppVgTasks, &pTask->info.nodeId, sizeof(pTask->info.nodeId), &pTask, POINTER_BYTES); if (code) { mError("tSimpleHashPut %d STaskDispatcherFixed failed", i); return code; @@ -725,10 +734,9 @@ static int32_t addVTableSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* p } plan->pVTables = *(SSHashObj**)p; - *(SSHashObj**)p = NULL; - code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam, hasAggTasks, pVgTasks, pSourceTaskList); + plan->pVTables = NULL; if (code != 0) { mError("failed to create stream task, code:%s", tstrerror(code)); @@ -1224,7 +1232,7 @@ static int32_t addVgroupToRes(char* fDBName, int32_t vvgId, uint64_t vuid, SRefC char dbVgId[TSDB_DB_NAME_LEN + 32]; SSHashObj *pTarVg = NULL, *pNewVg = NULL; - TSDB_CHECK_CODE(getTableVgId(pDb, 1, fDBName, &vgId, pCol->refColName), lino, _return); + TSDB_CHECK_CODE(getTableVgId(pDb, 1, fDBName, &vgId, pCol->refTableName), lino, _return); snprintf(dbVgId, sizeof(dbVgId), "%s.%d", pCol->refDbName, vgId); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 8fd386bdb2..8504eef3a4 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1307,6 +1307,8 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo STR_WITH_SIZE_TO_VARSTR(level, "agg", 3); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { STR_WITH_SIZE_TO_VARSTR(level, "sink", 4); + } else if (pTask->info.taskLevel == TASK_LEVEL__MERGE) { + STR_WITH_SIZE_TO_VARSTR(level, "merge", 5); } pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 7588ebb7f6..6620656918 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1200,7 +1200,7 @@ int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const } tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList)); - return tqCollectPhysicalTables(pReader, id); + return TSDB_CODE_SUCCESS; } void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) { @@ -1498,8 +1498,7 @@ static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr) { pScanInfo->cacheHit = 0; pVirtualTables = pScanInfo->pVirtualTables; - if (taosHashGetSize(pVirtualTables) == 0 || taosHashGetSize(pReader->tbIdHash) == 0 || - taosArrayGetSize(pReader->pColIdList) == 0) { + if (taosHashGetSize(pVirtualTables) == 0 || taosArrayGetSize(pReader->pColIdList) == 0) { goto _end; } @@ -1507,13 +1506,10 @@ static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr) { TSDB_CHECK_NULL(pPhysicalTables, code, lino, _end, terrno); taosHashSetFreeFp(pPhysicalTables, destroySourceScanTables); - pIter = taosHashIterate(pReader->tbIdHash, NULL); + pIter = taosHashIterate(pVirtualTables, NULL); while (pIter != NULL) { int64_t vTbUid = *(int64_t*)taosHashGetKey(pIter, NULL); - - px = taosHashGet(pVirtualTables, &vTbUid, sizeof(int64_t)); - TSDB_CHECK_NULL(px, code, lino, _end, terrno); - SArray* pColInfos = *(SArray**)px; + SArray* pColInfos = *(SArray**)pIter; TSDB_CHECK_NULL(pColInfos, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); // Traverse all required columns and collect corresponding physical tables @@ -1548,7 +1544,7 @@ static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr) { j++; } } - pIter = taosHashIterate(pReader->tbIdHash, pIter); + pIter = taosHashIterate(pVirtualTables, pIter); } pScanInfo->pPhysicalTables = pPhysicalTables; @@ -1574,9 +1570,8 @@ _end: static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) { if (value) { - SSchemaWrapper** ppSchemaWrapper = value; - tDeleteSchemaWrapper(*ppSchemaWrapper); - *ppSchemaWrapper = NULL; + SSchemaWrapper* pSchemaWrapper = value; + tDeleteSchemaWrapper(pSchemaWrapper); } } @@ -1686,8 +1681,8 @@ int32_t tqRetrieveVTableDataBlock(STqReader* pReader, SSDataBlock** pRes, const SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, j); TSDB_CHECK_NULL(pOutCol, code, lino, _end, terrno); if (i >= nColInfos) { - tqInfo("%s has %d column info, but vtable column %d is missing, id: %s", __func__, nColInfos, pOutCol->info.colId, - idstr); + tqTrace("%s has %d column info, but vtable column %d is missing, id: %s", __func__, nColInfos, + pOutCol->info.colId, idstr); colDataSetNNULL(pOutCol, 0, numOfRows); j++; continue; @@ -1699,17 +1694,26 @@ int32_t tqRetrieveVTableDataBlock(STqReader* pReader, SSDataBlock** pRes, const i++; continue; } else if (pCol->vColId > pOutCol->info.colId) { - tqInfo("%s does not find column info for vtable column %d, closest vtable column is %d, id: %s", __func__, - pOutCol->info.colId, pCol->vColId, idstr); + tqTrace("%s does not find column info for vtable column %d, closest vtable column is %d, id: %s", __func__, + pOutCol->info.colId, pCol->vColId, idstr); colDataSetNNULL(pOutCol, 0, numOfRows); j++; continue; } - // copy data from physical table to the result block of virtual table + // skip this column if it is from another physical table if (pCol->pTbUid != pTbUid) { - // skip this column since it is from another physical table - } else if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + tqTrace("skip column %d of virtual table %" PRId64 " since it is from table %" PRId64 + ", current block table %" PRId64 ", id: %s", + pCol->vColId, vTbUid, pCol->pTbUid, pTbUid, idstr); + colDataSetNNULL(pOutCol, 0, numOfRows); + i++; + j++; + continue; + } + + // copy data from physical table to the result block of virtual table + if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { // try to find the corresponding column data of physical table SColData* pColData = NULL; for (int32_t k = 0; k < nInputCols; ++k) { @@ -1860,7 +1864,7 @@ _end: if (code != TSDB_CODE_SUCCESS) { tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr); } - return (code == TSDB_CODE_SUCCESS); + return false; } bool tqReaderIsQueriedSourceTable(STqReader* pReader, uint64_t uid) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 6b6dfb3167..92b6300beb 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -77,7 +77,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { .pOtherBackend = NULL, }; - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__MERGE) { handle.vnode = ((STQ*)pMeta->ahandle)->pVnode; handle.initTqReader = 1; } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { @@ -86,7 +86,8 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { initStorageAPI(&handle.api); - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG || + pTask->info.taskLevel == TASK_LEVEL__MERGE) { if (pTask->info.fillHistory == STREAM_RECALCUL_TASK) { handle.pStateBackend = pTask->pRecalState; handle.pOtherBackend = pTask->pState; @@ -113,6 +114,8 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code)); return code; } + + qSetStreamMergeInfo(pTask->exec.pExecutor, pTask->pVTables); } streamSetupScheduleTrigger(pTask); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index b50184774b..1ebe5047c5 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -544,6 +544,12 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { qWarn("vnodeGetBatchMeta failed, msgType:%d", req->msgType); } break; + case TDMT_VND_VSUBTABLES_META: + // error code has been set into reqMsg, no need to handle it here. + if (TSDB_CODE_SUCCESS != vnodeGetVSubtablesMeta(pVnode, &reqMsg)) { + qWarn("vnodeGetVSubtablesMeta failed, msgType:%d", req->msgType); + } + break; default: qError("invalid req msgType %d", req->msgType); reqMsg.code = TSDB_CODE_INVALID_MSG; @@ -730,7 +736,7 @@ int32_t vnodeGetVSubtablesMeta(SVnode *pVnode, SRpcMsg *pMsg) { qError("tSerializeSVSubTablesRsp failed, error:%d", rspSize); goto _return; } - pRsp = rpcMallocCont(rspSize); + pRsp = taosMemoryCalloc(1, rspSize); if (pRsp == NULL) { code = terrno; qError("rpcMallocCont %d failed, error:%d", rspSize, terrno); @@ -755,9 +761,11 @@ _return: qError("vnd get virtual subtables failed cause of %s", tstrerror(code)); } + *pMsg = rspMsg; + tDestroySVSubTablesRsp(&rsp); - tmsgSendRsp(&rspMsg); + //tmsgSendRsp(&rspMsg); return code; } diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index a475cca52c..235cbbc5d5 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -317,7 +317,9 @@ typedef struct SCtgVSubTablesCtx { int32_t vgNum; bool clonedVgroups; SArray* pVgroups; - + + int32_t resCode; + int32_t resDoneNum; SVSubTablesRsp* pResList; int32_t resIdx; } SCtgVSubTablesCtx; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index cf60437508..e80d4611e9 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -3151,13 +3151,20 @@ int32_t ctgHandleGetVSubTablesRsp(SCtgTaskReq* tReq, int32_t reqType, const SDat SCtgTask* pTask = tReq->pTask; int32_t newCode = TSDB_CODE_SUCCESS; SCtgVSubTablesCtx* pCtx = (SCtgVSubTablesCtx*)pTask->taskCtx; + int32_t resIdx = atomic_fetch_add_32(&pCtx->resIdx, 1); - CTG_ERR_JRET(ctgProcessRspMsg(pCtx->pResList + atomic_fetch_add_32(&pCtx->resIdx, 1), reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); - - if (atomic_load_32(&pCtx->resIdx) < pCtx->vgNum) { - CTG_RET(code); + code = ctgProcessRspMsg(pCtx->pResList + resIdx, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target); + if (code) { + pCtx->resCode = code; } + int32_t doneNum = atomic_add_fetch_32(&pCtx->resDoneNum, 1); + if (doneNum < pCtx->vgNum) { + return code; + } + + code = pCtx->resCode; + _return: newCode = ctgHandleTaskEnd(pTask, code); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 3695049403..790c541334 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -633,7 +633,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT if (TDMT_VND_TABLE_CFG == msgType) { SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; pName = ctx->pName; - } else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType) { + } else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType || + TDMT_VND_VSUBTABLES_META == msgType) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); @@ -714,7 +715,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT if (TDMT_VND_TABLE_CFG == msgType) { SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; pName = ctx->pName; - } else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType) { + } else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType || + TDMT_VND_VSUBTABLES_META == msgType) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index dc46374729..1ec4853e86 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -547,7 +547,10 @@ typedef struct SStreamScanInfo { uint64_t numOfExec; // execution times STqReader* tqReader; - SHashObj* pVtableMergeHandles; // key: vtable uid, value: SStreamVtableMergeHandle + SHashObj* pVtableMergeHandles; // key: vtable uid, value: SStreamVtableMergeHandle + SDiskbasedBuf* pVtableMergeBuf; // page buffer used by vtable merge + SArray* pVtableReadyHandles; + STableListInfo* pTableListInfo; uint64_t groupId; bool igCheckGroupId; diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index c173085a5f..7788b95bdf 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -188,7 +188,7 @@ int32_t createEventNonblockOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, SReadHandle* readHandle, STableListInfo* pTableListInfo, int32_t numOfDownstream, SVirtualScanPhysiNode * pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo); -int32_t createStreamVtableMergeOperatorInfo(SReadHandle* pHandle, SVirtualScanPhysiNode* pVirtualScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); +int32_t createStreamVtableMergeOperatorInfo(SOperatorInfo* pDownstream, SReadHandle* pHandle, SVirtualScanPhysiNode* pVirtualScanNode, SNode* pTagCond, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); // clang-format on SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index b6e2d639f6..6d6e9c72b3 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -77,6 +77,7 @@ typedef struct { char* stbFullName; // used to generate dest child table name bool newSubTableRule; // used to generate dest child table name STaskNotifyEventStat* pNotifyEventStat; // used to store notify event statistics + SArray * pVTables; // used to store merge info for merge task, SArray } SStreamTaskInfo; struct SExecTaskInfo { diff --git a/source/libs/executor/inc/streamVtableMerge.h b/source/libs/executor/inc/streamVtableMerge.h index 918ef39c68..3e27ae1ced 100644 --- a/source/libs/executor/inc/streamVtableMerge.h +++ b/source/libs/executor/inc/streamVtableMerge.h @@ -30,15 +30,20 @@ typedef enum { SVM_NEXT_FOUND = 1, } SVM_NEXT_RESULT; -int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle **ppHandle, int32_t nSrcTbls, int32_t numPageLimit, - SDiskbasedBuf *pBuf, SSDataBlock *pResBlock, const char *idstr); +int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle **ppHandle, int64_t vuid, int32_t nSrcTbls, + int32_t numPageLimit, int32_t primaryTsIndex, SDiskbasedBuf *pBuf, + SSDataBlock *pResBlock, const char *idstr); -void streamVtableMergeDestroyHandle(SStreamVtableMergeHandle **ppHandle); +void streamVtableMergeDestroyHandle(void *ppHandle); + +int64_t streamVtableMergeHandleGetVuid(SStreamVtableMergeHandle *pHandle); int32_t streamVtableMergeAddBlock(SStreamVtableMergeHandle *pHandle, SSDataBlock *pDataBlock, const char *idstr); -int32_t streamVtableMergeNextTuple(SStreamVtableMergeHandle *pHandle, SSDataBlock *pResBlock, SVM_NEXT_RESULT *pRes, - const char *idstr); +int32_t streamVtableMergeMoveNext(SStreamVtableMergeHandle *pHandle, SVM_NEXT_RESULT *pRes, const char *idstr); + +int32_t streamVtableMergeCurrent(SStreamVtableMergeHandle *pHandle, SSDataBlock **ppDataBlock, int32_t *pRowIdx, + const char *idstr); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 3182f81e65..2cd95c53c4 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -141,8 +141,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu const char* id) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN && - pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN) { + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id); return TSDB_CODE_APP_ERROR; @@ -275,6 +274,15 @@ _end: return code; } +void qSetStreamMergeInfo(qTaskInfo_t tinfo, SArray* pVTables) { + if (tinfo == 0 || pVTables == NULL) { + return; + } + + SStreamTaskInfo* pStreamInfo = &((SExecTaskInfo*)tinfo)->streamInfo; + pStreamInfo->pVTables = pVTables; +} + int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_APP_ERROR; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 5677d490dc..7aea03699c 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -382,7 +382,7 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand return terrno; } - if (pHandle->vnode) { + if (pHandle->vnode && (pTaskInfo->pSubplan->pVTables == NULL)) { code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); if (code) { @@ -515,8 +515,6 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOperator); } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type && model != OPTR_EXEC_MODEL_STREAM) { code = createVirtualTableMergeOperatorInfo(NULL, pHandle, NULL, 0, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOperator); - } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type && model == OPTR_EXEC_MODEL_STREAM) { - code = createStreamVtableMergeOperatorInfo(pHandle, (SVirtualScanPhysiNode*)pPhyNode, pTagCond, pTaskInfo, &pOperator); } else { code = TSDB_CODE_INVALID_PARA; pTaskInfo->code = code; @@ -689,6 +687,26 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand code = createVirtualTableMergeOperatorInfo(ops, pHandle, pTableListInfo, size, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOptr); + } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type && model == OPTR_EXEC_MODEL_STREAM) { + SVirtualScanPhysiNode* pVirtualTableScanNode = (SVirtualScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); + if (!pTableListInfo) { + pTaskInfo->code = terrno; + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return terrno; + } + + code = createScanTableListInfo(&pVirtualTableScanNode->scan, pVirtualTableScanNode->pGroupTags, + pVirtualTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, + pTaskInfo); + if (code) { + pTaskInfo->code = code; + tableListDestroy(pTableListInfo); + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return code; + } + + code = createStreamVtableMergeOperatorInfo(ops[0], pHandle, pVirtualTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOptr); } else { code = TSDB_CODE_INVALID_PARA; pTaskInfo->code = code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 07dc3a31c5..11390ba38b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3157,7 +3157,7 @@ int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STime SOperatorInfo* pOperator = pInfo->pStreamScanOp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; const char* id = GET_TASKID(pTaskInfo); - SSHashObj* pVtableInfos = pTaskInfo->pSubplan->pVTables; + bool isVtableSourceScan = (pTaskInfo->pSubplan->pVTables != NULL); code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); QUERY_CHECK_CODE(code, lino, _end); @@ -3168,7 +3168,7 @@ int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STime pBlockInfo->version = pBlock->info.version; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - if (pVtableInfos == NULL) { + if (!isVtableSourceScan) { pBlockInfo->id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); } else { // use original table uid as groupId for vtable @@ -3213,7 +3213,7 @@ int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STime } // currently only the tbname pseudo column - if (pInfo->numOfPseudoExpr > 0) { + if (pInfo->numOfPseudoExpr > 0 && !isVtableSourceScan) { code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, pBlockInfo->rows, pTaskInfo, &pTableScanInfo->base.metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. @@ -3762,9 +3762,20 @@ static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStreamScanInfo* pInfo = pOperator->info; SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo; + SSHashObj* pVtableInfos = pTaskInfo->pSubplan->pVTables; qDebug("stream scan started, %s", id); + if (pVtableInfos != NULL && pStreamInfo->recoverStep != STREAM_RECOVER_STEP__NONE) { + qError("stream vtable source scan should not have recovery step: %d", pStreamInfo->recoverStep); + pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE; + } + + if (pVtableInfos != NULL && !pInfo->igCheckUpdate) { + qError("stream vtable source scan should have igCheckUpdate"); + pInfo->igCheckUpdate = false; + } + if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE2) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; @@ -3863,6 +3874,10 @@ static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { // TODO: refactor FETCH_NEXT_BLOCK: if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { + if (pVtableInfos != NULL) { + qInfo("stream vtable source scan would ignore all data blocks"); + pInfo->validBlockIndex = total; + } if (pInfo->validBlockIndex >= total) { doClearBufferedBlocks(pInfo); (*ppRes) = NULL; @@ -4013,6 +4028,10 @@ FETCH_NEXT_BLOCK: return code; } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { qDebug("stream scan mode:%d, %s", pInfo->scanMode, id); + if (pVtableInfos != NULL && pInfo->scanMode != STREAM_SCAN_FROM_READERHANDLE) { + qError("stream vtable source scan should not have scan mode: %d", pInfo->scanMode); + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + } switch (pInfo->scanMode) { case STREAM_SCAN_FROM_RES: { pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; @@ -4167,6 +4186,11 @@ FETCH_NEXT_BLOCK: continue; } + if (pVtableInfos != NULL && pInfo->pCreateTbRes->info.rows > 0) { + qError("stream vtable source scan should not have create table res"); + blockDataCleanup(pInfo->pCreateTbRes); + } + if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; qDebug("create table res exists, rows:%" PRId64 " return from stream scan, %s", @@ -4178,8 +4202,11 @@ FETCH_NEXT_BLOCK: code = doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); QUERY_CHECK_CODE(code, lino, _end); setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type); - code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); - QUERY_CHECK_CODE(code, lino, _end); + if (pVtableInfos == NULL) { + // filter should be applied in merge task for vtables + code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + QUERY_CHECK_CODE(code, lino, _end); + } code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); QUERY_CHECK_CODE(code, lino, _end); @@ -4215,7 +4242,7 @@ FETCH_NEXT_BLOCK: goto NEXT_SUBMIT_BLK; } else if (pInfo->blockType == STREAM_INPUT__CHECKPOINT) { - if (pInfo->validBlockIndex >= total) { + if (pInfo->validBlockIndex >= total || pVtableInfos != NULL) { doClearBufferedBlocks(pInfo); (*ppRes) = NULL; return code; @@ -4494,6 +4521,18 @@ void destroyStreamScanOperatorInfo(void* param) { taosHashCleanup(pStreamScan->pVtableMergeHandles); pStreamScan->pVtableMergeHandles = NULL; } + if (pStreamScan->pVtableMergeBuf) { + destroyDiskbasedBuf(pStreamScan->pVtableMergeBuf); + pStreamScan->pVtableMergeBuf = NULL; + } + if (pStreamScan->pVtableReadyHandles) { + taosArrayDestroy(pStreamScan->pVtableReadyHandles); + pStreamScan->pVtableReadyHandles = NULL; + } + if (pStreamScan->pTableListInfo) { + tableListDestroy(pStreamScan->pTableListInfo); + pStreamScan->pTableListInfo = NULL; + } if (pStreamScan->matchInfo.pList) { taosArrayDestroy(pStreamScan->matchInfo.pList); } @@ -4681,15 +4720,13 @@ _end: return code; } -static int32_t createStreamVtableBlock(SColMatchInfo *pMatchInfo, SSDataBlock **ppRes, const char *idstr) { +static SSDataBlock* createStreamVtableBlock(SColMatchInfo *pMatchInfo, const char *idstr) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SSDataBlock *pRes = NULL; QUERY_CHECK_NULL(pMatchInfo, code, lino, _end, TSDB_CODE_INVALID_PARA); - *ppRes = NULL; - code = createDataBlock(&pRes); QUERY_CHECK_CODE(code, lino, _end); int32_t numOfOutput = taosArrayGetSize(pMatchInfo->pList); @@ -4703,18 +4740,16 @@ static int32_t createStreamVtableBlock(SColMatchInfo *pMatchInfo, SSDataBlock ** QUERY_CHECK_CODE(code, lino, _end); } - *ppRes = pRes; - pRes = NULL; - - _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr); + if (pRes != NULL) { + blockDataDestroy(pRes); + } + pRes = NULL; + terrno = code; } - if (pRes != NULL) { - blockDataDestroy(pRes); - } - return code; + return pRes; } static int32_t createStreamNormalScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, @@ -4838,8 +4873,7 @@ static int32_t createStreamNormalScanOperatorInfo(SReadHandle* pHandle, STableSc if (pVtableInfos != NULL) { // save vtable info into tqReader for vtable source scan - SSDataBlock* pResBlock = NULL; - code = createStreamVtableBlock(&pInfo->matchInfo, &pResBlock, idstr); + SSDataBlock* pResBlock = createStreamVtableBlock(&pInfo->matchInfo, idstr); QUERY_CHECK_CODE(code, lino, _error); code = pAPI->tqReaderFn.tqReaderSetVtableInfo(pInfo->tqReader, pHandle->vnode, pAPI, pVtableInfos, &pResBlock, idstr); @@ -4962,8 +4996,8 @@ _error: taosArrayDestroy(pColIds); } - if (pInfo != NULL) { - STableScanInfo* p = (STableScanInfo*) pInfo->pTableScanOp->info; + if (pInfo != NULL && pInfo->pTableScanOp != NULL) { + STableScanInfo* p = (STableScanInfo*)pInfo->pTableScanOp->info; if (p != NULL) { p->base.pTableListInfo = NULL; } diff --git a/source/libs/executor/src/streamVtableMerge.c b/source/libs/executor/src/streamVtableMerge.c index 1626e8503c..4e0f464132 100644 --- a/source/libs/executor/src/streamVtableMerge.c +++ b/source/libs/executor/src/streamVtableMerge.c @@ -28,6 +28,7 @@ typedef struct SVMBufPageInfo { typedef struct SStreamVtableMergeSource { SDiskbasedBuf* pBuf; // buffer for storing data int32_t* pTotalPages; // total pages of all sources in the buffer + int32_t primaryTsIndex; SSDataBlock* pInputDataBlock; // data block to be written to the buffer int64_t currentExpireTimeMs; // expire time of the input data block @@ -44,12 +45,15 @@ typedef struct SStreamVtableMergeHandle { SDiskbasedBuf* pBuf; int32_t numOfPages; int32_t numPageLimit; + int32_t primaryTsIndex; + int64_t vuid; int32_t nSrcTbls; SHashObj* pSources; SSDataBlock* datablock; // Does not store data, only used to save the schema of input/output data blocks SMultiwayMergeTreeInfo* pMergeTree; + int32_t numEmptySources; int64_t globalLatestTs; } SStreamVtableMergeHandle; @@ -90,6 +94,9 @@ static int32_t svmSourceFlushInput(SStreamVtableMergeSource* pSource, const char // check data block size pBlock = pSource->pInputDataBlock; + if (blockDataGetNumOfRows(pBlock) == 0) { + goto _end; + } int32_t size = blockDataGetSize(pBlock) + sizeof(int32_t) + taosArrayGetSize(pBlock->pDataBlock) * sizeof(int32_t); QUERY_CHECK_CONDITION(size <= getBufPageSize(pSource->pBuf), code, lino, _end, TSDB_CODE_INTERNAL_ERROR); @@ -123,37 +130,36 @@ _end: static int32_t svmSourceAddBlock(SStreamVtableMergeSource* pSource, SSDataBlock* pDataBlock, const char* idstr) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - int32_t pageSize = 0; - int32_t holdSize = 0; SSDataBlock* pInputDataBlock = NULL; QUERY_CHECK_NULL(pDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pSource, code, lino, _end, TSDB_CODE_INVALID_PARA); pInputDataBlock = pSource->pInputDataBlock; - if (pInputDataBlock == NULL) { - code = createOneDataBlock(pDataBlock, false, &pInputDataBlock); - QUERY_CHECK_CODE(code, lino, _end); - pSource->pInputDataBlock = pInputDataBlock; - } + QUERY_CHECK_CONDITION(taosArrayGetSize(pDataBlock->pDataBlock) >= taosArrayGetSize(pInputDataBlock->pDataBlock), code, + lino, _end, TSDB_CODE_INVALID_PARA); int32_t start = 0; int32_t nrows = blockDataGetNumOfRows(pDataBlock); + int32_t pageSize = + getBufPageSize(pSource->pBuf) - sizeof(int32_t) - taosArrayGetSize(pInputDataBlock->pDataBlock) * sizeof(int32_t); while (start < nrows) { int32_t holdSize = blockDataGetSize(pInputDataBlock); QUERY_CHECK_CONDITION(holdSize < pageSize, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); - int32_t stop = 0; + int32_t stop = start; code = blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize - holdSize); - QUERY_CHECK_CODE(code, lino, _end); if (stop == start - 1) { // If pInputDataBlock cannot hold new rows, ignore the error and write pInputDataBlock to the buffer } else { + QUERY_CHECK_CODE(code, lino, _end); // append new rows to pInputDataBlock if (blockDataGetNumOfRows(pInputDataBlock) == 0) { // set expires time for the first block pSource->currentExpireTimeMs = taosGetTimestampMs() + tsStreamVirtualMergeMaxDelayMs; } int32_t numOfRows = stop - start + 1; + code = blockDataEnsureCapacity(pInputDataBlock, pInputDataBlock->info.rows + numOfRows); + QUERY_CHECK_CODE(code, lino, _end); code = blockDataMergeNRows(pInputDataBlock, pDataBlock, start, numOfRows); QUERY_CHECK_CODE(code, lino, _end); } @@ -176,6 +182,17 @@ _end: static bool svmSourceIsEmpty(SStreamVtableMergeSource* pSource) { return listNEles(pSource->pageInfoList) == 0; } +static int64_t svmSourceGetExpireTime(SStreamVtableMergeSource* pSource) { + SListNode* pn = tdListGetHead(pSource->pageInfoList); + if (pn != NULL) { + SVMBufPageInfo* pageInfo = (SVMBufPageInfo*)pn->data; + if (pageInfo != NULL) { + return pageInfo->expireTimeMs; + } + } + return INT64_MAX; +} + static int32_t svmSourceReadBuf(SStreamVtableMergeSource* pSource, const char* idstr) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -188,6 +205,11 @@ static int32_t svmSourceReadBuf(SStreamVtableMergeSource* pSource, const char* i QUERY_CHECK_NULL(pSource->pOutputDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); blockDataCleanup(pSource->pOutputDataBlock); + int32_t numOfCols = taosArrayGetSize(pSource->pOutputDataBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; i++) { + SColumnInfoData* pCol = taosArrayGet(pSource->pOutputDataBlock->pDataBlock, i); + pCol->hasNull = true; + } pn = tdListGetHead(pSource->pageInfoList); QUERY_CHECK_NULL(pn, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); @@ -215,21 +237,15 @@ static int32_t svmSourceCurrentTs(SStreamVtableMergeSource* pSource, const char* SColumnInfoData* tsCol = NULL; QUERY_CHECK_NULL(pSource, code, lino, _end, TSDB_CODE_INVALID_PARA); - QUERY_CHECK_CONDITION(!svmSourceIsEmpty(pSource), code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pSource->pOutputDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_CONDITION(pSource->rowIndex >= 0 && pSource->rowIndex < blockDataGetNumOfRows(pSource->pOutputDataBlock), + code, lino, _end, TSDB_CODE_INVALID_PARA); - if (blockDataGetNumOfRows(pSource->pOutputDataBlock) == 0) { - code = svmSourceReadBuf(pSource, idstr); - QUERY_CHECK_CODE(code, lino, _end); - } - QUERY_CHECK_CONDITION(pSource->rowIndex < blockDataGetNumOfRows(pSource->pOutputDataBlock), code, lino, _end, - TSDB_CODE_INVALID_PARA); - - tsCol = taosArrayGet(pSource->pOutputDataBlock->pDataBlock, 0); + tsCol = taosArrayGet(pSource->pOutputDataBlock->pDataBlock, pSource->primaryTsIndex); QUERY_CHECK_NULL(tsCol, code, lino, _end, terrno); + QUERY_CHECK_CONDITION(tsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP, code, lino, _end, TSDB_CODE_INVALID_PARA); *pTs = ((int64_t*)tsCol->pData)[pSource->rowIndex]; - pSource->latestTs = TMAX(*pTs, pSource->latestTs); _end: if (code != TSDB_CODE_SUCCESS) { @@ -238,55 +254,54 @@ _end: return code; } -static int32_t svmSourceMoveNext(SStreamVtableMergeSource* pSource, const char* idstr, SVM_NEXT_RESULT* pRes) { +static int32_t svmSourceMoveNext(SStreamVtableMergeSource* pSource, const char* idstr) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SListNode* pn = NULL; void* page = NULL; - int64_t latestTs = 0; QUERY_CHECK_NULL(pSource, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pSource->pOutputDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); - *pRes = SVM_NEXT_NOT_READY; - latestTs = pSource->latestTs; - while (true) { - if (svmSourceIsEmpty(pSource)) { - pSource->rowIndex = 0; - break; + if (pSource->rowIndex >= 0) { + QUERY_CHECK_CONDITION(pSource->rowIndex < blockDataGetNumOfRows(pSource->pOutputDataBlock), code, lino, _end, + TSDB_CODE_INVALID_PARA); + pSource->rowIndex++; + if (pSource->rowIndex >= blockDataGetNumOfRows(pSource->pOutputDataBlock)) { + // Pop the page from the list and recycle it + pn = tdListPopHead(pSource->pageInfoList); + QUERY_CHECK_NULL(pn, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + QUERY_CHECK_NULL(pn->data, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + SVMBufPageInfo* pageInfo = (SVMBufPageInfo*)pn->data; + page = getBufPage(pSource->pBuf, pageInfo->pageId); + QUERY_CHECK_NULL(page, code, lino, _end, terrno); + code = dBufSetBufPageRecycled(pSource->pBuf, page); + QUERY_CHECK_CODE(code, lino, _end); + (*pSource->pTotalPages)--; + taosMemoryFreeClear(pn); + pSource->rowIndex = -1; + } } - QUERY_CHECK_CONDITION(pSource->rowIndex < blockDataGetNumOfRows(pSource->pOutputDataBlock), code, lino, _end, - TSDB_CODE_INVALID_PARA); - - pSource->rowIndex++; - if (pSource->rowIndex >= blockDataGetNumOfRows(pSource->pOutputDataBlock)) { - // Pop the page from the list and recycle it - pn = tdListPopHead(pSource->pageInfoList); - QUERY_CHECK_NULL(pn, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); - QUERY_CHECK_NULL(pn->data, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); - SVMBufPageInfo* pageInfo = (SVMBufPageInfo*)pn->data; - page = getBufPage(pSource->pBuf, pageInfo->pageId); - QUERY_CHECK_NULL(page, code, lino, _end, terrno); - code = dBufSetBufPageRecycled(pSource->pBuf, page); + if (pSource->rowIndex == -1) { + if (svmSourceIsEmpty(pSource)) { + break; + } + // Read the first page from the list + code = svmSourceReadBuf(pSource, idstr); QUERY_CHECK_CODE(code, lino, _end); - (*pSource->pTotalPages)--; - taosMemoryFreeClear(pn); pSource->rowIndex = 0; } - if (svmSourceIsEmpty(pSource)) { - pSource->rowIndex = 0; - break; - } - - int64_t ts = 0; - code = svmSourceCurrentTs(pSource, idstr, &ts); - QUERY_CHECK_CODE(code, lino, _end); - if (ts > latestTs && ts >= *pSource->pGlobalLatestTs) { - *pRes = SVM_NEXT_FOUND; - break; + // Check the timestamp of the current row + int64_t currentTs = INT64_MIN; + code = svmSourceCurrentTs(pSource, idstr, ¤tTs); + if (currentTs > pSource->latestTs) { + pSource->latestTs = currentTs; + if (currentTs >= *pSource->pGlobalLatestTs) { + break; + } } } @@ -306,6 +321,12 @@ static int32_t svmSourceCompare(const void* pLeft, const void* pRight, void* par SStreamVtableMergeSource* pLeftSource = *(SStreamVtableMergeSource**)taosArrayGet(pValidSources, left); SStreamVtableMergeSource* pRightSource = *(SStreamVtableMergeSource**)taosArrayGet(pValidSources, right); + if (svmSourceIsEmpty(pLeftSource)) { + return 1; + } else if (svmSourceIsEmpty(pRightSource)) { + return -1; + } + int64_t leftTs = 0; code = svmSourceCurrentTs(pLeftSource, "", &leftTs); if (code != TSDB_CODE_SUCCESS) { @@ -335,10 +356,14 @@ static SStreamVtableMergeSource* svmAddSource(SStreamVtableMergeHandle* pHandle, QUERY_CHECK_NULL(pSource, code, lino, _end, terrno); pSource->pBuf = pHandle->pBuf; pSource->pTotalPages = &pHandle->numOfPages; + pSource->primaryTsIndex = pHandle->primaryTsIndex; + code = createOneDataBlock(pHandle->datablock, false, &pSource->pInputDataBlock); + QUERY_CHECK_CODE(code, lino, _end); pSource->pageInfoList = tdListNew(sizeof(SVMBufPageInfo)); QUERY_CHECK_NULL(pSource->pageInfoList, code, lino, _end, terrno); code = createOneDataBlock(pHandle->datablock, false, &pSource->pOutputDataBlock); QUERY_CHECK_CODE(code, lino, _end); + pSource->rowIndex = -1; pSource->latestTs = INT64_MIN; pSource->pGlobalLatestTs = &pHandle->globalLatestTs; code = taosHashPut(pHandle->pSources, &uid, sizeof(uid), &pSource, POINTER_BYTES); @@ -387,14 +412,16 @@ static int32_t svmBuildTree(SStreamVtableMergeHandle* pHandle, SVM_NEXT_RESULT* pIter = taosHashIterate(pHandle->pSources, NULL); while (pIter != NULL) { SStreamVtableMergeSource* pSource = *(SStreamVtableMergeSource**)pIter; - if (svmSourceIsEmpty(pSource)) { - code = svmSourceFlushInput(pSource, idstr); + code = svmSourceFlushInput(pSource, idstr); + QUERY_CHECK_CODE(code, lino, _end); + if (pSource->rowIndex == -1) { + code = svmSourceMoveNext(pSource, idstr); QUERY_CHECK_CODE(code, lino, _end); } if (!svmSourceIsEmpty(pSource)) { px = taosArrayPush(pReadySources, &pSource); QUERY_CHECK_NULL(px, code, lino, _end, terrno); - globalExpireTimeMs = TMIN(globalExpireTimeMs, pSource->currentExpireTimeMs); + globalExpireTimeMs = TMIN(globalExpireTimeMs, svmSourceGetExpireTime(pSource)); } pIter = taosHashIterate(pHandle->pSources, pIter); } @@ -427,6 +454,7 @@ static int32_t svmBuildTree(SStreamVtableMergeHandle* pHandle, SVM_NEXT_RESULT* void* param = NULL; code = tMergeTreeCreate(&pHandle->pMergeTree, taosArrayGetSize(pReadySources), pReadySources, svmSourceCompare); QUERY_CHECK_CODE(code, lino, _end); + pHandle->numEmptySources = 0; pReadySources = NULL; *pRes = SVM_NEXT_FOUND; @@ -453,7 +481,7 @@ int32_t streamVtableMergeAddBlock(SStreamVtableMergeHandle* pHandle, SSDataBlock QUERY_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); - pTbUid = pDataBlock->info.id.uid; + pTbUid = pDataBlock->info.id.groupId; px = taosHashGet(pHandle->pSources, &pTbUid, sizeof(int64_t)); if (px == NULL) { @@ -480,8 +508,31 @@ _end: return code; } -int32_t streamVtableMergeNextTuple(SStreamVtableMergeHandle* pHandle, SSDataBlock* pResBlock, SVM_NEXT_RESULT* pRes, - const char* idstr) { +int32_t streamVtableMergeCurrent(SStreamVtableMergeHandle* pHandle, SSDataBlock** ppDataBlock, int32_t* pRowIdx, + const char* idstr) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + QUERY_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(pHandle->pMergeTree, code, lino, _end, TSDB_CODE_INVALID_PARA); + + int32_t idx = tMergeTreeGetChosenIndex(pHandle->pMergeTree); + SArray* pReadySources = pHandle->pMergeTree->param; + void* px = taosArrayGet(pReadySources, idx); + QUERY_CHECK_NULL(px, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + SStreamVtableMergeSource* pSource = *(SStreamVtableMergeSource**)px; + QUERY_CHECK_NULL(pSource, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + *ppDataBlock = pSource->pOutputDataBlock; + *pRowIdx = pSource->rowIndex; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr); + } + return code; +} + +int32_t streamVtableMergeMoveNext(SStreamVtableMergeHandle* pHandle, SVM_NEXT_RESULT* pRes, const char* idstr) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; void* px = NULL; @@ -489,61 +540,74 @@ int32_t streamVtableMergeNextTuple(SStreamVtableMergeHandle* pHandle, SSDataBloc SStreamVtableMergeSource* pSource = NULL; QUERY_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA); - QUERY_CHECK_NULL(pResBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pRes, code, lino, _end, TSDB_CODE_INVALID_PARA); *pRes = SVM_NEXT_NOT_READY; if (pHandle->pMergeTree == NULL) { - SVM_NEXT_RESULT buildRes = SVM_NEXT_NOT_READY; - code = svmBuildTree(pHandle, &buildRes, idstr); + code = svmBuildTree(pHandle, pRes, idstr); QUERY_CHECK_CODE(code, lino, _end); - if (buildRes == SVM_NEXT_NOT_READY) { - goto _end; - } + goto _end; } - QUERY_CHECK_NULL(pHandle->pMergeTree, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); int32_t idx = tMergeTreeGetChosenIndex(pHandle->pMergeTree); pReadySources = pHandle->pMergeTree->param; px = taosArrayGet(pReadySources, idx); QUERY_CHECK_NULL(px, code, lino, _end, terrno); pSource = *(SStreamVtableMergeSource**)px; - code = blockCopyOneRow(pSource->pOutputDataBlock, pSource->rowIndex, &pResBlock); QUERY_CHECK_CODE(code, lino, _end); - *pRes = SVM_NEXT_FOUND; pHandle->globalLatestTs = TMAX(pSource->latestTs, pHandle->globalLatestTs); - SVM_NEXT_RESULT nextRes = SVM_NEXT_NOT_READY; - int32_t origNumOfPages = pHandle->numOfPages; - code = svmSourceMoveNext(pSource, idstr, &nextRes); + int32_t origNumOfPages = pHandle->numOfPages; + code = svmSourceMoveNext(pSource, idstr); QUERY_CHECK_CODE(code, lino, _end); - bool needDestroy = false; - if (nextRes == SVM_NEXT_NOT_READY) { - needDestroy = true; - } else if (taosArrayGetSize((SArray*)pHandle->pMergeTree->param) != pHandle->nSrcTbls && - pHandle->numOfPages != origNumOfPages) { - // The original data for this portion is incomplete. Its merge was forcibly triggered by certain conditions, so we - // must recheck if those conditions are still met. - if (tsStreamVirtualMergeWaitMode == STREAM_VIRTUAL_MERGE_MAX_DELAY) { - int64_t globalExpireTimeMs = INT64_MAX; - for (int32_t i = 0; i < taosArrayGetSize(pReadySources); ++i) { - px = taosArrayGet(pReadySources, i); - QUERY_CHECK_NULL(px, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); - pSource = *(SStreamVtableMergeSource**)px; - globalExpireTimeMs = TMIN(globalExpireTimeMs, pSource->currentExpireTimeMs); - } - needDestroy = taosGetTimestampMs() < globalExpireTimeMs; - } else if (tsStreamVirtualMergeWaitMode == STREAM_VIRTUAL_MERGE_MAX_MEMORY) { - needDestroy = pHandle->numOfPages < pHandle->numPageLimit; - } else { - code = TSDB_CODE_INTERNAL_ERROR; - QUERY_CHECK_CODE(code, lino, _end); - } + if (svmSourceIsEmpty(pSource)) { + ++pHandle->numEmptySources; } + bool needDestroy = false; + if (pHandle->numEmptySources == taosArrayGetSize(pReadySources)) { + // all sources are empty + needDestroy = true; + } else { + code = tMergeTreeAdjust(pHandle->pMergeTree, tMergeTreeGetAdjustIndex(pHandle->pMergeTree)); + QUERY_CHECK_CODE(code, lino, _end); + if (pHandle->numEmptySources > 0) { + if (tsStreamVirtualMergeWaitMode == STREAM_VIRTUAL_MERGE_WAIT_FOREVER) { + idx = tMergeTreeGetChosenIndex(pHandle->pMergeTree); + px = taosArrayGet(pReadySources, idx); + QUERY_CHECK_NULL(px, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + pSource = *(SStreamVtableMergeSource**)px; + QUERY_CHECK_NULL(pSource, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + int64_t currentTs = INT64_MIN; + code = svmSourceCurrentTs(pSource, idstr, ¤tTs); + QUERY_CHECK_CODE(code, lino, _end); + needDestroy = currentTs > pHandle->globalLatestTs; + } else if (pHandle->numOfPages != origNumOfPages) { + // The original data for this portion is incomplete. Its merge was forcibly triggered by certain conditions, so + // we must recheck if those conditions are still met. + if (tsStreamVirtualMergeWaitMode == STREAM_VIRTUAL_MERGE_MAX_DELAY) { + int64_t globalExpireTimeMs = INT64_MAX; + for (int32_t i = 0; i < taosArrayGetSize(pReadySources); ++i) { + px = taosArrayGet(pReadySources, i); + QUERY_CHECK_NULL(px, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + pSource = *(SStreamVtableMergeSource**)px; + globalExpireTimeMs = TMIN(globalExpireTimeMs, svmSourceGetExpireTime(pSource)); + } + needDestroy = taosGetTimestampMs() < globalExpireTimeMs; + } else if (tsStreamVirtualMergeWaitMode == STREAM_VIRTUAL_MERGE_MAX_MEMORY) { + needDestroy = pHandle->numOfPages < pHandle->numPageLimit; + } else { + code = TSDB_CODE_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } + } + } + } if (needDestroy) { svmDestroyTree(&pHandle->pMergeTree); + } else { + *pRes = SVM_NEXT_FOUND; } _end: @@ -553,8 +617,9 @@ _end: return code; } -int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle** ppHandle, int32_t nSrcTbls, int32_t numPageLimit, - SDiskbasedBuf* pBuf, SSDataBlock* pResBlock, const char* idstr) { +int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle** ppHandle, int64_t vuid, int32_t nSrcTbls, + int32_t numPageLimit, int32_t primaryTsIndex, SDiskbasedBuf* pBuf, + SSDataBlock* pResBlock, const char* idstr) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamVtableMergeHandle* pHandle = NULL; @@ -569,6 +634,8 @@ int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle** ppHandle, int32 pHandle->pBuf = pBuf; pHandle->numPageLimit = numPageLimit; + pHandle->primaryTsIndex = primaryTsIndex; + pHandle->vuid = vuid; pHandle->nSrcTbls = nSrcTbls; pHandle->pSources = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); QUERY_CHECK_NULL(pHandle->pSources, code, lino, _end, terrno); @@ -590,7 +657,8 @@ _end: return code; } -void streamVtableMergeDestroyHandle(SStreamVtableMergeHandle** ppHandle) { +void streamVtableMergeDestroyHandle(void* ptr) { + SStreamVtableMergeHandle** ppHandle = ptr; if (ppHandle == NULL || *ppHandle == NULL) { return; } @@ -600,8 +668,16 @@ void streamVtableMergeDestroyHandle(SStreamVtableMergeHandle** ppHandle) { taosHashCleanup(pHandle->pSources); pHandle->pSources = NULL; } - + blockDataDestroy(pHandle->datablock); svmDestroyTree(&pHandle->pMergeTree); taosMemoryFreeClear(*ppHandle); } + +int64_t streamVtableMergeHandleGetVuid(SStreamVtableMergeHandle* pHandle) { + if (pHandle != NULL) { + return pHandle->vuid; + } else { + return 0; + } +} diff --git a/source/libs/executor/src/virtualtablescanoperator.c b/source/libs/executor/src/virtualtablescanoperator.c index 2838d107af..960cff5f17 100644 --- a/source/libs/executor/src/virtualtablescanoperator.c +++ b/source/libs/executor/src/virtualtablescanoperator.c @@ -680,499 +680,305 @@ int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, SReadHa STableListInfo* pTableListInfo, int32_t numOfDownstream, SVirtualScanPhysiNode* pVirtualScanPhyNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - SPhysiNode* pPhyNode = (SPhysiNode*)pVirtualScanPhyNode; - int32_t lino = 0; - int32_t code = TSDB_CODE_SUCCESS; - SVirtualScanMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SVirtualScanMergeOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; - SNodeList* pMergeKeys = NULL; + SPhysiNode* pPhyNode = (SPhysiNode*)pVirtualScanPhyNode; + int32_t lino = 0; + int32_t code = TSDB_CODE_SUCCESS; + SVirtualScanMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SVirtualScanMergeOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; + SNodeList* pMergeKeys = NULL; - QUERY_CHECK_NULL(pInfo, code, lino, _return, terrno); - QUERY_CHECK_NULL(pOperator, code, lino, _return, terrno); + QUERY_CHECK_NULL(pInfo, code, lino, _return, terrno); + QUERY_CHECK_NULL(pOperator, code, lino, _return, terrno); - pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder; - pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder; + pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder; + pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder; - SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo; - pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); - TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _return, terrno); + SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo; + pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _return, terrno); - SSDataBlock* pInputBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - TSDB_CHECK_NULL(pInputBlock, code, lino, _return, terrno); - pVirtualScanInfo->pInputBlock = pInputBlock; + SSDataBlock* pInputBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + TSDB_CHECK_NULL(pInputBlock, code, lino, _return, terrno); + pVirtualScanInfo->pInputBlock = pInputBlock; - if (pVirtualScanPhyNode->scan.pScanPseudoCols != NULL) { - SExprSupp* pSup = &pVirtualScanInfo->base.pseudoSup; - pSup->pExprInfo = NULL; - VTS_ERR_JRET(createExprInfo(pVirtualScanPhyNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs)); + if (pVirtualScanPhyNode->scan.pScanPseudoCols != NULL) { + SExprSupp* pSup = &pVirtualScanInfo->base.pseudoSup; + pSup->pExprInfo = NULL; + VTS_ERR_JRET(createExprInfo(pVirtualScanPhyNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs)); - pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, - &pTaskInfo->storageAPI.functionStore); - TSDB_CHECK_NULL(pSup->pCtx, code, lino, _return, terrno); - } + pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, + &pTaskInfo->storageAPI.functionStore); + TSDB_CHECK_NULL(pSup->pCtx, code, lino, _return, terrno); + } - initResultSizeInfo(&pOperator->resultInfo, 1024); - TSDB_CHECK_CODE(blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity), lino, _return); + initResultSizeInfo(&pOperator->resultInfo, 1024); + TSDB_CHECK_CODE(blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity), lino, _return); - size_t numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock); - int32_t rowSize = pInfo->binfo.pRes->info.rowSize; + size_t numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock); + int32_t rowSize = pInfo->binfo.pRes->info.rowSize; - if (!pVirtualScanPhyNode->scan.node.dynamicOp) { - VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, 0)); - pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys); - TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno); - } - pVirtualScanInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols); - pVirtualScanInfo->sortBufSize = - pVirtualScanInfo->bufPageSize * (numOfDownstream + 1); // one additional is reserved for merged result. - VTS_ERR_JRET(extractColMap(pVirtualScanPhyNode->pTargets, &pVirtualScanInfo->dataSlotMap, &pVirtualScanInfo->tsSlotId)); + if (!pVirtualScanPhyNode->scan.node.dynamicOp) { + VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, 0)); + pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys); + TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno); + } + pVirtualScanInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols); + pVirtualScanInfo->sortBufSize = + pVirtualScanInfo->bufPageSize * (numOfDownstream + 1); // one additional is reserved for merged result. + VTS_ERR_JRET( + extractColMap(pVirtualScanPhyNode->pTargets, &pVirtualScanInfo->dataSlotMap, &pVirtualScanInfo->tsSlotId)); - pVirtualScanInfo->scanAllCols = pVirtualScanPhyNode->scanAllCols; + pVirtualScanInfo->scanAllCols = pVirtualScanPhyNode->scanAllCols; - VTS_ERR_JRET(filterInitFromNode((SNode*)pVirtualScanPhyNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0)); + VTS_ERR_JRET( + filterInitFromNode((SNode*)pVirtualScanPhyNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0)); - pVirtualScanInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); - QUERY_CHECK_NULL(pVirtualScanInfo->base.metaCache.pTableMetaEntryCache, code, lino, _return, terrno); - pVirtualScanInfo->base.readHandle = *readHandle; - pVirtualScanInfo->base.pTableListInfo = pTableListInfo; + pVirtualScanInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); + QUERY_CHECK_NULL(pVirtualScanInfo->base.metaCache.pTableMetaEntryCache, code, lino, _return, terrno); + pVirtualScanInfo->base.readHandle = *readHandle; + pVirtualScanInfo->base.pTableListInfo = pTableListInfo; - setOperatorInfo(pOperator, "VirtualTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, false, - OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = - createOperatorFpSet(openVirtualTableScanOperator, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo, - optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + setOperatorInfo(pOperator, "VirtualTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, false, + OP_NOT_OPENED, pInfo, pTaskInfo); + pOperator->fpSet = + createOperatorFpSet(openVirtualTableScanOperator, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); - if (NULL != pDownstream) { - VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream)); - } + if (NULL != pDownstream) { + VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream)); + } - - nodesDestroyList(pMergeKeys); - *pOptrInfo = pOperator; - return TSDB_CODE_SUCCESS; + nodesDestroyList(pMergeKeys); + *pOptrInfo = pOperator; + return TSDB_CODE_SUCCESS; _return: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } - if (pInfo != NULL) { - destroyVirtualTableScanOperatorInfo(pInfo); - } - nodesDestroyList(pMergeKeys); - pTaskInfo->code = code; - destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream); - return code; + if (pInfo != NULL) { + destroyVirtualTableScanOperatorInfo(pInfo); + } + nodesDestroyList(pMergeKeys); + pTaskInfo->code = code; + destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream); + return code; } static int32_t doStreamVtableMergeNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { - // NOTE: this operator does never check if current status is done or not - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - const char* id = GET_TASKID(pTaskInfo); - - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + const char* id = GET_TASKID(pTaskInfo); SStreamScanInfo* pInfo = pOperator->info; - SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo; + SSDataBlock* pResBlock = pInfo->pRes; + SArray* pVTables = pTaskInfo->streamInfo.pVTables; + void* pIter = NULL; - qDebug("stream scan started, %s", id); + (*ppRes) = NULL; + if (pOperator->status == OP_EXEC_DONE) { + goto _end; + } + + qDebug("===stream=== stream vtable merge next, taskId:%s", id); // TODO(kjq): add fill history recover step - size_t total = taosArrayGetSize(pInfo->pBlockLists); -// TODO: refactor -FETCH_NEXT_BLOCK: - if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { - if (pInfo->validBlockIndex >= total) { - doClearBufferedBlocks(pInfo); - (*ppRes) = NULL; - return code; - } + if (pInfo->pVtableMergeHandles == NULL) { + pInfo->pVtableMergeHandles = taosHashInit(taosArrayGetSize(pVTables), + taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + QUERY_CHECK_NULL(pInfo->pVtableMergeHandles, code, lino, _end, terrno); + taosHashSetFreeFp(pInfo->pVtableMergeHandles, streamVtableMergeDestroyHandle); - int32_t current = pInfo->validBlockIndex++; - qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id); - - SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); - QUERY_CHECK_NULL(pPacked, code, lino, _end, terrno); - - SSDataBlock* pBlock = pPacked->pDataBlock; - if (pBlock->info.parTbName[0]) { - code = - pAPI->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName); - QUERY_CHECK_CODE(code, lino, _end); - } - - // TODO move into scan - pBlock->info.calWin.skey = INT64_MIN; - pBlock->info.calWin.ekey = INT64_MAX; - pBlock->info.dataLoad = 1; - if (pInfo->pUpdateInfo) { - pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); - } - - code = blockDataUpdateTsWindow(pBlock, 0); - QUERY_CHECK_CODE(code, lino, _end); - switch (pBlock->info.type) { - case STREAM_NORMAL: - case STREAM_GET_ALL: - printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - setStreamOperatorState(&pInfo->basic, pBlock->info.type); - (*ppRes) = pBlock; - return code; - case STREAM_RETRIEVE: { - pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE; - code = copyDataBlock(pInfo->pUpdateRes, pBlock); - QUERY_CHECK_CODE(code, lino, _end); - pInfo->updateResIndex = 0; - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); - pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); - } break; - case STREAM_DELETE_DATA: { - printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo)); - SSDataBlock* pDelBlock = NULL; - if (pInfo->tqReader) { - code = createSpecialDataBlock(STREAM_DELETE_DATA, &pDelBlock); - QUERY_CHECK_CODE(code, lino, _end); - - code = filterDelBlockByUid(pDelBlock, pBlock, pInfo->tqReader, &pInfo->readerFn); - QUERY_CHECK_CODE(code, lino, _end); - } else { - pDelBlock = pBlock; - } - - code = setBlockGroupIdByUid(pInfo, pDelBlock); - QUERY_CHECK_CODE(code, lino, _end); - code = rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id); - QUERY_CHECK_CODE(code, lino, _end); - printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete recv filtered", - GET_TASKID(pTaskInfo)); - if (pDelBlock->info.rows == 0) { - if (pInfo->tqReader) { - blockDataDestroy(pDelBlock); - } - goto FETCH_NEXT_BLOCK; - } - - if (!isStreamWindow(pInfo)) { - code = generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes); - QUERY_CHECK_CODE(code, lino, _end); - if (pInfo->partitionSup.needCalc) { - pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; - } else { - pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; - } - blockDataDestroy(pDelBlock); - - if (pInfo->pDeleteDataRes->info.rows > 0) { - printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", - GET_TASKID(pTaskInfo)); - setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type); - (*ppRes) = pInfo->pDeleteDataRes; - return code; - } else { - goto FETCH_NEXT_BLOCK; - } - } else { - pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; - pInfo->updateResIndex = 0; - code = generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes, STREAM_DELETE_DATA); - QUERY_CHECK_CODE(code, lino, _end); - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); - code = copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); - QUERY_CHECK_CODE(code, lino, _end); - pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; - if (pInfo->tqReader) { - blockDataDestroy(pDelBlock); - } - if (pInfo->pDeleteDataRes->info.rows > 0) { - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", - GET_TASKID(pTaskInfo)); - setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type); - (*ppRes) = pInfo->pDeleteDataRes; - return code; - } else { - goto FETCH_NEXT_BLOCK; - } - } - } break; - case STREAM_GET_RESULT: { - pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; - pInfo->updateResIndex = 0; - pInfo->lastScanRange = pBlock->info.window; - TSKEY endKey = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval); - if (pInfo->useGetResultRange == true) { - endKey = pBlock->info.window.ekey; - } - code = copyGetResultBlock(pInfo->pUpdateRes, pBlock->info.window.skey, endKey); - QUERY_CHECK_CODE(code, lino, _end); - pInfo->pUpdateInfo->maxDataVersion = -1; - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - } break; - case STREAM_DROP_CHILD_TABLE: { - int32_t deleteNum = 0; - code = deletePartName(&pInfo->stateStore, pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, pBlock, &deleteNum); - QUERY_CHECK_CODE(code, lino, _end); - if (deleteNum == 0) { - printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "block recv", GET_TASKID(pTaskInfo)); - qDebug("===stream=== ignore block type 18, delete num is 0"); - goto FETCH_NEXT_BLOCK; - } - } break; - case STREAM_CHECKPOINT: { - qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK"); - } break; - default: - break; - } - printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "block recv", GET_TASKID(pTaskInfo)); - setStreamOperatorState(&pInfo->basic, pBlock->info.type); - (*ppRes) = pBlock; - return code; - } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { - qDebug("stream scan mode:%d, %s", pInfo->scanMode, id); - switch (pInfo->scanMode) { - case STREAM_SCAN_FROM_RES: { - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - code = doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes); - QUERY_CHECK_CODE(code, lino, _end); - setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type); - code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); - QUERY_CHECK_CODE(code, lino, _end); - pInfo->pRes->info.dataLoad = 1; - code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); - QUERY_CHECK_CODE(code, lino, _end); - if (pInfo->pRes->info.rows > 0) { - printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - (*ppRes) = pInfo->pRes; - return code; - } - } break; - case STREAM_SCAN_FROM_DELETE_DATA: { - code = generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_PARTITION_DELETE_DATA); - QUERY_CHECK_CODE(code, lino, _end); - if (pInfo->pUpdateRes->info.rows > 0) { - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - code = copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); - QUERY_CHECK_CODE(code, lino, _end); - pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; - (*ppRes) = pInfo->pDeleteDataRes; - return code; - } - qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, - __LINE__); - blockDataCleanup(pInfo->pUpdateDataRes); - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } break; - case STREAM_SCAN_FROM_UPDATERES: { - code = generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_CLEAR); - QUERY_CHECK_CODE(code, lino, _end); - if (pInfo->pUpdateRes->info.rows > 0) { - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - (*ppRes) = pInfo->pUpdateRes; - return code; - } - qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, - __LINE__); - blockDataCleanup(pInfo->pUpdateDataRes); - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } break; - case STREAM_SCAN_FROM_DATAREADER_RANGE: - case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: { - if (pInfo->pRangeScanRes != NULL) { - (*ppRes) = pInfo->pRangeScanRes; - pInfo->pRangeScanRes = NULL; - return code; - } - SSDataBlock* pSDB = NULL; - code = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex, &pSDB); - QUERY_CHECK_CODE(code, lino, _end); - if (pSDB) { - STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; - if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { - code = checkUpdateData(pInfo, true, pSDB, false); - QUERY_CHECK_CODE(code, lino, _end); - } - printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo)); - code = calBlockTbName(pInfo, pSDB, 0); - QUERY_CHECK_CODE(code, lino, _end); - - if (pInfo->pCreateTbRes->info.rows > 0) { - printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "update", - GET_TASKID(pTaskInfo)); - (*ppRes) = pInfo->pCreateTbRes; - pInfo->pRangeScanRes = pSDB; - return code; - } - - (*ppRes) = pSDB; - return code; - } - blockDataCleanup(pInfo->pUpdateDataRes); - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } break; - default: - break; - } - - if (hasScanRange(pInfo)) { - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - pInfo->updateResIndex = 0; - SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup; - code = copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock); - QUERY_CHECK_CODE(code, lino, _end); - blockDataCleanup(pSup->pScanBlock); - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); - pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA; - printSpecDataBlock(pInfo->pUpdateRes, getStreamOpName(pOperator->operatorType), "rebuild", GET_TASKID(pTaskInfo)); - (*ppRes) = pInfo->pUpdateRes; - return code; - } - - SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); - - NEXT_SUBMIT_BLK: - while (1) { - if (pInfo->readerFn.tqReaderCurrentBlockConsumed(pInfo->tqReader)) { - if (pInfo->validBlockIndex >= totalBlocks) { - pAPI->stateStore.updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); - doClearBufferedBlocks(pInfo); - - qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id); - (*ppRes) = NULL; - return code; - } - - int32_t current = pInfo->validBlockIndex++; - SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current); - QUERY_CHECK_NULL(pSubmit, code, lino, _end, terrno); - - qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, id); - if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver, - NULL) < 0) { - qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, - totalBlocks, id); - continue; - } - } - - blockDataCleanup(pInfo->pRes); - - while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) { - SSDataBlock* pRes = NULL; - - code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id); - qDebug("retrieve data from submit completed code:%s rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, id); - - if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) { - qDebug("retrieve data failed, try next block in submit block, %s", id); - continue; - } - - code = setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false); - if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { - pInfo->pRes->info.rows = 0; - code = TSDB_CODE_SUCCESS; - } else { - QUERY_CHECK_CODE(code, lino, _end); - } - - if (pInfo->pRes->info.rows == 0) { - continue; - } - - if (pInfo->pCreateTbRes->info.rows > 0) { - pInfo->scanMode = STREAM_SCAN_FROM_RES; - qDebug("create table res exists, rows:%" PRId64 " return from stream scan, %s", - pInfo->pCreateTbRes->info.rows, id); - (*ppRes) = pInfo->pCreateTbRes; - return code; - } - - code = doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); - QUERY_CHECK_CODE(code, lino, _end); - setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type); - code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); - QUERY_CHECK_CODE(code, lino, _end); - - code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); - QUERY_CHECK_CODE(code, lino, _end); - - int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; - qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); - if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) { - break; - } - } - - if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { - break; - } else { + int32_t nTables = taosArrayGetSize(pVTables); + int32_t numPagePerTable = getNumOfInMemBufPages(pInfo->pVtableMergeBuf) / nTables; + for (int32_t i = 0; i < nTables; ++i) { + SVCTableMergeInfo* pTableInfo = taosArrayGet(pVTables, i); + if (pTableInfo == NULL || pTableInfo->numOfSrcTbls == 0) { continue; } + QUERY_CHECK_CONDITION(pTableInfo->numOfSrcTbls <= numPagePerTable, code, lino, _end, terrno); + SStreamVtableMergeHandle* pMergeHandle = NULL; + code = streamVtableMergeCreateHandle(&pMergeHandle, pTableInfo->uid, pTableInfo->numOfSrcTbls, numPagePerTable, + pInfo->primaryTsIndex, pInfo->pVtableMergeBuf, pInfo->pRes, id); + QUERY_CHECK_CODE(code, lino, _end); + code = taosHashPut(pInfo->pVtableMergeHandles, &pTableInfo->uid, sizeof(pTableInfo->uid), &pMergeHandle, + POINTER_BYTES); + if (code != TSDB_CODE_SUCCESS) { + streamVtableMergeDestroyHandle(&pMergeHandle); + } + QUERY_CHECK_CODE(code, lino, _end); + } + } + + while (pOperator->status != OP_RES_TO_RETURN) { + SSDataBlock* pBlock = NULL; + SOperatorInfo* downStream = pOperator->pDownstream[0]; + + code = downStream->fpSet.getNextFn(downStream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + + if (pBlock == NULL) { + pOperator->status = OP_RES_TO_RETURN; + break; } - // record the scan action. - pInfo->numOfExec++; - pOperator->resultInfo.totalRows += pBlockInfo->rows; - - qDebug("stream scan completed, and return source rows:%" PRId64 ", %s", pBlockInfo->rows, id); - if (pBlockInfo->rows > 0) { - printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - (*ppRes) = pInfo->pRes; - return code; + int32_t inputNCols = taosArrayGetSize(pBlock->pDataBlock); + int32_t resNCols = taosArrayGetSize(pResBlock->pDataBlock); + QUERY_CHECK_CONDITION(inputNCols <= resNCols, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + for (int32_t i = 0; i < inputNCols; ++i) { + SColumnInfoData *p1 = taosArrayGet(pResBlock->pDataBlock, i); + QUERY_CHECK_NULL(p1, code, lino, _end, terrno); + SColumnInfoData *p2 = taosArrayGet(pBlock->pDataBlock, i); + QUERY_CHECK_CODE(code, lino, _end); + QUERY_CHECK_CONDITION(p1->info.type == p2->info.type, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + QUERY_CHECK_CONDITION(p1->info.bytes == p2->info.bytes, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + } + for (int32_t i = inputNCols; i < resNCols; ++i) { + SColumnInfoData *p = taosArrayGet(pResBlock->pDataBlock, i); + QUERY_CHECK_NULL(p, code, lino, _end, terrno); + SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; + code = blockDataAppendColInfo(pBlock, &colInfo); + QUERY_CHECK_CODE(code, lino, _end); + SColumnInfoData* pNewCol = taosArrayGet(pBlock->pDataBlock, i); + QUERY_CHECK_NULL(pNewCol, code, lino, _end, terrno); + code = colInfoDataEnsureCapacity(pNewCol, pBlock->info.rows, false); + QUERY_CHECK_CODE(code, lino, _end); + colDataSetNNULL(pNewCol, 0, pBlock->info.rows); } - if (pInfo->pUpdateDataRes->info.rows > 0) { - goto FETCH_NEXT_BLOCK; - } + if (pBlock->info.type == STREAM_NORMAL) { + SStreamVtableMergeHandle** ppHandle = + taosHashGet(pInfo->pVtableMergeHandles, &pBlock->info.id.uid, sizeof(int64_t)); + if (ppHandle == NULL) { + // skip table that is not needed + continue; + } - goto NEXT_SUBMIT_BLK; - } else if (pInfo->blockType == STREAM_INPUT__CHECKPOINT) { - if (pInfo->validBlockIndex >= total) { - doClearBufferedBlocks(pInfo); - (*ppRes) = NULL; - return code; - } - - int32_t current = pInfo->validBlockIndex++; - qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id); - - SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current); - QUERY_CHECK_NULL(pData, code, lino, _end, terrno); - SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0); - QUERY_CHECK_NULL(pBlock, code, lino, _end, terrno); - - if (pBlock->info.type == STREAM_CHECKPOINT) { + code = streamVtableMergeAddBlock(*ppHandle, pBlock, id); + QUERY_CHECK_CODE(code, lino, _end); + } else if (pBlock->info.type == STREAM_CHECKPOINT) { // todo(kjq): serialize checkpoint + } else { + qError("unexpected block type %d, id:%s", pBlock->info.type, id); + code = TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } + } + + if (taosArrayGetSize(pInfo->pVtableReadyHandles) == 0) { + void* pIter = taosHashIterate(pInfo->pVtableMergeHandles, NULL); + while (pIter != NULL) { + SStreamVtableMergeHandle* pHandle = *(SStreamVtableMergeHandle**)pIter; + SVM_NEXT_RESULT res = SVM_NEXT_NOT_READY; + code = streamVtableMergeMoveNext(pHandle, &res, id); + QUERY_CHECK_CODE(code, lino, _end); + if (res == SVM_NEXT_FOUND) { + void* px = taosArrayPush(pInfo->pVtableReadyHandles, &pHandle); + QUERY_CHECK_NULL(px, code, lino, _end, terrno); + } + pIter = taosHashIterate(pInfo->pVtableMergeHandles, pIter); + } + } + + blockDataCleanup(pResBlock); + while (true) { + void* px = taosArrayGetLast(pInfo->pVtableReadyHandles); + if (px == NULL) { + break; + } + + SStreamVtableMergeHandle* pHandle = *(SStreamVtableMergeHandle**)px; + QUERY_CHECK_NULL(pHandle, code, lino, _end, terrno); + + SVM_NEXT_RESULT res = SVM_NEXT_FOUND; + int32_t nCols = taosArrayGetSize(pResBlock->pDataBlock); + while (res == SVM_NEXT_FOUND) { + SSDataBlock* pBlock = NULL; + int32_t idx = 0; + code = streamVtableMergeCurrent(pHandle, &pBlock, &idx, id); + QUERY_CHECK_CODE(code, lino, _end); + + bool newTuple = true; + if (pResBlock->info.rows > 0) { + SColumnInfoData* pResTsCol = taosArrayGet(pResBlock->pDataBlock, pInfo->primaryTsIndex); + int64_t lastResTs = *(int64_t*)colDataGetNumData(pResTsCol, pResBlock->info.rows - 1); + SColumnInfoData* pMergeTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); + int64_t mergeTs = *(int64_t*)colDataGetNumData(pMergeTsCol, idx); + QUERY_CHECK_CONDITION(mergeTs >= lastResTs, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + newTuple = (mergeTs > lastResTs); + } + if (newTuple) { + if (pResBlock->info.rows >= pResBlock->info.capacity) { + break; + } + pResBlock->info.rows++; + for (int32_t i = 0; i < nCols; ++i) { + SColumnInfoData* pResCol = taosArrayGet(pResBlock->pDataBlock, i); + colDataSetNULL(pResCol, pResBlock->info.rows - 1); + } + } + for (int32_t i = 0; i < nCols; ++i) { + SColumnInfoData* pMergeCol = taosArrayGet(pBlock->pDataBlock, i); + if (!colDataIsNull_s(pMergeCol, idx)) { + SColumnInfoData* pResCol = taosArrayGet(pResBlock->pDataBlock, i); + code = colDataAssignNRows(pResCol, pResBlock->info.rows - 1, pMergeCol, idx, 1); + QUERY_CHECK_CODE(code, lino, _end); + } + } + code = streamVtableMergeMoveNext(pHandle, &res, id); + QUERY_CHECK_CODE(code, lino, _end); + } + + if (res == SVM_NEXT_NOT_READY) { + px = taosArrayPop(pInfo->pVtableReadyHandles); + QUERY_CHECK_NULL(px, code, lino, _end, terrno); + } + + if (pResBlock->info.rows > 0) { + pResBlock->info.id.uid = streamVtableMergeHandleGetVuid(pHandle); + break; + } + } + + if (taosArrayGetSize(pInfo->pVtableReadyHandles) == 0) { + pOperator->status = OP_EXEC_DONE; + } + + pInfo->numOfExec++; + if (pResBlock->info.rows > 0) { + pResBlock->info.id.groupId = tableListGetTableGroupId(pInfo->pTableListInfo, pResBlock->info.id.uid); + code = blockDataUpdateTsWindow(pResBlock, 0); + QUERY_CHECK_CODE(code, lino, _end); + code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pResBlock, + pResBlock->info.rows, pTaskInfo, NULL); + QUERY_CHECK_CODE(code, lino, _end); + code = doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); + QUERY_CHECK_CODE(code, lino, _end); + if (pResBlock->info.rows > 0) { + (*ppRes) = pResBlock; + pOperator->resultInfo.totalRows += pResBlock->info.rows; } - // printDataBlock(pInfo->pCheckpointRes, "stream scan ck", GET_TASKID(pTaskInfo)); - (*ppRes) = pInfo->pCheckpointRes; - return code; - } else { - qError("stream scan error, invalid block type %d, %s", pInfo->blockType, id); - code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } _end: + if (pIter != NULL) { + taosHashCancelIterate(pInfo->pVtableMergeHandles, pIter); + pIter = NULL; + } if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } - (*ppRes) = NULL; return code; } -int32_t createStreamVtableMergeOperatorInfo(SReadHandle* pHandle, SVirtualScanPhysiNode* pVirtualScanNode, - SNode* pTagCond, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { +int32_t createStreamVtableMergeOperatorInfo(SOperatorInfo* pDownstream, SReadHandle* pHandle, + SVirtualScanPhysiNode* pVirtualScanNode, SNode* pTagCond, + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, + SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -1261,7 +1067,7 @@ int32_t createStreamVtableMergeOperatorInfo(SReadHandle* pHandle, SVirtualScanPh pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup); QUERY_CHECK_NULL(pInfo->pCreateTbRes, code, lino, _error, terrno); - // create the pseduo columns info + // create the pseudo columns info if (pVirtualScanNode->scan.pScanPseudoCols != NULL) { code = createExprInfo(pVirtualScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr); QUERY_CHECK_CODE(code, lino, _error); @@ -1272,9 +1078,21 @@ int32_t createStreamVtableMergeOperatorInfo(SReadHandle* pHandle, SVirtualScanPh pInfo->pRes = createDataBlockFromDescNode(pDescNode); QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); + code = blockDataEnsureCapacity(pInfo->pRes, TMAX(pOperator->resultInfo.capacity, 4096)); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pRes->info.type = STREAM_NORMAL; code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes); QUERY_CHECK_CODE(code, lino, _error); + int32_t pageSize = getProperSortPageSize(pInfo->pRes->info.rowSize, taosArrayGetSize(pInfo->pRes->pDataBlock)); + code = createDiskbasedBuf(&pInfo->pVtableMergeBuf, pageSize, tsStreamVirtualMergeMaxMemKb * 1024, + "streamVtableMergeBuf", tsTempDir); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pVtableReadyHandles = taosArrayInit(0, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pVtableReadyHandles, code, lino, _error, terrno); + pInfo->pTableListInfo = pTableListInfo; + pTableListInfo = NULL; + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->groupId = 0; @@ -1337,6 +1155,9 @@ int32_t createStreamVtableMergeOperatorInfo(SReadHandle* pHandle, SVirtualScanPh optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); // TODO(kjq): save and load fill history state + code = appendDownstream(pOperator, &pDownstream, 1); + QUERY_CHECK_CODE(code, lino, _error); + *pOptrInfo = pOperator; return code; @@ -1345,6 +1166,10 @@ _error: destroyStreamScanOperatorInfo(pInfo); } + if (pTableListInfo != NULL) { + tableListDestroy(pTableListInfo); + } + if (pOperator != NULL) { pOperator->info = NULL; destroyOperator(pOperator); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 800817b857..0a80ece8f8 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -816,6 +816,10 @@ static int32_t physiVirtualTableScanCopy(const SVirtualScanPhysiNode* pSrc, SVir COPY_SCALAR_FIELD(groupSort); COPY_SCALAR_FIELD(scanAllCols); CLONE_NODE_LIST_FIELD(pTargets); + CLONE_NODE_LIST_FIELD(pTags); + CLONE_NODE_FIELD(pSubtable); + COPY_SCALAR_FIELD(igExpired); + COPY_SCALAR_FIELD(igCheckUpdate); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 1351eb49c1..5fa6dbf69d 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2479,6 +2479,10 @@ static const char* jkVirtualTableScanPhysiPlanGroupTags = "GroupTags"; static const char* jkVirtualTableScanPhysiPlanGroupSort = "GroupSort"; static const char* jkVirtualTableScanPhysiPlanscanAllCols= "scanAllCols"; static const char* jkVirtualTableScanPhysiPlanTargets = "Targets"; +static const char* jkVirtualTableScanPhysiPlanTags = "Tags"; +static const char* jkVirtualTableScanPhysiPlanSubtable = "Subtable"; +static const char* jkVirtualTableScanPhysiPlanIgExpired = "IgExpired"; +static const char* jkVirtualTableScanPhysiPlanIgCheckUpdate = "IgCheckUpdate"; static int32_t physiVirtualTableScanNodeToJson(const void* pObj, SJson* pJson) { const SVirtualScanPhysiNode* pNode = (const SVirtualScanPhysiNode*)pObj; @@ -2486,11 +2490,11 @@ static int32_t physiVirtualTableScanNodeToJson(const void* pObj, SJson* pJson) { int32_t code = physiScanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { - code = nodeListToJson(pJson, jkVirtualTableScanPhysiPlanTargets, pNode->pGroupTags); + code = nodeListToJson(pJson, jkVirtualTableScanPhysiPlanGroupTags, pNode->pGroupTags); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddBoolToObject(pJson, jkVirtualTableScanPhysiPlanTargets, pNode->groupSort); + code = tjsonAddBoolToObject(pJson, jkVirtualTableScanPhysiPlanGroupSort, pNode->groupSort); } if (TSDB_CODE_SUCCESS == code) { @@ -2501,13 +2505,29 @@ static int32_t physiVirtualTableScanNodeToJson(const void* pObj, SJson* pJson) { code = nodeListToJson(pJson, jkVirtualTableScanPhysiPlanTargets, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkVirtualTableScanPhysiPlanTags, pNode->pTags); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkVirtualTableScanPhysiPlanSubtable, nodeToJson, pNode->pSubtable); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkVirtualTableScanPhysiPlanIgExpired, pNode->igExpired); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkVirtualTableScanPhysiPlanIgCheckUpdate, pNode->igCheckUpdate); + } + return code; } static int32_t jsonToPhysiVirtualTableScanNode(const SJson* pJson, void* pObj) { SVirtualScanPhysiNode* pNode = (SVirtualScanPhysiNode*)pObj; - int32_t code = jsonToPhysicPlanNode(pJson, pObj); + int32_t code = jsonToPhysiScanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkVirtualTableScanPhysiPlanGroupTags, &pNode->pGroupTags); } @@ -2521,6 +2541,22 @@ static int32_t jsonToPhysiVirtualTableScanNode(const SJson* pJson, void* pObj) { code = jsonToNodeList(pJson, jkVirtualTableScanPhysiPlanTargets, &pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkVirtualTableScanPhysiPlanTags, &pNode->pTags); + } + + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkVirtualTableScanPhysiPlanSubtable, &pNode->pSubtable); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkVirtualTableScanPhysiPlanIgExpired, &pNode->igExpired); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkVirtualTableScanPhysiPlanIgCheckUpdate, &pNode->igCheckUpdate); + } + return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index bdf5befca4..2dbf3025da 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2207,6 +2207,10 @@ enum { PHY_VIRTUAL_TABLE_SCAN_CODE_GROUP_SORT, PHY_VIRTUAL_TABLE_SCAN_CODE_ONLY_TS, PHY_VIRTUAL_TABLE_SCAN_CODE_TARGETS, + PHY_VIRTUAL_TABLE_SCAN_CODE_TAGS, + PHY_VIRTUAL_TABLE_SCAN_CODE_SUBTABLE, + PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_EXPIRED, + PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_CHECK_UPDATE, }; static int32_t physiVirtualTableScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2229,6 +2233,23 @@ static int32_t physiVirtualTableScanNodeToMsg(const void* pObj, STlvEncoder* pEn if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); } + + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_TAGS, nodeListToMsg, pNode->pTags); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_SUBTABLE, nodeToMsg, pNode->pSubtable); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_EXPIRED, pNode->igExpired); + } + + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_CHECK_UPDATE, pNode->igCheckUpdate); + } + return code; } @@ -2254,6 +2275,18 @@ static int32_t msgToPhysiVirtualTableScanNode(STlvDecoder* pDecoder, void* pObj) case PHY_VIRTUAL_TABLE_SCAN_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); break; + case PHY_VIRTUAL_TABLE_SCAN_CODE_TAGS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTags); + break; + case PHY_VIRTUAL_TABLE_SCAN_CODE_SUBTABLE: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pSubtable); + break; + case PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_EXPIRED: + code = tlvDecodeI8(pTlv, &pNode->igExpired); + break; + case PHY_VIRTUAL_TABLE_SCAN_CODE_IGNORE_CHECK_UPDATE: + code = tlvDecodeI8(pTlv, &pNode->igCheckUpdate); + break; default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 0002d9ce9c..8bc03cd25e 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1908,6 +1908,8 @@ void nodesDestroyNode(SNode* pNode) { destroyScanPhysiNode((SScanPhysiNode*)pNode); nodesDestroyList(pPhyNode->pGroupTags); nodesDestroyList(pPhyNode->pTargets); + nodesDestroyList(pPhyNode->pTags); + nodesDestroyNode(pPhyNode->pSubtable); break; } case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 955299dd44..f16760c6b1 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -12045,6 +12045,12 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt } if (isVirtualTable(tableType) || (tableType == TSDB_SUPER_TABLE && pMeta->virtualStb)) { + SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + if ((STREAM_TRIGGER_WINDOW_CLOSE != pStmt->pOptions->triggerType) && + !(STREAM_TRIGGER_AT_ONCE == pStmt->pOptions->triggerType && (NULL == pSelect->pWindow && NULL == pSelect->pEvery))) { + taosMemoryFree(pMeta); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Not supported virtual table stream query or trigger mode"); + } if (0 == pStmt->pOptions->ignoreExpired) { taosMemoryFree(pMeta); return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "For virtual table IGNORE EXPIRED must be 1"); @@ -13468,6 +13474,93 @@ static int32_t buildStreamNotifyOptions(STranslateContext* pCxt, SStreamNotifyOp return code; } +static int32_t buildQueryTableColIdList(SSelectStmt *pSelect, SArray** ppRes) { + STableNode* pTable = (STableNode*)pSelect->pFromTable; + SNodeList* pColList = NULL; + SNode* pCol = NULL; + int32_t code = 0; + PAR_ERR_RET(nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pTable->tableAlias, COLLECT_COL_TYPE_COL, &pColList)); + *ppRes = taosArrayInit(pColList->length, sizeof(int16_t)); + if (NULL == *ppRes) { + code = terrno; + parserError("taosArrayInit 0x%p colId failed, errno:0x%x", *ppRes, code); + goto _return; + } + + FOREACH(pCol, pColList) { + if (NULL == taosArrayPush(*ppRes, &((SColumnNode*)pCol)->colId)) { + code = terrno; + parserError("taosArrayPush 0x%p colId failed, errno:0x%x", *ppRes, code); + goto _return; + } + } + +_return: + + nodesDestroyList(pColList); + if (code) { + taosArrayDestroy(*ppRes); + *ppRes = NULL; + } + + return code; +} + +static int32_t modifyVtableSrcNumBasedOnCols(SVCTableRefCols* pTb, SArray* pColIdList, SSHashObj* pTbHash) { + tSimpleHashClear(pTbHash); + + char tbFName[TSDB_TABLE_FNAME_LEN]; + int32_t colNum = taosArrayGetSize(pColIdList); + for (int32_t i = 0; i < colNum; ++i) { + int16_t *colId = taosArrayGet(pColIdList, i); + for (int32_t m = 0; m < pTb->numOfColRefs; ++m) { + if (*colId == pTb->refCols[m].colId) { + snprintf(tbFName, sizeof(tbFName), "%s.%s", pTb->refCols[m].refDbName, pTb->refCols[m].refTableName); + PAR_ERR_RET(tSimpleHashPut(pTbHash, tbFName, strlen(tbFName) + 1, &colNum, sizeof(colNum))); + } + } + } + + pTb->numOfSrcTbls = tSimpleHashGetSize(pTbHash); + + return TSDB_CODE_SUCCESS; +} + +static int32_t modifyVtableSrcNumBasedOnQuery(SArray* pVSubTables, SNode* pStmt) { + SSelectStmt *pSelect = (SSelectStmt*)pStmt; + SArray* pColIdList = NULL; + SSHashObj* pTbHash = NULL; + int32_t code = 0; + int32_t colNum = 0; + int32_t vgNum = taosArrayGetSize(pVSubTables); + if (vgNum > 0) { + PAR_ERR_JRET(buildQueryTableColIdList(pSelect, &pColIdList)); + colNum = taosArrayGetSize(pColIdList); + pTbHash = tSimpleHashInit(colNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); + if (NULL == pTbHash) { + code = terrno; + parserError("tSimpleHashInit failed, colNum:%d, errno:0x%x", colNum, code); + PAR_ERR_JRET(code); + } + } + + for (int32_t i = 0; i < vgNum; ++i) { + SVSubTablesRsp* pVg = (SVSubTablesRsp*)taosArrayGet(pVSubTables, i); + int32_t vtbNum = taosArrayGetSize(pVg->pTables); + for (int32_t m = 0; m < vtbNum; ++m) { + SVCTableRefCols* pTb = (SVCTableRefCols*)taosArrayGetP(pVg->pTables, m); + PAR_ERR_JRET(modifyVtableSrcNumBasedOnCols(pTb, pColIdList, pTbHash)); + } + } + +_return: + + taosArrayDestroy(pColIdList); + tSimpleHashCleanup(pTbHash); + + return code; +} + static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { pReq->igExists = pStmt->ignoreExists; @@ -13523,8 +13616,11 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* if (TSDB_CODE_SUCCESS == code) { code = buildStreamNotifyOptions(pCxt, pStmt->pNotifyOptions, pReq); } - if (TSDB_CODE_SUCCESS == code && pCxt->pMetaCache != NULL) { - TSWAP(pReq->pVSubTables, pCxt->pMetaCache->pVSubTables); + if (TSDB_CODE_SUCCESS == code && pCxt->pMetaCache != NULL && pCxt->pMetaCache->pVSubTables != NULL) { + code = modifyVtableSrcNumBasedOnQuery(pCxt->pMetaCache->pVSubTables, pStmt->pQuery); + if (TSDB_CODE_SUCCESS == code) { + TSWAP(pReq->pVSubTables, pCxt->pMetaCache->pVSubTables); + } } return code; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 4faef0594b..d3412138e1 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -110,6 +110,7 @@ int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.childId = pReq->upstreamChildId; + pDataBlock->info.id.uid = be64toh(pRetrieve->useconds); } pData->blocks = pArray; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 79262b9957..b329421585 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -25,7 +25,7 @@ typedef struct SBlockName { static void doMonitorDispatchData(void* param, void* tmrId); static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet); -static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq); +static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq, bool withUid); static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId, int64_t now); static int32_t streamMapAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, @@ -367,7 +367,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD return terrno; } - code = streamAddBlockIntoDispatchMsg(pDataBlock, pReqs); + code = streamAddBlockIntoDispatchMsg(pDataBlock, pReqs, false); if (code != TSDB_CODE_SUCCESS) { destroyDispatchMsg(pReqs, 1); return code; @@ -393,7 +393,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD if (type == STREAM_DELETE_RESULT || type == STREAM_CHECKPOINT || type == STREAM_TRANS_STATE || type == STREAM_RECALCULATE_START) { for (int32_t j = 0; j < numOfVgroups; j++) { - code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]); + code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j], false); if (code != 0) { destroyDispatchMsg(pReqs, numOfVgroups); return code; @@ -438,7 +438,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) { for (int32_t j = 0; j < numOfTasks; j++) { - code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]); + code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j], false); if (code != 0) { destroyDispatchMsg(pReqs, numOfTasks); return code; @@ -601,21 +601,43 @@ static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int SStreamDispatchReq* pReq = pTask->msgInfo.pData; int32_t msgId = pTask->msgInfo.msgId; - SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t numOfVgroups = taosArrayGetSize(vgInfo); - setResendInfo(pEntry, now); - for (int32_t j = 0; j < numOfVgroups; ++j) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); - if (pVgInfo == NULL) { - continue; - } - if (pVgInfo->vgId == pEntry->nodeId) { - int32_t code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet); - stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s", - pTask->id.idStr, pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId, pMsg, msgId, tstrerror(code)); - break; + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t numOfVgroups = taosArrayGetSize(vgInfo); + + for (int32_t j = 0; j < numOfVgroups; ++j) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); + if (pVgInfo == NULL) { + continue; + } + + if (pVgInfo->vgId == pEntry->nodeId) { + int32_t code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet); + stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s", + pTask->id.idStr, pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId, pMsg, msgId, + tstrerror(code)); + break; + } + } + } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) { + SArray *pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos; + int32_t numOfTasks = taosArrayGetSize(pTaskInfos); + + for (int32_t j = 0; j < numOfTasks; ++j) { + STaskDispatcherFixed *pAddr = taosArrayGet(pTaskInfos, j); + if (pAddr == NULL) { + continue; + } + + if (pAddr->nodeId == pEntry->nodeId) { + int32_t code = doSendDispatchMsg(pTask, &pReq[j], pAddr->nodeId, &pAddr->epSet); + stDebug("s-task:%s (child taskId:%d) vtable-map-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s", + pTask->id.idStr, pTask->info.selfChildId, pReq[j].blockNum, pAddr->nodeId, pMsg, msgId, + tstrerror(code)); + break; + } } } } @@ -638,9 +660,10 @@ static int32_t sendFailedDispatchData(SStreamTask* pTask, int64_t now) { int32_t msgId = pMsgInfo->msgId; SStreamDispatchReq* pReq = pTask->msgInfo.pData; - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id, pTask->info.selfChildId, - msgId); + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) { + const char *taskType = (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) ? "shuffle" : "vtable-map"; + stDebug("s-task:%s (child taskId:%d) retry %s-dispatch to down streams, msgId:%d", id, pTask->info.selfChildId, + taskType, msgId); int32_t numOfRetry = 0; for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) { @@ -674,8 +697,8 @@ static int32_t sendFailedDispatchData(SStreamTask* pTask, int64_t now) { } } - stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfRetry, - msgId); + stDebug("s-task:%s complete retry %s-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, taskType, + numOfRetry, msgId); } else { int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId; SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; @@ -800,7 +823,7 @@ static int32_t doAddDispatchBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, stDebug("s-task:%s dst table hashVal:0x%x assign to vgId:%d range[0x%x, 0x%x]", pTask->id.idStr, hashValue, pVgInfo->vgId, pVgInfo->hashBegin, pVgInfo->hashEnd); - if ((code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j])) < 0) { + if ((code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j], false)) < 0) { stError("s-task:%s failed to add dispatch block, code:%s", pTask->id.idStr, tstrerror(terrno)); return code; } @@ -915,7 +938,7 @@ int32_t streamMapAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDa STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, *pIdx); QUERY_CHECK_NULL(pAddr, code, lino, _end, terrno); - code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[*pIdx]); + code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[*pIdx], true); QUERY_CHECK_CODE(code, lino, _end); if (pReqs[*pIdx].blockNum == 0) { @@ -1402,7 +1425,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { +int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq, bool withUid) { size_t dataEncodeSize = blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN; void* buf = taosMemoryCalloc(1, dataStrLen); @@ -1411,7 +1434,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch } SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; - pRetrieve->useconds = 0; + pRetrieve->useconds = withUid ? htobe64(pBlock->info.id.uid) : 0; pRetrieve->precision = TSDB_DEFAULT_PRECISION; pRetrieve->compressed = 0; pRetrieve->completed = 1; @@ -1875,7 +1898,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i streamMutexUnlock(&pMsgInfo->lock); - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) { if (!allRsp) { stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, " diff --git a/tests/army/stream/test_stream_vtable.py b/tests/army/stream/test_stream_vtable.py new file mode 100644 index 0000000000..c0c9d4bd1a --- /dev/null +++ b/tests/army/stream/test_stream_vtable.py @@ -0,0 +1,280 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +from frame import etool +from frame.etool import * +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame.common import * +import time + +class TDTestCase(TBase): + + def create_tables(self): + tdLog.info("create tables") + + tdSql.execute("drop database if exists test_stream_vtable;") + tdSql.execute("create database test_stream_vtable vgroups 8;") + tdSql.execute("use test_stream_vtable;") + + tdLog.info(f"create org super table.") + tdSql.execute("select database();") + tdSql.execute(f"CREATE STABLE `vtb_org_stb` (" + "ts timestamp, " + "u_tinyint_col tinyint unsigned, " + "u_smallint_col smallint unsigned, " + "u_int_col int unsigned, " + "u_bigint_col bigint unsigned, " + "tinyint_col tinyint, " + "smallint_col smallint, " + "int_col int, " + "bigint_col bigint, " + "float_col float, " + "double_col double, " + "bool_col bool, " + "binary_16_col binary(16)," + "binary_32_col binary(32)," + "nchar_16_col nchar(16)," + "nchar_32_col nchar(32)" + ") TAGS (" + "int_tag int," + "bool_tag bool," + "float_tag float," + "double_tag double)") + + tdLog.info(f"create org child table.") + for i in range(3): + tdSql.execute(f"CREATE TABLE `vtb_org_child_{i}` USING `vtb_org_stb` TAGS ({i}, false, {i}, {i});") + + tdLog.info(f"create virtual normal table.") + + tdSql.execute(f"CREATE VTABLE `vtb_virtual_ntb_full` (" + "ts timestamp, " + "u_tinyint_col tinyint unsigned from vtb_org_child_0.u_tinyint_col, " + "u_smallint_col smallint unsigned from vtb_org_child_1.u_smallint_col, " + "u_int_col int unsigned from vtb_org_child_2.u_int_col, " + "u_bigint_col bigint unsigned from vtb_org_child_0.u_bigint_col, " + "tinyint_col tinyint from vtb_org_child_1.tinyint_col, " + "smallint_col smallint from vtb_org_child_2.smallint_col, " + "int_col int from vtb_org_child_0.int_col, " + "bigint_col bigint from vtb_org_child_1.bigint_col, " + "float_col float from vtb_org_child_2.float_col, " + "double_col double from vtb_org_child_0.double_col, " + "bool_col bool from vtb_org_child_1.bool_col, " + "binary_16_col binary(16) from vtb_org_child_2.binary_16_col," + "binary_32_col binary(32) from vtb_org_child_0.binary_32_col," + "nchar_16_col nchar(16) from vtb_org_child_1.nchar_16_col," + "nchar_32_col nchar(32) from vtb_org_child_2.nchar_32_col)") + + tdSql.execute(f"CREATE VTABLE `vtb_virtual_ntb_half_full` (" + "ts timestamp, " + "u_tinyint_col tinyint unsigned from vtb_org_child_0.u_tinyint_col, " + "u_smallint_col smallint unsigned from vtb_org_child_1.u_smallint_col, " + "u_int_col int unsigned from vtb_org_child_2.u_int_col, " + "u_bigint_col bigint unsigned, " + "tinyint_col tinyint, " + "smallint_col smallint, " + "int_col int from vtb_org_child_0.int_col, " + "bigint_col bigint from vtb_org_child_1.bigint_col, " + "float_col float from vtb_org_child_2.float_col, " + "double_col double, " + "bool_col bool, " + "binary_16_col binary(16)," + "binary_32_col binary(32) from vtb_org_child_0.binary_32_col," + "nchar_16_col nchar(16) from vtb_org_child_1.nchar_16_col," + "nchar_32_col nchar(32) from vtb_org_child_2.nchar_32_col)") + + tdSql.execute(f"CREATE STABLE `vtb_virtual_stb` (" + "ts timestamp, " + "u_tinyint_col tinyint unsigned, " + "u_smallint_col smallint unsigned, " + "u_int_col int unsigned, " + "u_bigint_col bigint unsigned, " + "tinyint_col tinyint, " + "smallint_col smallint, " + "int_col int, " + "bigint_col bigint, " + "float_col float, " + "double_col double, " + "bool_col bool, " + "binary_16_col binary(16)," + "binary_32_col binary(32)," + "nchar_16_col nchar(16)," + "nchar_32_col nchar(32)" + ") TAGS (" + "int_tag int," + "bool_tag bool," + "float_tag float," + "double_tag double)" + "VIRTUAL 1") + + tdLog.info(f"create virtual child table.") + + tdSql.execute(f"CREATE VTABLE `vtb_virtual_ctb_full` (" + "u_tinyint_col from vtb_org_child_0.u_tinyint_col, " + "u_smallint_col from vtb_org_child_1.u_smallint_col, " + "u_int_col from vtb_org_child_2.u_int_col, " + "u_bigint_col from vtb_org_child_0.u_bigint_col, " + "tinyint_col from vtb_org_child_1.tinyint_col, " + "smallint_col from vtb_org_child_2.smallint_col, " + "int_col from vtb_org_child_0.int_col, " + "bigint_col from vtb_org_child_1.bigint_col, " + "float_col from vtb_org_child_2.float_col, " + "double_col from vtb_org_child_0.double_col, " + "bool_col from vtb_org_child_1.bool_col, " + "binary_16_col from vtb_org_child_2.binary_16_col," + "binary_32_col from vtb_org_child_0.binary_32_col," + "nchar_16_col from vtb_org_child_1.nchar_16_col," + "nchar_32_col from vtb_org_child_2.nchar_32_col)" + "USING `vtb_virtual_stb` TAGS (0, false, 0, 0)") + + tdSql.execute(f"CREATE VTABLE `vtb_virtual_ctb_half_full` (" + "u_tinyint_col from vtb_org_child_0.u_tinyint_col, " + "u_smallint_col from vtb_org_child_1.u_smallint_col, " + "u_int_col from vtb_org_child_2.u_int_col, " + "int_col from vtb_org_child_0.int_col, " + "bigint_col from vtb_org_child_1.bigint_col, " + "float_col from vtb_org_child_2.float_col, " + "binary_32_col from vtb_org_child_0.binary_32_col," + "nchar_16_col from vtb_org_child_1.nchar_16_col," + "nchar_32_col from vtb_org_child_2.nchar_32_col)" + "USING `vtb_virtual_stb` TAGS (1, false, 1, 1)") + + tdSql.execute(f"CREATE VTABLE `vtb_virtual_ctb_empty` " + "USING `vtb_virtual_stb` TAGS (2, false, 2, 2)") + + def create_proj_streams(self): + tdSql.execute(f"CREATE STREAM s_proj_1 TRIGGER AT_ONCE INTO dst_proj_1 AS " + "select * from test_stream_vtable.vtb_virtual_ntb_full;") + tdSql.execute(f"CREATE STREAM s_proj_2 TRIGGER AT_ONCE INTO dst_proj_2 AS " + "select * from test_stream_vtable.vtb_virtual_ntb_half_full;") + tdSql.execute(f"CREATE STREAM s_proj_3 TRIGGER AT_ONCE INTO dst_proj_3 AS " + "select * from test_stream_vtable.vtb_virtual_stb PARTITION BY tbname;") + tdSql.execute(f"CREATE STREAM s_proj_4 TRIGGER AT_ONCE INTO dst_proj_4 AS " + "select * from test_stream_vtable.vtb_virtual_ctb_full;") + tdSql.execute(f"CREATE STREAM s_proj_5 TRIGGER AT_ONCE INTO dst_proj_5 AS " + "select * from test_stream_vtable.vtb_virtual_ctb_half_full;") + + tdSql.execute(f"CREATE STREAM s_proj_6 TRIGGER AT_ONCE INTO dst_proj_6 AS " + "select * from test_stream_vtable.vtb_virtual_ntb_full WHERE u_tinyint_col = 1;") + tdSql.execute(f"CREATE STREAM s_proj_7 TRIGGER AT_ONCE INTO dst_proj_7 AS " + "select * from test_stream_vtable.vtb_virtual_ntb_half_full WHERE bool_col = true;") + tdSql.execute(f"CREATE STREAM s_proj_8 TRIGGER AT_ONCE INTO dst_proj_8 AS " + "select * from test_stream_vtable.vtb_virtual_stb WHERE bool_col = true PARTITION BY tbname;") + tdSql.execute(f"CREATE STREAM s_proj_9 TRIGGER AT_ONCE INTO dst_proj_9 AS " + "select * from test_stream_vtable.vtb_virtual_ctb_full WHERE u_tinyint_col = 1;") + tdSql.execute(f"CREATE STREAM s_proj_10 TRIGGER AT_ONCE INTO dst_proj_10 AS " + "select * from test_stream_vtable.vtb_virtual_ctb_half_full WHERE bool_col = true;") + + tdSql.execute(f"CREATE STREAM s_proj_11 TRIGGER AT_ONCE INTO dst_proj_11 AS " + "select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_ntb_full;") + tdSql.execute(f"CREATE STREAM s_proj_12 TRIGGER AT_ONCE INTO dst_proj_12 AS " + "select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_ntb_half_full;") + tdSql.execute(f"CREATE STREAM s_proj_13 TRIGGER AT_ONCE INTO dst_proj_13 AS " + "select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_stb PARTITION BY tbname;") + tdSql.execute(f"CREATE STREAM s_proj_14 TRIGGER AT_ONCE INTO dst_proj_14 AS " + "select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_ctb_full;") + tdSql.execute(f"CREATE STREAM s_proj_15 TRIGGER AT_ONCE INTO dst_proj_15 AS " + "select ts, cos(u_tinyint_col), u_smallint_col, u_int_col, u_bigint_col from test_stream_vtable.vtb_virtual_ctb_half_full;") + + def create_window_streams(self): + tdSql.execute(f"CREATE STREAM s_interval_1 INTO dst_interval_1 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full interval(1s);") + tdSql.execute(f"CREATE STREAM s_interval_2 INTO dst_interval_2 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full interval(1s) sliding(100a);") + tdSql.execute(f"CREATE STREAM s_interval_3 INTO dst_interval_3 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname interval(1s) sliding(200a);") + tdSql.execute(f"CREATE STREAM s_interval_4 INTO dst_interval_4 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full interval(1s) sliding(100a);") + tdSql.execute(f"CREATE STREAM s_interval_5 INTO dst_interval_5 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full interval(1s);") + + tdSql.execute(f"CREATE STREAM s_state_1 INTO dst_state_1 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full state_window(bool_col);") + tdSql.execute(f"CREATE STREAM s_state_2 INTO dst_state_2 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full state_window(bool_col);") + tdSql.execute(f"CREATE STREAM s_state_3 INTO dst_state_3 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname state_window(bool_col);") + tdSql.execute(f"CREATE STREAM s_state_4 INTO dst_state_4 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full state_window(bool_col);") + tdSql.execute(f"CREATE STREAM s_state_5 INTO dst_state_5 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full state_window(bool_col);") + + tdSql.execute(f"CREATE STREAM s_session_1 INTO dst_session_1 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full session(ts, 10a);") + tdSql.execute(f"CREATE STREAM s_session_2 INTO dst_session_2 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full session(ts, 10a);") + tdSql.execute(f"CREATE STREAM s_session_3 INTO dst_session_3 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname session(ts, 10a);") + tdSql.execute(f"CREATE STREAM s_session_4 INTO dst_session_4 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full session(ts, 10a);") + tdSql.execute(f"CREATE STREAM s_session_5 INTO dst_session_5 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full session(ts, 10a);") + + tdSql.execute(f"CREATE STREAM s_event_1 INTO dst_event_1 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;") + tdSql.execute(f"CREATE STREAM s_event_2 INTO dst_event_2 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;") + tdSql.execute(f"CREATE STREAM s_event_3 INTO dst_event_3 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;") + tdSql.execute(f"CREATE STREAM s_event_4 INTO dst_event_4 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;") + tdSql.execute(f"CREATE STREAM s_event_5 INTO dst_event_5 AS " + "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full event_window start with u_tinyint_col > 50 end with u_smallint_col > 10000;") + + # tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS " + # "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_full count_window(20);") + # tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS " + # "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ntb_half_full count_window(20);") + # tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS " + # "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_stb partition by tbname count_window(20);") + # tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS " + # "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_full count_window(20);") + # tdSql.execute(f"CREATE STREAM s_count_1 INTO dst_count_1 AS " + # "select _wstart, _wend, first(u_tinyint_col), last(tinyint_col) from test_stream_vtable.vtb_virtual_ctb_half_full count_window(20);") + + def wait_streams_ready(self): + for i in range(60): + tdLog.info(f"i={i} wait for stream tasks ready ...") + time.sleep(1) + rows = tdSql.query("select * from information_schema.ins_stream_tasks where status <> 'ready';") + if rows == 0: + break + + def wait_streams_done(self): + # The entire test runs for a while. Wait briefly, and if no exceptions occur, it's sufficient. + for i in range(30): + tdLog.info(f"i={i} wait for stream tasks done ...") + time.sleep(1) + rows = tdSql.query("select * from information_schema.ins_stream_tasks where status <> 'ready';") + if rows != 0: + raise Exception("stream task status is wrong, please check it!") + + + def run(self): + tdLog.debug(f"start to excute {__file__}") + + self.create_tables() + self.create_proj_streams() + self.wait_streams_ready() + json = etool.curFile(__file__, "vtable_insert.json") + etool.benchMark(json=json) + self.wait_streams_done() + + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/army/stream/vtable_insert.json b/tests/army/stream/vtable_insert.json new file mode 100644 index 0000000000..cacdc0b288 --- /dev/null +++ b/tests/army/stream/vtable_insert.json @@ -0,0 +1,76 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 3, + "create_table_thread_count": 4, + "result_file": "./insert_res.txt", + "confirm_parameter_prompt": "no", + "num_of_records_per_req": 10000, + "prepared_rand": 10000, + "chinese": "no", + "escape_character": "yes", + "continue_if_fail": "no", + "databases": [ + { + "dbinfo": { + "name": "test_stream_vtable", + "drop": "no", + "vgroups": 8, + "precision": "ms" + }, + "super_tables": [ + { + "name": "vtb_org_stb", + "child_table_exists": "yes", + "childtable_count": 3, + "childtable_prefix": "vtb_org_child_", + "auto_create_table": "no", + "batch_create_tbl_num": 5, + "data_source": "rand", + "insert_mode": "taosc", + "non_stop_mode": "no", + "line_protocol": "line", + "insert_rows": 10000, + "childtable_limit": 0, + "childtable_offset": 0, + "interlace_rows": 0, + "insert_interval": 10, + "partial_col_num": 0, + "timestamp_step": 500, + "start_timestamp": "2025-01-01 00:00:00.000", + "sample_format": "csv", + "sample_file": "./sample.csv", + "use_sample_ts": "no", + "tags_file": "", + "columns": [ + {"type": "UTINYINT", "name": "u_tinyint_col"}, + {"type": "USMALLINT", "name": "u_smallint_col"}, + {"type": "UINT", "name": "u_int_col"}, + {"type": "UBIGINT", "name": "u_bigint_col"}, + {"type": "TINYINT", "name": "tinyint_col"}, + {"type": "SMALLINT", "name": "smallint_col"}, + {"type": "INT", "name": "int_col"}, + {"type": "BIGINT", "name": "bigint_col"}, + {"type": "FLOAT", "name": "float_col"}, + {"type": "DOUBLE", "name": "double_col"}, + {"type": "BOOL", "name": "bool_col"}, + {"type": "BINARY", "name": "binary_16_col", "len": 16}, + {"type": "BINARY", "name": "binary_32_col", "len": 32}, + {"type": "NCHAR", "name": "nchar_16_col", "len": 16}, + {"type": "NCHAR", "name": "nchar_32_col", "len": 32} + ], + "tags": [ + {"type": "INT", "name": "int_tag"}, + {"type": "BOOL", "name": "bool_tag"}, + {"type": "FLOAT", "name": "float_tag"}, + {"type": "DOUBLE", "name": "double_tag"} + ] + } + ] + } + ] +} diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index bbed373684..4fc4e83790 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -90,6 +90,7 @@ ,,y,army,./pytest.sh python3 ./test.py -f create/test_stb_keep_compact.py ,,y,army,./pytest.sh python3 ./test.py -f create/test_stb_keep_compact.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f create/test_stb_keep_compact.py -N 3 -M 3 +,,y,army,./pytest.sh python3 ./test.py -f stream/test_stream_vtable.py # # army/tools