From c2a7404b425e655bf01e54e159f79b96e430b047 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 25 Nov 2022 23:00:14 +0800 Subject: [PATCH] save work in case it losts --- source/dnode/vnode/inc/vnode.h | 2 + source/dnode/vnode/src/inc/vnodeInt.h | 2 - source/libs/executor/inc/executorimpl.h | 36 ++-- source/libs/executor/src/scanoperator.c | 227 ++++++++++++++++++++---- 4 files changed, 216 insertions(+), 51 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 52a8a7f5a3..791e7e88b0 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -108,6 +108,8 @@ int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName); int metaGetTableUidByName(void *meta, char *tbName, int64_t *uid); int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType); bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid); +tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name); +int64_t metaGetTbNum(SMeta* pMeta); typedef struct { int64_t uid; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 1ca80c7570..1b15e55c0d 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -116,8 +116,6 @@ int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, in int metaGetTableEntryByName(SMetaReader* pReader, const char* name); int metaAlterCache(SMeta* pMeta, int32_t nPage); -tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name); -int64_t metaGetTbNum(SMeta* pMeta); int64_t metaGetTimeSeriesNum(SMeta* pMeta); SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock); void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1785553c39..a14c9bb51e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -530,16 +530,32 @@ typedef struct SSysTableScanInfo { SLoadRemoteDataInfo loadInfo; } SSysTableScanInfo; -typedef struct STableCountScanInfo { - SReadHandle readHandle; - SSDataBlock* pRes; - SExprSupp pseudoSup; - SNode* pCondition; - SName name; - uint64_t suid; - uint64_t uid; - int8_t tableType; -} STableCountScanInfo; +typedef struct STableCountScanSupp { + int16_t dbNameSlotId; + int16_t stbNameSlotId; + int16_t tbCountSlotId; + + bool groupByDbName; + bool groupByStbName; + char dbName[TSDB_DB_NAME_LEN]; + char stbName[TSDB_TABLE_NAME_LEN]; + +} STableCountScanSupp; + +typedef struct STableCountScanOperatorInfo { + SReadHandle readHandle; + SSDataBlock* pRes; + + SName tableName; + + SNodeList* groupTags; + SNodeList* scanCols; + SNodeList* pseudoCols; + + STableCountScanSupp supp; + + int32_t currGrpIdx; +} STableCountScanOperatorInfo; typedef struct SBlockDistInfo { SSDataBlock* pResBlock; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 941f7e7098..c8ff156cd3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -31,7 +31,6 @@ #include "tcompare.h" #include "thash.h" #include "ttypes.h" -#include "vnode.h" #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) @@ -233,8 +232,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro STableScanInfo* pTableScanInfo = pOperator->info; - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, buf, - GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); + SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, + buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); if (p1 == NULL) { return NULL; @@ -376,7 +375,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; pCost->totalBlocks += 1; @@ -4428,7 +4427,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { uint32_t status = 0; loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); -// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); + // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -4758,7 +4757,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN goto _error; } - initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pResBlock = createResDataBlock(pDescNode); blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); @@ -4792,15 +4790,106 @@ _error: // ==================================================================================================================== // TableCountScanOperator static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator); -static void destoryTableCountScanOperator(void* param); +static void destoryTableCountScanOperator(void* param); +static const char* GROUP_TAG_DB_NAME = "db_name"; +static const char* GROUP_TAG_STABLE_NAME = "stable_name"; + +int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) { + if (scanCols != NULL) { + SNode* pNode = NULL; + FOREACH(pNode, scanCols) { + if (nodeType(pNode) != QUERY_NODE_TARGET) { + return TSDB_CODE_QRY_SYS_ERROR; + } + STargetNode* targetNode = (STargetNode*)pNode; + if (nodeType(targetNode->pExpr) != QUERY_NODE_COLUMN) { + return TSDB_CODE_QRY_SYS_ERROR; + } + SColumnNode* colNode = (SColumnNode*)(targetNode->pExpr); + if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) { + supp->dbNameSlotId = targetNode->slotId; + } else if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) { + supp->stbNameSlotId = targetNode->slotId; + } + } + } + return TSDB_CODE_SUCCESS; +} + +int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) { + if (pseudoCols != NULL) { + SNode* pNode = NULL; + FOREACH(pNode, pseudoCols) { + if (nodeType(pNode) != QUERY_NODE_TARGET) { + return TSDB_CODE_QRY_SYS_ERROR; + } + STargetNode* targetNode = (STargetNode*)pNode; + if (nodeType(targetNode->pExpr) != QUERY_NODE_FUNCTION) { + return TSDB_CODE_QRY_SYS_ERROR; + } + SFunctionNode* funcNode = (SFunctionNode*)(targetNode->pExpr); + if (funcNode->funcType == FUNCTION_TYPE_TABLE_COUNT) { + supp->tbCountSlotId = targetNode->slotId; + } + } + } + return TSDB_CODE_SUCCESS; +} + +int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) { + if (groupTags != NULL) { + SNode* pNode = NULL; + FOREACH(pNode, groupTags) { + if (nodeType(pNode) != QUERY_NODE_COLUMN) { + return TSDB_CODE_QRY_SYS_ERROR; + } + SColumnNode* colNode = (SColumnNode*)pNode; + if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) { + supp->groupByDbName = true; + } + if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) { + supp->groupByStbName = true; + } + } + } else { + strncpy(supp->dbName, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN); + strncpy(supp->stbName, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN); + } + return TSDB_CODE_SUCCESS; +} + +int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols, + STableCountScanSupp* supp, SExecTaskInfo* taskInfo) { + int32_t code = 0; + code = tblCountScanGetInputs(groupTags, tableName, supp); + if (code != TSDB_CODE_SUCCESS) { + qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo)); + return code; + } + supp->dbNameSlotId = -1; + supp->stbNameSlotId = -1; + supp->tbCountSlotId = -1; + + code = tblCountScanGetGroupTagsSlotId(scanCols, supp); + if (code != TSDB_CODE_SUCCESS) { + qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo)); + return code; + } + code = tblCountScanGetCountSlotId(pseudoCols, supp); + if (code != TSDB_CODE_SUCCESS) { + qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo)); + return code; + } + return code; +} SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; - SScanPhysiNode* pScanNode = &pTblCountScanNode->scan; - STableCountScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SScanPhysiNode* pScanNode = &pTblCountScanNode->scan; + STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanSupp)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (!pInfo || !pOperator) { goto _error; @@ -4808,26 +4897,19 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC pInfo->readHandle = *readHandle; - if (pScanNode->pScanPseudoCols != NULL) { - SExprSupp* pSup = &pInfo->pseudoSup; - pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs); - pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); - } - SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc; initResultSizeInfo(&pOperator->resultInfo, 1); pInfo->pRes = createResDataBlock(pDescNode); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - tNameAssign(&pInfo->name, &pScanNode->tableName); - - pInfo->suid = pScanNode->suid; // 0 for super table, super table uid for child table - pInfo->uid = pScanNode->uid; // super table uid, or child table uid - pInfo->tableType = pScanNode->tableType; + getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName, + pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp, + pTaskInfo); setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, NULL); return pOperator; _error: @@ -4840,32 +4922,99 @@ _error: } static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableCountScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableCountScanOperatorInfo* pInfo = pOperator->info; + STableCountScanSupp* pSupp = &pInfo->supp; + SSDataBlock* pRes = pInfo->pRes; + // compute group id must, but output is according to scancols. output group by group + // grouptags high priority(groupid<=>grouptag), then tablename(dbname,tableName). + // scanCols, (grouptags cols) + { + // mnode, query table count of information_schema and performance_schema + if (pInfo->readHandle.mnd != NULL) { + if (pSupp->groupByDbName) { + if (pInfo->currGrpIdx == 0) { + uint64_t groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB)); + size_t infodbTableNum; + getInfosDbMeta(NULL, &infodbTableNum); + pRes->info.groupId = groupId; + if (pSupp->dbNameSlotId != -1) { + SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId); + //colDataAppend(colInfoData, 0, ) + } + return NULL; + } else if (pInfo->currGrpIdx == 1) { + uint64_t groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB)); + size_t perfdbTableNum; + getPerfDbMeta(NULL, &perfdbTableNum); + } else if (pInfo->currGrpIdx >= 2) { + return NULL; + } + uint64_t groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB)); + size_t infodbTableNum; + getInfosDbMeta(NULL, &infodbTableNum); + size_t perfdbTableNum; + getPerfDbMeta(NULL, &perfdbTableNum); + // grouptags, db name + } else { + } + } + return NULL; + } + const char* db = NULL; + int32_t vgId = 0; + { + // get dbname + vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId); + SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, 0); + SName sn = {0}; + char varDbName[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB); - SExprSupp* pSup = &pInfo->pseudoSup; - if (pSup->numOfExprs != 1 || pSup->pExprInfo[0].pExpr->nodeType != QUERY_NODE_FUNCTION || - pSup->pExprInfo[0].pExpr->_function.functionType != FUNCTION_TYPE_TABLE_COUNT ) { - qError("%s table count scan operator invalid pseduo columns", GET_TASKID(pTaskInfo)); + tNameGetDbName(&sn, varDataVal(varDbName)); + varDataSetLen(varDbName, strlen(varDataVal(varDbName))); } - SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, 0); - if (pInfo->uid != 0) { + const char* tableName = tNameGetTableName(&pInfo->tableName); + + { + // grouptags only have column db_name or (no grouptags and tablename is null) + // if (tableName == NULL | strlen(tableName) == 0) + metaGetTbNum(pInfo->readHandle.meta); + } + {// no grouptags and TableName is not null, return child table count + {tb_uid_t uid = metaGetTableEntryUidByName(pInfo->readHandle.meta, tableName); + SMetaStbStats stats = {0}; + metaGetStbStats(pInfo->readHandle.meta, uid, &stats); + int64_t ctbNum = stats.ctbNum; +} +} +{ + // grouptags have column stable_name. return (stable name, child table count) + SArray* stbUidList = taosArrayInit(16, sizeof(tb_uid_t)); + vnodeGetStbIdList(pInfo->readHandle.vnode, 0, stbUidList); + if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, stbUidList) < 0) { + qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr()); + taosArrayDestroy(stbUidList); + // return failure + } + for (int i = 0; i < taosArrayGetSize(stbUidList); ++i) { + tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(stbUidList, i); SMetaStbStats stats = {0}; - metaGetStbStats(pInfo->readHandle.meta, pInfo->uid, &stats); + metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats); int64_t ctbNum = stats.ctbNum; - //TODO: wxy about the return type bigint or int? - colDataAppend(pColInfoData, 0, (char*)&ctbNum, false); - pInfo->pRes->info.rows = 1; - } else { - //TODO: get table count in this vnode? + + char varStbName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + metaGetTableNameByUid(pInfo->readHandle.meta, stbUid, varStbName); } - return NULL; + taosArrayDestroy(stbUidList); +} +return NULL; } static void destoryTableCountScanOperator(void* param) { - STableCountScanInfo* pTableCountScanInfo = param; + STableCountScanOperatorInfo* pTableCountScanInfo = param; blockDataDestroy(pTableCountScanInfo->pRes); - cleanupExprSupp(&pTableCountScanInfo->pseudoSup); + nodesDestroyList(pTableCountScanInfo->groupTags); taosMemoryFreeClear(param); }