From b2e615d4e70a78a8ef80bf4cf3e4ce7af6c9381e Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 10 Aug 2023 17:30:01 +0800 Subject: [PATCH 01/16] enhance: tag scan cursor based block --- include/libs/executor/storageapi.h | 14 +- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/meta/metaQuery.c | 12 +- source/dnode/vnode/src/vnd/vnodeInitApi.c | 4 + source/libs/executor/inc/executil.h | 2 + source/libs/executor/inc/executorInt.h | 4 + source/libs/executor/inc/operator.h | 2 +- source/libs/executor/src/executil.c | 4 +- source/libs/executor/src/operator.c | 2 +- source/libs/executor/src/scanoperator.c | 271 +++++++++++++++++++++- 10 files changed, 298 insertions(+), 19 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 773f373a2d..724d6638db 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -98,6 +98,16 @@ typedef struct SMTbCursor { int8_t paused; } SMTbCursor; +typedef struct SMCtbCursor { + SMeta *pMeta; + void *pCur; + tb_uid_t suid; + void *pKey; + void *pVal; + int kLen; + int vLen; +} SMCtbCursor; + typedef struct SRowBuffPos { void* pRowBuff; void* pKey; @@ -278,13 +288,15 @@ typedef struct SStoreMeta { void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & // metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta); - int64_t (*getNumOfRowsInMem)(void* pVnode); /** int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list); int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list); */ + SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock); + void (*closeCtbCursor)(SMCtbCursor *pCtbCur, int lock); + tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur); } SStoreMeta; typedef struct SStoreMetaReader { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index cd7704940b..e3b2d3e41e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -167,7 +167,7 @@ int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq); int64_t metaGetTimeSeriesNum(SMeta* pMeta); -SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock); +SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock); void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock); tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur); SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index c26bb45c2b..31c7bc8500 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -408,17 +408,9 @@ _err: return NULL; } -struct SMCtbCursor { - SMeta *pMeta; - TBC *pCur; - tb_uid_t suid; - void *pKey; - void *pVal; - int kLen; - int vLen; -}; -SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid, int lock) { +SMCtbCursor *metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock) { + SMeta* pMeta = ((SVnode*)pVnode)->pMeta; SMCtbCursor *pCtbCur = NULL; SCtbIdxKey ctbIdxKey; int ret = 0; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 5c8d563d73..dca8dd271c 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -96,6 +96,10 @@ void initMetadataAPI(SStoreMeta* pMeta) { pMeta->metaGetCachedTbGroup = metaGetCachedTbGroup; pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache; + + pMeta->openCtbCursor = metaOpenCtbCursor; + pMeta->closeCtbCursor = metaCloseCtbCursor; + pMeta->ctbCursorNext = metaCtbCursorNext; } void initTqAPI(SStoreTqReader* pTq) { diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 33c9d845b9..f273f63770 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -190,4 +190,6 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag); void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order); void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery); +SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, + SStorageAPI* pStorageAPI); #endif // TDENGINE_EXECUTIL_H diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index fbca5e29f9..cadf367481 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -259,6 +259,10 @@ typedef struct STagScanInfo { SLimitNode* pSlimit; SReadHandle readHandle; STableListInfo* pTableListInfo; + uint64_t suid; + void* pCtbCursor; + SNode* pTagCond; + SNode* pTagIndexCond; } STagScanInfo; typedef enum EStreamScanMode { diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index e6c3405d7f..38cefc1cc5 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -81,7 +81,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SNode* pTagCond, SNode*pTagIndexCond, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index aa0c7945b0..5bb8f8a38b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -47,8 +47,6 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* p static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI); -static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, - SStorageAPI* pStorageAPI); static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; } static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; } @@ -846,7 +844,7 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S return -1; } -static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, +SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, SStorageAPI* pStorageAPI) { SSDataBlock* pResBlock = createDataBlock(); if (pResBlock == NULL) { diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 8ddcc8fd15..0fc1b77b73 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -380,7 +380,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR return NULL; } - pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); + pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 71b0747be8..24ed717c8a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2688,6 +2688,271 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, } } +static void tagScanFreeUidTag(void* p) { + STUidTagInfo* pInfo = p; + if (pInfo->pTagVal != NULL) { + taosMemoryFree(pInfo->pTagVal); + } +} + +static int32_t tagScanCreateResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) { + SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData)); + if (pColumnData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + + pColumnData->info.type = pType->type; + pColumnData->info.bytes = pType->bytes; + pColumnData->info.scale = pType->scale; + pColumnData->info.precision = pType->precision; + + int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + taosMemoryFree(pColumnData); + return terrno; + } + + pParam->columnData = pColumnData; + pParam->colAlloced = true; + return TSDB_CODE_SUCCESS; +} + +typedef struct STagScanFilterContext { + SHashObj* colHash; + int32_t index; + SArray* cInfoList; +} STagScanFilterContext; + +static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) { + SColumnNode* pSColumnNode = NULL; + if (QUERY_NODE_COLUMN == nodeType((*pNode))) { + pSColumnNode = *(SColumnNode**)pNode; + } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) { + SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode); + if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) { + pSColumnNode = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pSColumnNode) { + return DEAL_RES_ERROR; + } + pSColumnNode->colId = -1; + pSColumnNode->colType = COLUMN_TYPE_TBNAME; + pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR; + pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE; + nodesDestroyNode(*pNode); + *pNode = (SNode*)pSColumnNode; + } else { + return DEAL_RES_CONTINUE; + } + } else { + return DEAL_RES_CONTINUE; + } + + STagScanFilterContext* pCtx = (STagScanFilterContext*)pContext; + void* data = taosHashGet(pCtx->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId)); + if (!data) { + taosHashPut(pCtx->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode))); + pSColumnNode->slotId = pCtx->index++; + SColumnInfo cInfo = {.colId = pSColumnNode->colId, + .type = pSColumnNode->node.resType.type, + .bytes = pSColumnNode->node.resType.bytes}; + taosArrayPush(pCtx->cInfoList, &cInfo); + } else { + SColumnNode* col = *(SColumnNode**)data; + pSColumnNode->slotId = col->slotId; + } + + return DEAL_RES_CONTINUE; +} + + +static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aUidTagIdxs, void* pVnode, SStorageAPI* pAPI) { + int32_t code = 0; + int32_t numOfTables = taosArrayGetSize(aUidTags); + STagScanFilterContext ctx = {0}; + ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); + ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo)); + + nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&ctx); + + SSDataBlock* pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, aUidTags, pVnode, pAPI); + if (pResBlock == NULL) { + + } + + SArray* pBlockList = taosArrayInit(1, POINTER_BYTES); + taosArrayPush(pBlockList, &pResBlock); + SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)}; + + SScalarParam output = {0}; + code = tagScanCreateResultData(&type, numOfTables, &output); + if (code != TSDB_CODE_SUCCESS) { + + } + + code = scalarCalculate(pTagCond, pBlockList, &output); + if (code != TSDB_CODE_SUCCESS) { + } + + bool* result = (bool*)output.columnData->pData; + for (int32_t i = 0 ; i < numOfTables; ++i) { + if (result[i]) { + taosArrayPush(aUidTagIdxs, &i); + } + } + + taosHashCleanup(ctx.colHash); + taosArrayDestroy(ctx.cInfoList); + blockDataDestroy(pResBlock); + taosArrayDestroy(pBlockList); + colDataDestroy(output.columnData); + taosMemoryFreeClear(output.columnData); +} + +static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) { + if (fmIsScanPseudoColumnFunc(pExprInfo->pExpr->_function.functionId)) { // tbname + char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(str, "zsl"); + // if (pUidTagInfo->name != NULL) { + // STR_TO_VARSTR(str, pUidTagInfo->name); + // } else { // name is not retrieved during filter + // pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str); + // } + + colDataSetVal(pColInfo, rowIndex, str, false); + } else { + STagVal tagVal = {0}; + tagVal.cid = pExprInfo->base.pParam[0].pCol->colId; + if (pUidTagInfo->pTagVal == NULL) { + colDataSetNULL(pColInfo, rowIndex); + } else { + const char* p = pAPI->metaFn.extractTagVal(pUidTagInfo->pTagVal, pColInfo->info.type, &tagVal); + + if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) { + colDataSetNULL(pColInfo, rowIndex); + } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) { + colDataSetVal(pColInfo, rowIndex, p, false); + } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { + char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1); + varDataSetLen(tmp, tagVal.nData); + memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData); + colDataSetVal(pColInfo, rowIndex, tmp, false); + taosMemoryFree(tmp); + } else { + colDataSetVal(pColInfo, rowIndex, (const char*)&tagVal.i64, false); + } + } + } +} + +static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aUidTagIdxs, + SStorageAPI* pAPI) { + STagScanInfo* pInfo = pOperator->info; + SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; + + for (int i = 0; i < taosArrayGetSize(aUidTagIdxs); ++i) { + STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, *(int32_t*)taosArrayGet(aUidTagIdxs, i)); + for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); + tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode); + } + } + return 0; +} + +#if 0 +static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, + SStorageAPI* pAPI) { + STagScanInfo* pInfo = pOperator->info; + SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; + + int32_t nTbls = taosArrayGetSize(aUidTags); + for (int i = 0; i < nTbls; ++i) { + STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, i); + for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); + + // refactor later + if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) { + char str[512]; + + STR_TO_VARSTR(str, "zsl"); + colDataSetVal(pDst, (i), str, false); + } else { // it is a tag value + STagVal val = {0}; + val.cid = pExprInfo[j].base.pParam[0].pCol->colId; + const char* p = pAPI->metaFn.extractTagVal(pUidTagInfo->pTagVal, pDst->info.type, &val); + + char* data = NULL; + if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) { + data = tTagValToData((const STagVal*)p, false); + } else { + data = (char*)p; + } + colDataSetVal(pDst, i, data, + (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); + + if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && + data != NULL) { + taosMemoryFree(data); + } + } + } + } + return 0; +} +#endif + + +static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + + STagScanInfo* pInfo = pOperator->info; + SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; + SSDataBlock* pRes = pInfo->pRes; + blockDataCleanup(pRes); + int32_t count = 0; + + if (pInfo->pCtbCursor == NULL) { + pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1); + } + SArray* aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo)); + SArray* aUidTagIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t)); + while (1) { + while (count < pOperator->resultInfo.capacity) { + SMCtbCursor* pCur = pInfo->pCtbCursor; + tb_uid_t uid = pAPI->metaFn.ctbCursorNext(pInfo->pCtbCursor); + if (uid == 0) { + break; + } + STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal}; + info.pTagVal = taosMemoryMalloc(pCur->vLen); + memcpy(info.pTagVal, pCur->pVal, pCur->vLen); + taosArrayPush(aUidTags, &info); + } + + int32_t numTables = taosArrayGetSize(aUidTags); + if (numTables != 0 && pInfo->pTagCond != NULL) { + tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, pInfo->readHandle.vnode, aUidTagIdxs, pAPI); + } + tagScanFillResultBlock(pOperator, pRes, aUidTags, aUidTagIdxs, pAPI); + if (taosArrayGetSize(aUidTagIdxs) != 0) { + break; + } + taosArrayClearEx(aUidTags, tagScanFreeUidTag); + taosArrayClear(aUidTagIdxs); + } + taosArrayDestroy(aUidTagIdxs); + taosArrayDestroyEx(aUidTags, tagScanFreeUidTag); + pOperator->resultInfo.totalRows += count; + return (pRes->info.rows == 0) ? NULL : pInfo->pRes; +} + static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2753,7 +3018,7 @@ static void destroyTagScanOperatorInfo(void* param) { } SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, - STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { + STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo) { STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -2774,7 +3039,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi if (code != TSDB_CODE_SUCCESS) { goto _error; } - + pInfo->pTagCond = pTagCond; + pInfo->pTagIndexCond = pTagIndexCond; pInfo->pTableListInfo = pTableListInfo; pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->readHandle = *pReadHandle; @@ -2789,6 +3055,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); + pInfo->suid = pPhyNode->suid; return pOperator; _error: From 20f5e2af5b5f6742466e82611f2e54278af6d776 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 10 Aug 2023 17:40:54 +0800 Subject: [PATCH 02/16] continue coding and save work --- source/libs/executor/src/scanoperator.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 24ed717c8a..42c488edbe 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2913,7 +2913,6 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { SStorageAPI* pAPI = &pTaskInfo->storageAPI; STagScanInfo* pInfo = pOperator->info; - SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); int32_t count = 0; @@ -2941,6 +2940,8 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, pInfo->readHandle.vnode, aUidTagIdxs, pAPI); } tagScanFillResultBlock(pOperator, pRes, aUidTags, aUidTagIdxs, pAPI); + count = taosArrayGetSize(aUidTagIdxs); + if (taosArrayGetSize(aUidTagIdxs) != 0) { break; } From 7c39bc989083ce501dd6df5bd980b4edaab07057 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 11 Aug 2023 13:50:41 +0800 Subject: [PATCH 03/16] fix: some minor modifications --- source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/src/scanoperator.c | 77 ++++++++++++++----------- 2 files changed, 44 insertions(+), 34 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index cadf367481..2b25feabb3 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -263,6 +263,7 @@ typedef struct STagScanInfo { void* pCtbCursor; SNode* pTagCond; SNode* pTagIndexCond; + SStorageAPI* pStorageAPI; } STagScanInfo; typedef enum EStreamScanMode { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 42c488edbe..5e0eb71c13 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2767,9 +2767,10 @@ static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) { } -static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aUidTagIdxs, void* pVnode, SStorageAPI* pAPI) { +static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI) { int32_t code = 0; int32_t numOfTables = taosArrayGetSize(aUidTags); + STagScanFilterContext ctx = {0}; ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo)); @@ -2777,48 +2778,42 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aU nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&ctx); SSDataBlock* pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, aUidTags, pVnode, pAPI); - if (pResBlock == NULL) { - - } SArray* pBlockList = taosArrayInit(1, POINTER_BYTES); taosArrayPush(pBlockList, &pResBlock); SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)}; SScalarParam output = {0}; - code = tagScanCreateResultData(&type, numOfTables, &output); - if (code != TSDB_CODE_SUCCESS) { + tagScanCreateResultData(&type, numOfTables, &output); - } - - code = scalarCalculate(pTagCond, pBlockList, &output); - if (code != TSDB_CODE_SUCCESS) { - } + scalarCalculate(pTagCond, pBlockList, &output); bool* result = (bool*)output.columnData->pData; for (int32_t i = 0 ; i < numOfTables; ++i) { if (result[i]) { - taosArrayPush(aUidTagIdxs, &i); + taosArrayPush(aFilterIdxs, &i); } } - taosHashCleanup(ctx.colHash); - taosArrayDestroy(ctx.cInfoList); - blockDataDestroy(pResBlock); - taosArrayDestroy(pBlockList); colDataDestroy(output.columnData); taosMemoryFreeClear(output.columnData); + + blockDataDestroy(pResBlock); + taosArrayDestroy(pBlockList); + + taosHashCleanup(ctx.colHash); + taosArrayDestroy(ctx.cInfoList); } static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) { if (fmIsScanPseudoColumnFunc(pExprInfo->pExpr->_function.functionId)) { // tbname char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; +// if (pUidTagInfo->name != NULL) { +// STR_TO_VARSTR(str, pUidTagInfo->name); +// } else { // name is not retrieved during filter +// pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str); +// } STR_TO_VARSTR(str, "zsl"); - // if (pUidTagInfo->name != NULL) { - // STR_TO_VARSTR(str, pUidTagInfo->name); - // } else { // name is not retrieved during filter - // pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str); - // } colDataSetVal(pColInfo, rowIndex, str, false); } else { @@ -2846,13 +2841,15 @@ static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo } } -static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aUidTagIdxs, +static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aFilterIdxs, SStorageAPI* pAPI) { STagScanInfo* pInfo = pOperator->info; SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; - for (int i = 0; i < taosArrayGetSize(aUidTagIdxs); ++i) { - STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, *(int32_t*)taosArrayGet(aUidTagIdxs, i)); + size_t szTables = taosArrayGetSize(aFilterIdxs); + for (int i = 0; i < szTables; ++i) { + int32_t idx = *(int32_t*)taosArrayGet(aFilterIdxs, i); + STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, idx); for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode); @@ -2920,8 +2917,10 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { if (pInfo->pCtbCursor == NULL) { pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1); } + SArray* aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo)); - SArray* aUidTagIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t)); + SArray* aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t)); + while (1) { while (count < pOperator->resultInfo.capacity) { SMCtbCursor* pCur = pInfo->pCtbCursor; @@ -2936,20 +2935,26 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { } int32_t numTables = taosArrayGetSize(aUidTags); - if (numTables != 0 && pInfo->pTagCond != NULL) { - tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, pInfo->readHandle.vnode, aUidTagIdxs, pAPI); - } - tagScanFillResultBlock(pOperator, pRes, aUidTags, aUidTagIdxs, pAPI); - count = taosArrayGetSize(aUidTagIdxs); - - if (taosArrayGetSize(aUidTagIdxs) != 0) { + if (numTables == 0) { break; } + + tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, pInfo->readHandle.vnode, aFilterIdxs, pAPI); + + tagScanFillResultBlock(pOperator, pRes, aUidTags, aFilterIdxs, pAPI); + count = taosArrayGetSize(aFilterIdxs); + + if (count != 0) { + break; + } + taosArrayClearEx(aUidTags, tagScanFreeUidTag); - taosArrayClear(aUidTagIdxs); + taosArrayClear(aFilterIdxs); } - taosArrayDestroy(aUidTagIdxs); + + taosArrayDestroy(aFilterIdxs); taosArrayDestroyEx(aUidTags, tagScanFreeUidTag); + pOperator->resultInfo.totalRows += count; return (pRes->info.rows == 0) ? NULL : pInfo->pRes; } @@ -3012,6 +3017,9 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { static void destroyTagScanOperatorInfo(void* param) { STagScanInfo* pInfo = (STagScanInfo*)param; + if (pInfo->pCtbCursor != NULL) { + pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1); + } pInfo->pRes = blockDataDestroy(pInfo->pRes); taosArrayDestroy(pInfo->matchInfo.pList); pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo); @@ -3043,6 +3051,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pInfo->pTagCond = pTagCond; pInfo->pTagIndexCond = pTagIndexCond; pInfo->pTableListInfo = pTableListInfo; + pInfo->pStorageAPI = &pTaskInfo->storageAPI; pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; From 1c7f854a719b590f4aa5d201d2543681d1e28975 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 11 Aug 2023 14:47:28 +0800 Subject: [PATCH 04/16] enhance: add only meta ctb index to tag scan physi node --- include/libs/nodes/plannodes.h | 14 +++- source/libs/nodes/src/nodesCloneFuncs.c | 11 ++- source/libs/nodes/src/nodesCodeFuncs.c | 70 ++++++++++++++++++- source/libs/nodes/src/nodesMsgFuncs.c | 89 +++++++++++++++++++++++++ 4 files changed, 180 insertions(+), 4 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 063318332a..0830dc4918 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -334,7 +334,19 @@ typedef struct SScanPhysiNode { bool groupOrderScan; } SScanPhysiNode; -typedef SScanPhysiNode STagScanPhysiNode; +typedef struct STagScanPhysiNode { + // SScanPhysiNode scan; //TODO? + SPhysiNode node; + SNodeList* pScanCols; + SNodeList* pScanPseudoCols; + uint64_t uid; // unique id of the table + uint64_t suid; + int8_t tableType; + SName tableName; + bool groupOrderScan; + bool onlyMetaCtbIdx; //no tbname, tag index not used. +} STagScanPhysiNode; + typedef SScanPhysiNode SBlockDistScanPhysiNode; typedef struct SLastRowScanPhysiNode { diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index f5eacf0bd5..965af41fa7 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -564,7 +564,16 @@ static int32_t physiScanCopy(const SScanPhysiNode* pSrc, SScanPhysiNode* pDst) { } static int32_t physiTagScanCopy(const STagScanPhysiNode* pSrc, STagScanPhysiNode* pDst) { - return physiScanCopy(pSrc, pDst); + COPY_BASE_OBJECT_FIELD(node, physiNodeCopy); + CLONE_NODE_LIST_FIELD(pScanCols); + CLONE_NODE_LIST_FIELD(pScanPseudoCols); + COPY_SCALAR_FIELD(uid); + COPY_SCALAR_FIELD(suid); + COPY_SCALAR_FIELD(tableType); + COPY_OBJECT_FIELD(tableName, sizeof(SName)); + COPY_SCALAR_FIELD(groupOrderScan); + COPY_SCALAR_FIELD(onlyMetaCtbIdx); + return TSDB_CODE_SUCCESS; } static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhysiNode* pDst) { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index f25616065e..3540f8cb70 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1562,7 +1562,7 @@ static const char* jkScanPhysiPlanTableName = "TableName"; static const char* jkScanPhysiPlanGroupOrderScan = "GroupOrderScan"; static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) { - const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj; + const SScanPhysiNode* pNode = (const SScanPhysiNode*)pObj; int32_t code = physicPlanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { @@ -1591,7 +1591,7 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) { } static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) { - STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj; + SScanPhysiNode* pNode = (SScanPhysiNode*)pObj; int32_t code = jsonToPhysicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { @@ -1619,6 +1619,70 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkTagScanPhysiOnlyMetaCtbIdx = "OnlyMetaCtbIdx"; + +static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) { + const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj; + + int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkScanPhysiPlanScanCols, pNode->pScanCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkScanPhysiPlanScanPseudoCols, pNode->pScanPseudoCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableId, pNode->uid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanSTableId, pNode->suid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkScanPhysiPlanGroupOrderScan, pNode->groupOrderScan); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkTagScanPhysiOnlyMetaCtbIdx, pNode->onlyMetaCtbIdx); + } + return code; +} + +static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) { + STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkScanPhysiPlanScanCols, &pNode->pScanCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkScanPhysiPlanScanPseudoCols, &pNode->pScanPseudoCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanTableId, &pNode->uid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanSTableId, &pNode->suid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkScanPhysiPlanTableType, &pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkScanPhysiPlanGroupOrderScan, &pNode->groupOrderScan); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkTagScanPhysiOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx); + } + return code; +} + static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags"; static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort"; @@ -6590,6 +6654,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_LOGIC_PLAN: return logicPlanToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: + return physiTableScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: return physiScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: @@ -6908,6 +6973,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { case QUERY_NODE_LOGIC_PLAN: return jsonToLogicPlan(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: + return jsonToPhysiTagScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: return jsonToPhysiScanNode(pJson, pObj); diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 20e829766d..4d1120861d 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2003,6 +2003,91 @@ static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) { return code; } +enum { + PHY_TAG_SCAN_CODE_BASE_NODE = 1, + PHY_TAG_SCAN_CODE_SCAN_COLS, + PHY_TAG_SCAN_CODE_SCAN_PSEUDO_COLS, + PHY_TAG_SCAN_CODE_BASE_UID, + PHY_TAG_SCAN_CODE_BASE_SUID, + PHY_TAG_SCAN_CODE_BASE_TABLE_TYPE, + PHY_TAG_SCAN_CODE_BASE_TABLE_NAME, + PHY_TAG_SCAN_CODE_BASE_GROUP_ORDER_SCAN, + PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX +}; + +static int32_t physiTagScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_SCAN_COLS, nodeListToMsg, pNode->pScanCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_SCAN_PSEUDO_COLS, nodeListToMsg, pNode->pScanPseudoCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU64(pEncoder, PHY_TAG_SCAN_CODE_BASE_UID, pNode->uid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU64(pEncoder, PHY_TAG_SCAN_CODE_BASE_SUID, pNode->suid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_TAG_SCAN_CODE_BASE_TABLE_TYPE, pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_BASE_TABLE_NAME, nameToMsg, &pNode->tableName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_TAG_SCAN_CODE_BASE_GROUP_ORDER_SCAN, pNode->groupOrderScan); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX, pNode->onlyMetaCtbIdx); + } + return code; +} + +static int32_t msgToPhysiTagScanNode(STlvDecoder* pDecoder, void* pObj) { + STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_TAG_SCAN_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_TAG_SCAN_CODE_SCAN_COLS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pScanCols); + break; + case PHY_TAG_SCAN_CODE_SCAN_PSEUDO_COLS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pScanPseudoCols); + break; + case PHY_TAG_SCAN_CODE_BASE_UID: + code = tlvDecodeU64(pTlv, &pNode->uid); + break; + case PHY_TAG_SCAN_CODE_BASE_SUID: + code = tlvDecodeU64(pTlv, &pNode->suid); + break; + case PHY_TAG_SCAN_CODE_BASE_TABLE_TYPE: + code = tlvDecodeI8(pTlv, &pNode->tableType); + break; + case PHY_TAG_SCAN_CODE_BASE_TABLE_NAME: + code = tlvDecodeObjFromTlv(pTlv, msgToName, &pNode->tableName); + break; + case PHY_TAG_SCAN_CODE_BASE_GROUP_ORDER_SCAN: + code = tlvDecodeBool(pTlv, &pNode->groupOrderScan); + break; + case PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX: + code = tlvDecodeBool(pTlv, &pNode->onlyMetaCtbIdx); + break; + default: + break; + } + } + + return code; +} + enum { PHY_LAST_ROW_SCAN_CODE_SCAN = 1, PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS, @@ -3726,6 +3811,8 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { code = caseWhenNodeToMsg(pObj, pEncoder); break; case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: + code = physiTagScanNodeToMsg(pObj, pEncoder); + break; case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: code = physiScanNodeToMsg(pObj, pEncoder); break; @@ -3869,6 +3956,8 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { code = msgToCaseWhenNode(pDecoder, pObj); break; case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: + code = msgToPhysiTagScanNode(pDecoder, pObj); + break; case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: code = msgToPhysiScanNode(pDecoder, pObj); break; From a0c62d215d36bc980aabe3ebb12ee32521db2c43 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 11 Aug 2023 14:54:43 +0800 Subject: [PATCH 05/16] enhance: tag scan only meta ctb idx backend modification --- source/libs/executor/src/operator.c | 19 ++++++++++--------- source/libs/executor/src/scanoperator.c | 9 ++++++--- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 0fc1b77b73..d0805a86e4 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -370,17 +370,18 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode; pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { - STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; + STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); - int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, - pTagIndexCond, pTaskInfo); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - qError("failed to getTableList, code: %s", tstrerror(code)); - return NULL; + if (!pTagScanPhyNode->onlyMetaCtbIdx) { + int32_t code = createScanTableListInfo(pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, + pTagIndexCond, pTaskInfo); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; + qError("failed to getTableList, code: %s", tstrerror(code)); + return NULL; + } } - - pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); + pOperator = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5e0eb71c13..107ea14914 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3048,10 +3048,13 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi if (code != TSDB_CODE_SUCCESS) { goto _error; } + pInfo->pTagCond = pTagCond; pInfo->pTagIndexCond = pTagIndexCond; - pInfo->pTableListInfo = pTableListInfo; + pInfo->suid = pPhyNode->suid; pInfo->pStorageAPI = &pTaskInfo->storageAPI; + + pInfo->pTableListInfo = pTableListInfo; pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; @@ -3062,10 +3065,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + __optr_fn_t tagScanNextFn = (pPhyNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScan; pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); + createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); - pInfo->suid = pPhyNode->suid; return pOperator; _error: From 6530658815346945aaa333326ab72f54067182c4 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 11 Aug 2023 17:05:59 +0800 Subject: [PATCH 06/16] fix: continue coding --- source/libs/executor/src/operator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index d0805a86e4..7f0c5baa36 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -373,7 +373,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); if (!pTagScanPhyNode->onlyMetaCtbIdx) { - int32_t code = createScanTableListInfo(pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, + int32_t code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; From 47d2f9ad6df4385e0cb912841204e94cff1d4ed0 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 11 Aug 2023 17:52:52 +0800 Subject: [PATCH 07/16] fix: first run without tag cond --- source/dnode/vnode/src/meta/metaQuery.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeQuery.c | 6 +++--- source/libs/executor/src/operator.c | 1 + source/libs/executor/src/scanoperator.c | 15 +++++++++++---- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 31c7bc8500..39c3dfa080 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -427,7 +427,7 @@ SMCtbCursor *metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock) { metaRLock(pMeta); } - ret = tdbTbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL); + ret = tdbTbcOpen(pMeta->pCtbIdx, (TBC**)&pCtbCur->pCur, NULL); if (ret < 0) { metaULock(pMeta); taosMemoryFree(pCtbCur); @@ -1365,7 +1365,7 @@ int32_t metaGetTableTagsByUids(void *pVnode, int64_t suid, SArray *uidList) { } int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) { - SMCtbCursor *pCur = metaOpenCtbCursor(((SVnode *)pVnode)->pMeta, suid, 1); + SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, suid, 1); // If len > 0 means there already have uids, and we only want the // tags of the specified tables, of which uid in the uid list. Otherwise, all table tags are retrieved and kept diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 51f4cee40c..48f8ec021d 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -440,7 +440,7 @@ int32_t vnodeGetTableList(void* pVnode, int8_t type, SArray* pList) { } int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) { - SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, uid, 1); + SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, uid, 1); while (1) { tb_uid_t id = metaCtbCursorNext(pCur); @@ -462,7 +462,7 @@ int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bo int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) { SVnode *pVnodeObj = pVnode; - SMCtbCursor *pCur = metaOpenCtbCursor(pVnodeObj->pMeta, suid, 1); + SMCtbCursor *pCur = metaOpenCtbCursor(pVnodeObj, suid, 1); while (1) { tb_uid_t id = metaCtbCursorNext(pCur); @@ -521,7 +521,7 @@ int32_t vnodeGetStbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bo } int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) { - SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid, 0); + SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, suid, 0); if (!pCur) { return TSDB_CODE_FAILED; } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 7f0c5baa36..31998d13b6 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -371,6 +371,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode; + pTagScanPhyNode->onlyMetaCtbIdx = true; STableListInfo* pTableListInfo = tableListCreate(); if (!pTagScanPhyNode->onlyMetaCtbIdx) { int32_t code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 107ea14914..5f8bf03d80 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2922,7 +2922,8 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { SArray* aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t)); while (1) { - while (count < pOperator->resultInfo.capacity) { + int32_t numTables = 0; + while (numTables < pOperator->resultInfo.capacity) { SMCtbCursor* pCur = pInfo->pCtbCursor; tb_uid_t uid = pAPI->metaFn.ctbCursorNext(pInfo->pCtbCursor); if (uid == 0) { @@ -2932,14 +2933,19 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { info.pTagVal = taosMemoryMalloc(pCur->vLen); memcpy(info.pTagVal, pCur->pVal, pCur->vLen); taosArrayPush(aUidTags, &info); + ++numTables; } - int32_t numTables = taosArrayGetSize(aUidTags); if (numTables == 0) { break; } - - tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, pInfo->readHandle.vnode, aFilterIdxs, pAPI); + if (pInfo->pTagCond != NULL) { + tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, pInfo->readHandle.vnode, aFilterIdxs, pAPI); + } else { + for (int i = 0; i < numTables; ++i) { + taosArrayPush(aFilterIdxs, &i); + } + } tagScanFillResultBlock(pOperator, pRes, aUidTags, aFilterIdxs, pAPI); count = taosArrayGetSize(aFilterIdxs); @@ -2955,6 +2961,7 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { taosArrayDestroy(aFilterIdxs); taosArrayDestroyEx(aUidTags, tagScanFreeUidTag); + pRes->info.rows = count; pOperator->resultInfo.totalRows += count; return (pRes->info.rows == 0) ? NULL : pInfo->pRes; } From edd2fa4f351c3c3434c2143822b0a188a1c305d1 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 12 Aug 2023 08:17:43 +0800 Subject: [PATCH 08/16] fix: pass compilation and simple test --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5f8bf03d80..ac20bae167 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2940,7 +2940,7 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { break; } if (pInfo->pTagCond != NULL) { - tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, pInfo->readHandle.vnode, aFilterIdxs, pAPI); + tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI); } else { for (int i = 0; i < numTables; ++i) { taosArrayPush(aFilterIdxs, &i); From f83bfec067deb4de1ab9e98434094f0d0e20a8cf Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 12 Aug 2023 08:28:25 +0800 Subject: [PATCH 09/16] fix: change only meta ctb idx back to false --- source/libs/executor/src/operator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 31998d13b6..abef8298e5 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -371,7 +371,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode; - pTagScanPhyNode->onlyMetaCtbIdx = true; + pTagScanPhyNode->onlyMetaCtbIdx = false; STableListInfo* pTableListInfo = tableListCreate(); if (!pTagScanPhyNode->onlyMetaCtbIdx) { int32_t code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, From 6688d70ba41400c83e9686e0ca6565dc861b1327 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 13 Aug 2023 18:46:55 +0800 Subject: [PATCH 10/16] fix: fix planner test error --- source/libs/nodes/src/nodesCodeFuncs.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 3540f8cb70..a2de0bc63a 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -6654,7 +6654,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_LOGIC_PLAN: return logicPlanToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: - return physiTableScanNodeToJson(pObj, pJson); + return physiTagScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: return physiScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: From 57d1957dee1d8a738dfe0f2fe3efd9716433a280 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 14 Aug 2023 15:57:27 +0800 Subject: [PATCH 11/16] enhance: tag scan code refactoring --- source/libs/executor/inc/executorInt.h | 2 + source/libs/executor/src/scanoperator.c | 110 +++++++++--------------- 2 files changed, 43 insertions(+), 69 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 2b25feabb3..cb066d809c 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -263,6 +263,8 @@ typedef struct STagScanInfo { void* pCtbCursor; SNode* pTagCond; SNode* pTagIndexCond; + SArray* aUidTags; // SArray + SArray* aFilterIdxs; // SArray SStorageAPI* pStorageAPI; } STagScanInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ac20bae167..71352b1c6e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2813,7 +2813,7 @@ static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo // } else { // name is not retrieved during filter // pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str); // } - STR_TO_VARSTR(str, "zsl"); + STR_TO_VARSTR(str, "ctbidx"); colDataSetVal(pColInfo, rowIndex, str, false); } else { @@ -2841,66 +2841,32 @@ static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo } } -static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aFilterIdxs, +static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aFilterIdxs, bool ignoreFilterIdx, SStorageAPI* pAPI) { STagScanInfo* pInfo = pOperator->info; SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; - - size_t szTables = taosArrayGetSize(aFilterIdxs); - for (int i = 0; i < szTables; ++i) { - int32_t idx = *(int32_t*)taosArrayGet(aFilterIdxs, i); - STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, idx); - for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { - SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); - tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode); + if (!ignoreFilterIdx) { + size_t szTables = taosArrayGetSize(aFilterIdxs); + for (int i = 0; i < szTables; ++i) { + int32_t idx = *(int32_t*)taosArrayGet(aFilterIdxs, i); + STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, idx); + for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); + tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode); + } } - } - return 0; -} - -#if 0 -static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, - SStorageAPI* pAPI) { - STagScanInfo* pInfo = pOperator->info; - SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; - - int32_t nTbls = taosArrayGetSize(aUidTags); - for (int i = 0; i < nTbls; ++i) { - STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, i); - for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { - SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); - - // refactor later - if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) { - char str[512]; - - STR_TO_VARSTR(str, "zsl"); - colDataSetVal(pDst, (i), str, false); - } else { // it is a tag value - STagVal val = {0}; - val.cid = pExprInfo[j].base.pParam[0].pCol->colId; - const char* p = pAPI->metaFn.extractTagVal(pUidTagInfo->pTagVal, pDst->info.type, &val); - - char* data = NULL; - if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) { - data = tTagValToData((const STagVal*)p, false); - } else { - data = (char*)p; - } - colDataSetVal(pDst, i, data, - (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); - - if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && - data != NULL) { - taosMemoryFree(data); - } + } else { + size_t szTables = taosArrayGetSize(aUidTags); + for (int i = 0; i < szTables; ++i) { + STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, i); + for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); + tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode); } } } return 0; } -#endif - static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { @@ -2912,16 +2878,19 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { STagScanInfo* pInfo = pOperator->info; SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); - int32_t count = 0; if (pInfo->pCtbCursor == NULL) { pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1); } - SArray* aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo)); - SArray* aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t)); + SArray* aUidTags = pInfo->aUidTags; + SArray* aFilterIdxs = pInfo->aFilterIdxs; + int32_t count = 0; while (1) { + taosArrayClearEx(aUidTags, tagScanFreeUidTag); + taosArrayClear(aFilterIdxs); + int32_t numTables = 0; while (numTables < pOperator->resultInfo.capacity) { SMCtbCursor* pCur = pInfo->pCtbCursor; @@ -2939,34 +2908,29 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { if (numTables == 0) { break; } + bool ignoreFilterIdx = true; if (pInfo->pTagCond != NULL) { + ignoreFilterIdx = false; tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI); } else { - for (int i = 0; i < numTables; ++i) { - taosArrayPush(aFilterIdxs, &i); - } + ignoreFilterIdx = true; } - tagScanFillResultBlock(pOperator, pRes, aUidTags, aFilterIdxs, pAPI); - count = taosArrayGetSize(aFilterIdxs); + tagScanFillResultBlock(pOperator, pRes, aUidTags, aFilterIdxs, ignoreFilterIdx, pAPI); + + count = ignoreFilterIdx ? taosArrayGetSize(aUidTags): taosArrayGetSize(aFilterIdxs); if (count != 0) { break; } - - taosArrayClearEx(aUidTags, tagScanFreeUidTag); - taosArrayClear(aFilterIdxs); } - - taosArrayDestroy(aFilterIdxs); - taosArrayDestroyEx(aUidTags, tagScanFreeUidTag); - + pRes->info.rows = count; pOperator->resultInfo.totalRows += count; return (pRes->info.rows == 0) ? NULL : pInfo->pRes; } -static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { +static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -3027,6 +2991,10 @@ static void destroyTagScanOperatorInfo(void* param) { if (pInfo->pCtbCursor != NULL) { pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1); } + + taosArrayDestroy(pInfo->aFilterIdxs); + taosArrayDestroyEx(pInfo->aUidTags, tagScanFreeUidTag); + pInfo->pRes = blockDataDestroy(pInfo->pRes); taosArrayDestroy(pInfo->matchInfo.pList); pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo); @@ -3072,7 +3040,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - __optr_fn_t tagScanNextFn = (pPhyNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScan; + if (pPhyNode->onlyMetaCtbIdx) { + pInfo->aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo)); + pInfo->aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t)); + } + __optr_fn_t tagScanNextFn = (pPhyNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); From f9c897221cc66cd49bfeaa0905e802a305aece6a Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 14 Aug 2023 16:30:31 +0800 Subject: [PATCH 12/16] fix: move the setting of onlyCtbIdx to front end --- source/libs/executor/src/operator.c | 1 - source/libs/planner/src/planPhysiCreater.c | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index abef8298e5..7f0c5baa36 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -371,7 +371,6 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode; - pTagScanPhyNode->onlyMetaCtbIdx = false; STableListInfo* pTableListInfo = tableListCreate(); if (!pTagScanPhyNode->onlyMetaCtbIdx) { int32_t code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 06859e195d..8efa9c1048 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -511,6 +511,20 @@ static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSub return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode); } +static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, + SPhysiNode** pPhyNode) { + STagScanPhysiNode* pScan = + (STagScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN); + if (NULL == pScan) { + return TSDB_CODE_OUT_OF_MEMORY; + } + vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); + + pScan->onlyMetaCtbIdx = false; + + return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); +} + static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { SLastRowScanPhysiNode* pScan = @@ -646,6 +660,7 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, pCxt->hasScan = true; switch (pScanLogicNode->scanType) { case SCAN_TYPE_TAG: + return createTagScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); case SCAN_TYPE_BLOCK_INFO: return createSimpleScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); case SCAN_TYPE_TABLE_COUNT: From 012248b68142496f0e8e2fa1833b4df4912b2c82 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 14 Aug 2023 19:26:53 +0800 Subject: [PATCH 13/16] fix: move the only ctb idx flag to logical plan --- include/libs/nodes/plannodes.h | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 10 ++++++++-- source/libs/planner/src/planOptimizer.c | 3 +++ source/libs/planner/src/planPhysiCreater.c | 2 +- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 0830dc4918..3e24e417fc 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -107,6 +107,7 @@ typedef struct SScanLogicNode { bool sortPrimaryKey; bool igLastNull; bool groupOrderScan; + bool onlyMetaCtbIdx; // for tag scan with no tbname } SScanLogicNode; typedef struct SJoinLogicNode { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index a2de0bc63a..4dfc55c0fa 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -660,6 +660,7 @@ static const char* jkScanLogicPlanDynamicScanFuncs = "DynamicScanFuncs"; static const char* jkScanLogicPlanDataRequired = "DataRequired"; static const char* jkScanLogicPlanTagCond = "TagCond"; static const char* jkScanLogicPlanGroupTags = "GroupTags"; +static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx"; static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { const SScanLogicNode* pNode = (const SScanLogicNode*)pObj; @@ -701,7 +702,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkScanLogicPlanGroupTags, pNode->pGroupTags); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkScanLogicPlanOnlyMetaCtbIdx, pNode->onlyMetaCtbIdx); + } return code; } @@ -746,7 +749,10 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkScanLogicPlanGroupTags, &pNode->pGroupTags); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkScanLogicPlanOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx); + } + return code; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 16440be511..6944fc9f18 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2679,6 +2679,9 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp } nodesDestroyNode((SNode*)pAgg); tagScanOptCloneAncestorSlimit((SLogicNode*)pScanNode); + + pScanNode->onlyMetaCtbIdx = false; + pCxt->optimized = true; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 8efa9c1048..5f78b5de9c 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -520,7 +520,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla } vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); - pScan->onlyMetaCtbIdx = false; + pScan->onlyMetaCtbIdx = pScanLogicNode->onlyMetaCtbIdx; return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); } From 84e472ad03b71aabd88670184d1de52c9b85dd3e Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 15 Aug 2023 16:10:54 +0800 Subject: [PATCH 14/16] enhance: tag cond col list only once and tag scan derive from scan --- include/libs/nodes/plannodes.h | 10 +---- source/libs/command/src/explain.c | 16 +++---- source/libs/executor/inc/executorInt.h | 7 +++ source/libs/executor/src/scanoperator.c | 36 +++++++-------- source/libs/nodes/src/nodesCloneFuncs.c | 9 +--- source/libs/nodes/src/nodesCodeFuncs.c | 48 ++------------------ source/libs/nodes/src/nodesMsgFuncs.c | 58 +++---------------------- 7 files changed, 41 insertions(+), 143 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 3e24e417fc..4529520ace 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -336,15 +336,7 @@ typedef struct SScanPhysiNode { } SScanPhysiNode; typedef struct STagScanPhysiNode { - // SScanPhysiNode scan; //TODO? - SPhysiNode node; - SNodeList* pScanCols; - SNodeList* pScanPseudoCols; - uint64_t uid; // unique id of the table - uint64_t suid; - int8_t tableType; - SName tableName; - bool groupOrderScan; + SScanPhysiNode scan; bool onlyMetaCtbIdx; //no tbname, tag index not used. } STagScanPhysiNode; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index e917de33dd..e167b31ef8 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -291,17 +291,17 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i switch (pNode->type) { case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: { STagScanPhysiNode *pTagScanNode = (STagScanPhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->tableName.tname); + EXPLAIN_ROW_NEW(level, EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->scan.tableName.tname); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } - if (pTagScanNode->pScanPseudoCols) { - EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pTagScanNode->pScanPseudoCols->length); + if (pTagScanNode->scan.pScanPseudoCols) { + EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pTagScanNode->scan.pScanPseudoCols->length); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } - EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->scan.node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); @@ -309,11 +309,11 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i if (verbose) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, - nodesGetOutputNumFromSlotList(pTagScanNode->node.pOutputDataBlockDesc->pSlots)); + nodesGetOutputNumFromSlotList(pTagScanNode->scan.node.pOutputDataBlockDesc->pSlots)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->outputRowSize); - EXPLAIN_ROW_APPEND_LIMIT(pTagScanNode->node.pLimit); - EXPLAIN_ROW_APPEND_SLIMIT(pTagScanNode->node.pSlimit); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->scan.node.pOutputDataBlockDesc->outputRowSize); + EXPLAIN_ROW_APPEND_LIMIT(pTagScanNode->scan.node.pLimit); + EXPLAIN_ROW_APPEND_SLIMIT(pTagScanNode->scan.node.pSlimit); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index cb066d809c..dad15dc6bc 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -251,6 +251,12 @@ typedef struct STableMergeScanInfo { SSortExecInfo sortExecInfo; } STableMergeScanInfo; +typedef struct STagScanFilterContext { + SHashObj* colHash; + int32_t index; + SArray* cInfoList; +} STagScanFilterContext; + typedef struct STagScanInfo { SColumnInfo* pCols; SSDataBlock* pRes; @@ -263,6 +269,7 @@ typedef struct STagScanInfo { void* pCtbCursor; SNode* pTagCond; SNode* pTagIndexCond; + STagScanFilterContext filterCtx; SArray* aUidTags; // SArray SArray* aFilterIdxs; // SArray SStorageAPI* pStorageAPI; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 71352b1c6e..ef28875be4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2719,12 +2719,6 @@ static int32_t tagScanCreateResultData(SDataType* pType, int32_t numOfRows, SSca return TSDB_CODE_SUCCESS; } -typedef struct STagScanFilterContext { - SHashObj* colHash; - int32_t index; - SArray* cInfoList; -} STagScanFilterContext; - static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) { SColumnNode* pSColumnNode = NULL; if (QUERY_NODE_COLUMN == nodeType((*pNode))) { @@ -2767,17 +2761,11 @@ static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) { } -static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI) { +static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI, STagScanInfo* pInfo) { int32_t code = 0; int32_t numOfTables = taosArrayGetSize(aUidTags); - STagScanFilterContext ctx = {0}; - ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); - ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo)); - - nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&ctx); - - SSDataBlock* pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, aUidTags, pVnode, pAPI); + SSDataBlock* pResBlock = createTagValBlockForFilter(pInfo->filterCtx.cInfoList, numOfTables, aUidTags, pVnode, pAPI); SArray* pBlockList = taosArrayInit(1, POINTER_BYTES); taosArrayPush(pBlockList, &pResBlock); @@ -2801,8 +2789,7 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aF blockDataDestroy(pResBlock); taosArrayDestroy(pBlockList); - taosHashCleanup(ctx.colHash); - taosArrayDestroy(ctx.cInfoList); + } static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) { @@ -2911,7 +2898,7 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { bool ignoreFilterIdx = true; if (pInfo->pTagCond != NULL) { ignoreFilterIdx = false; - tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI); + tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI, pInfo); } else { ignoreFilterIdx = true; } @@ -2991,7 +2978,8 @@ static void destroyTagScanOperatorInfo(void* param) { if (pInfo->pCtbCursor != NULL) { pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1); } - + taosHashCleanup(pInfo->filterCtx.colHash); + taosArrayDestroy(pInfo->filterCtx.cInfoList); taosArrayDestroy(pInfo->aFilterIdxs); taosArrayDestroyEx(pInfo->aUidTags, tagScanFreeUidTag); @@ -3001,8 +2989,9 @@ static void destroyTagScanOperatorInfo(void* param) { taosMemoryFreeClear(param); } -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pTagScanNode, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo) { + SScanPhysiNode* pPhyNode = (STagScanPhysiNode*)pTagScanNode; STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -3040,11 +3029,16 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - if (pPhyNode->onlyMetaCtbIdx) { + if (pTagScanNode->onlyMetaCtbIdx) { pInfo->aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo)); pInfo->aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t)); + pInfo->filterCtx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); + pInfo->filterCtx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo)); + if (pInfo->pTagCond != NULL) { + nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&pInfo->filterCtx); + } } - __optr_fn_t tagScanNextFn = (pPhyNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry; + __optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 965af41fa7..d3cbaac5e1 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -564,14 +564,7 @@ static int32_t physiScanCopy(const SScanPhysiNode* pSrc, SScanPhysiNode* pDst) { } static int32_t physiTagScanCopy(const STagScanPhysiNode* pSrc, STagScanPhysiNode* pDst) { - COPY_BASE_OBJECT_FIELD(node, physiNodeCopy); - CLONE_NODE_LIST_FIELD(pScanCols); - CLONE_NODE_LIST_FIELD(pScanPseudoCols); - COPY_SCALAR_FIELD(uid); - COPY_SCALAR_FIELD(suid); - COPY_SCALAR_FIELD(tableType); - COPY_OBJECT_FIELD(tableName, sizeof(SName)); - COPY_SCALAR_FIELD(groupOrderScan); + COPY_BASE_OBJECT_FIELD(scan, physiScanCopy); COPY_SCALAR_FIELD(onlyMetaCtbIdx); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 4dfc55c0fa..64a4e0e7d3 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1630,28 +1630,8 @@ static const char* jkTagScanPhysiOnlyMetaCtbIdx = "OnlyMetaCtbIdx"; static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) { const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj; - int32_t code = physicPlanNodeToJson(pObj, pJson); - if (TSDB_CODE_SUCCESS == code) { - code = nodeListToJson(pJson, jkScanPhysiPlanScanCols, pNode->pScanCols); - } - if (TSDB_CODE_SUCCESS == code) { - code = nodeListToJson(pJson, jkScanPhysiPlanScanPseudoCols, pNode->pScanPseudoCols); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableId, pNode->uid); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanSTableId, pNode->suid); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddBoolToObject(pJson, jkScanPhysiPlanGroupOrderScan, pNode->groupOrderScan); - } + int32_t code = physiScanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkTagScanPhysiOnlyMetaCtbIdx, pNode->onlyMetaCtbIdx); } @@ -1661,28 +1641,8 @@ static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) { STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj; - int32_t code = jsonToPhysicPlanNode(pJson, pObj); - if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeList(pJson, jkScanPhysiPlanScanCols, &pNode->pScanCols); - } - if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeList(pJson, jkScanPhysiPlanScanPseudoCols, &pNode->pScanPseudoCols); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanTableId, &pNode->uid); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanSTableId, &pNode->suid); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetTinyIntValue(pJson, jkScanPhysiPlanTableType, &pNode->tableType); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetBoolValue(pJson, jkScanPhysiPlanGroupOrderScan, &pNode->groupOrderScan); - } + int32_t code = jsonToPhysiScanNode(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkTagScanPhysiOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 4d1120861d..cade77fc17 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2004,42 +2004,15 @@ static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) { } enum { - PHY_TAG_SCAN_CODE_BASE_NODE = 1, - PHY_TAG_SCAN_CODE_SCAN_COLS, - PHY_TAG_SCAN_CODE_SCAN_PSEUDO_COLS, - PHY_TAG_SCAN_CODE_BASE_UID, - PHY_TAG_SCAN_CODE_BASE_SUID, - PHY_TAG_SCAN_CODE_BASE_TABLE_TYPE, - PHY_TAG_SCAN_CODE_BASE_TABLE_NAME, - PHY_TAG_SCAN_CODE_BASE_GROUP_ORDER_SCAN, + PHY_TAG_SCAN_CODE_SCAN = 1, PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX }; static int32_t physiTagScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj; - int32_t code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_SCAN_COLS, nodeListToMsg, pNode->pScanCols); - } - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_SCAN_PSEUDO_COLS, nodeListToMsg, pNode->pScanPseudoCols); - } - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeU64(pEncoder, PHY_TAG_SCAN_CODE_BASE_UID, pNode->uid); - } - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeU64(pEncoder, PHY_TAG_SCAN_CODE_BASE_SUID, pNode->suid); - } - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI8(pEncoder, PHY_TAG_SCAN_CODE_BASE_TABLE_TYPE, pNode->tableType); - } - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_BASE_TABLE_NAME, nameToMsg, &pNode->tableName); - } - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeBool(pEncoder, PHY_TAG_SCAN_CODE_BASE_GROUP_ORDER_SCAN, pNode->groupOrderScan); - } + int32_t code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_SCAN, physiScanNodeToMsg, &pNode->scan); + if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX, pNode->onlyMetaCtbIdx); } @@ -2053,29 +2026,8 @@ static int32_t msgToPhysiTagScanNode(STlvDecoder* pDecoder, void* pObj) { STlv* pTlv = NULL; tlvForEach(pDecoder, pTlv, code) { switch (pTlv->type) { - case PHY_TAG_SCAN_CODE_BASE_NODE: - code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); - break; - case PHY_TAG_SCAN_CODE_SCAN_COLS: - code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pScanCols); - break; - case PHY_TAG_SCAN_CODE_SCAN_PSEUDO_COLS: - code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pScanPseudoCols); - break; - case PHY_TAG_SCAN_CODE_BASE_UID: - code = tlvDecodeU64(pTlv, &pNode->uid); - break; - case PHY_TAG_SCAN_CODE_BASE_SUID: - code = tlvDecodeU64(pTlv, &pNode->suid); - break; - case PHY_TAG_SCAN_CODE_BASE_TABLE_TYPE: - code = tlvDecodeI8(pTlv, &pNode->tableType); - break; - case PHY_TAG_SCAN_CODE_BASE_TABLE_NAME: - code = tlvDecodeObjFromTlv(pTlv, msgToName, &pNode->tableName); - break; - case PHY_TAG_SCAN_CODE_BASE_GROUP_ORDER_SCAN: - code = tlvDecodeBool(pTlv, &pNode->groupOrderScan); + case PHY_TAG_SCAN_CODE_SCAN: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiScanNode, &pNode->scan); break; case PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX: code = tlvDecodeBool(pTlv, &pNode->onlyMetaCtbIdx); From 450d7e2d3c6cdebf2dc6fbf2a279666e45efd576 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 15 Aug 2023 16:56:15 +0800 Subject: [PATCH 15/16] enhance: compilation error --- source/libs/nodes/src/nodesCodeFuncs.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 64a4e0e7d3..48c9bf33dd 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1641,7 +1641,7 @@ static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) { STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj; - int32_t code = jsonToPhysiScanNode(pObj, pJson); + int32_t code = jsonToPhysiScanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkTagScanPhysiOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx); From 49e4b11547c03cfd7ed5dc993d2fa6e42bab9a8b Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 15 Aug 2023 17:00:53 +0800 Subject: [PATCH 16/16] fix: fix compilation error --- source/libs/executor/src/scanoperator.c | 2 +- source/libs/nodes/test/nodesCloneTest.cpp | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ef28875be4..d7d97cc514 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2991,7 +2991,7 @@ static void destroyTagScanOperatorInfo(void* param) { SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pTagScanNode, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo) { - SScanPhysiNode* pPhyNode = (STagScanPhysiNode*)pTagScanNode; + SScanPhysiNode* pPhyNode = (SScanPhysiNode*)pTagScanNode; STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { diff --git a/source/libs/nodes/test/nodesCloneTest.cpp b/source/libs/nodes/test/nodesCloneTest.cpp index e1e99abab3..8b8893d317 100644 --- a/source/libs/nodes/test/nodesCloneTest.cpp +++ b/source/libs/nodes/test/nodesCloneTest.cpp @@ -199,9 +199,10 @@ TEST_F(NodesCloneTest, physiScan) { ASSERT_EQ(nodeType(pSrc), nodeType(pDst)); STagScanPhysiNode* pSrcNode = (STagScanPhysiNode*)pSrc; STagScanPhysiNode* pDstNode = (STagScanPhysiNode*)pDst; - ASSERT_EQ(pSrcNode->uid, pDstNode->uid); - ASSERT_EQ(pSrcNode->suid, pDstNode->suid); - ASSERT_EQ(pSrcNode->tableType, pDstNode->tableType); + ASSERT_EQ(pSrcNode->scan.uid, pDstNode->scan.uid); + ASSERT_EQ(pSrcNode->scan.suid, pDstNode->scan.suid); + ASSERT_EQ(pSrcNode->scan.tableType, pDstNode->scan.tableType); + ASSERT_EQ(pSrcNode->onlyMetaCtbIdx, pDstNode->onlyMetaCtbIdx); }); std::unique_ptr srcNode(nullptr, nodesDestroyNode);