From 3936128742e8f468e99c6d7119d3ebe5ee561159 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 14 Dec 2021 07:37:54 -0500 Subject: [PATCH 1/9] TD-12034 Physical plan code. --- include/libs/planner/planner.h | 1 - source/libs/planner/inc/plannerInt.h | 38 ++++++------ source/libs/planner/src/physicalPlan.c | 80 +++++++++++++++++++++++--- source/libs/planner/src/planner.c | 31 +++------- 4 files changed, 101 insertions(+), 49 deletions(-) diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 8f217a0deb..be00ed65f0 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -54,7 +54,6 @@ enum OPERATOR_TYPE_E { }; struct SEpSet; -struct SQueryPlanNode; struct SPhyNode; struct SQueryStmtInfo; diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 2231c93362..6dcd19782c 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -25,18 +25,20 @@ extern "C" { #include "planner.h" #include "taosmsg.h" -enum LOGIC_PLAN_E { - LP_SCAN = 1, - LP_SESSION = 2, - LP_STATE = 3, - LP_INTERVAL = 4, - LP_FILL = 5, - LP_AGG = 6, - LP_JOIN = 7, - LP_PROJECT = 8, - LP_DISTINCT = 9, - LP_ORDER = 10 -}; +#define QNODE_TAGSCAN 1 +#define QNODE_TABLESCAN 2 +#define QNODE_PROJECT 3 +#define QNODE_AGGREGATE 4 +#define QNODE_GROUPBY 5 +#define QNODE_LIMIT 6 +#define QNODE_JOIN 7 +#define QNODE_DISTINCT 8 +#define QNODE_SORT 9 +#define QNODE_UNION 10 +#define QNODE_TIMEWINDOW 11 +#define QNODE_SESSIONWINDOW 12 +#define QNODE_STATEWINDOW 13 +#define QNODE_FILL 14 typedef struct SQueryNodeBasicInfo { int32_t type; // operator type @@ -64,10 +66,10 @@ typedef struct SQueryPlanNode { SArray *pExpr; // the query functions or sql aggregations int32_t numOfExpr; // number of result columns, which is also the number of pExprs void *pExtInfo; // additional information - // previous operator to generated result for current node to process + // children operator to generated result for current node to process // in case of join, multiple prev nodes exist. - SArray *pPrevNodes; // upstream nodes - struct SQueryPlanNode *nextNode; + SArray *pChildren; // upstream nodes + struct SQueryPlanNode *pParent; } SQueryPlanNode; typedef SSchema SSlotSchema; @@ -86,11 +88,13 @@ typedef struct SPhyNode { // children plan to generated result for current node to process // in case of join, multiple plan nodes exist. SArray *pChildren; + struct SPhyNode *pParent; } SPhyNode; typedef struct SScanPhyNode { - SPhyNode node; - uint64_t uid; // unique id of the table + SPhyNode node; + STimeWindow window; + uint64_t uid; // unique id of the table } SScanPhyNode; typedef SScanPhyNode STagScanPhyNode; diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 2bdc159af8..c4564a9a09 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -15,20 +15,84 @@ #include "plannerInt.h" +// typedef struct SQueryPlanNode { +// void *pExtInfo; // additional information +// SArray *pPrevNodes; // children +// struct SQueryPlanNode *nextNode; // parent +// } SQueryPlanNode; + +// typedef struct SSubplan { +// int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN +// SArray *pDatasource; // the datasource subplan,from which to fetch the result +// struct SPhyNode *pNode; // physical plan of current subplan +// } SSubplan; + +// typedef struct SQueryDag { +// SArray **pSubplans; +// } SQueryDag; + +// typedef struct SScanPhyNode { +// SPhyNode node; +// STimeWindow window; +// uint64_t uid; // unique id of the table +// } SScanPhyNode; + +// typedef SScanPhyNode STagScanPhyNode; + +void fillDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) { + dataBlockSchema->index = 0; // todo + SWAP(dataBlockSchema->pSchema, pPlanNode->pSchema, SSchema*); + dataBlockSchema->numOfCols = pPlanNode->numOfCols; +} + +void fillPhyNode(SQueryPlanNode* pPlanNode, int32_t type, const char* name, SPhyNode* node) { + node->info.type = type; + node->info.name = name; + SWAP(node->pTargets, pPlanNode->pExpr, SArray*); + fillDataBlockSchema(pPlanNode, &(node->targetSchema)); +} + +SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) { + STagScanPhyNode* node = calloc(1, sizeof(STagScanPhyNode)); + fillPhyNode(pPlanNode, OP_TagScan, "TagScan", (SPhyNode*)node); + return (SPhyNode*)node; +} + SPhyNode* createScanNode(SQueryPlanNode* pPlanNode) { - return NULL; + STagScanPhyNode* node = calloc(1, sizeof(STagScanPhyNode)); + fillPhyNode(pPlanNode, OP_TableScan, "SingleTableScan", (SPhyNode*)node); + return (SPhyNode*)node; } -SPhyNode* createPhyNode(SQueryPlanNode* node) { - switch (node->info.type) { - case LP_SCAN: - return createScanNode(node); +SPhyNode* createPhyNode(SQueryPlanNode* pPlanNode) { + SPhyNode* node = NULL; + switch (pPlanNode->info.type) { + case QNODE_TAGSCAN: + node = createTagScanNode(pPlanNode); + break; + case QNODE_TABLESCAN: + node = createScanNode(pPlanNode); + break; + default: + assert(false); } - return NULL; + if (pPlanNode->pChildren != NULL && taosArrayGetSize(pPlanNode->pChildren) > 0) { + node->pChildren = taosArrayInit(4, POINTER_BYTES); + size_t size = taosArrayGetSize(pPlanNode->pChildren); + for(int32_t i = 0; i < size; ++i) { + SPhyNode* child = createPhyNode(taosArrayGet(pPlanNode->pChildren, i)); + child->pParent = node; + taosArrayPush(node->pChildren, &child); + } + } + return node; } -SPhyNode* createSubplan(SQueryPlanNode* pSubquery) { - return NULL; +SSubplan* createSubplan(SQueryPlanNode* pSubquery) { + SSubplan* subplan = calloc(1, sizeof(SSubplan)); + subplan->pNode = createPhyNode(pSubquery); + // todo + return subplan; } int32_t createDag(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) { diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index e54b847230..b18dd29257 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -18,21 +18,6 @@ #include "parser.h" #include "plannerInt.h" -#define QNODE_TAGSCAN 1 -#define QNODE_TABLESCAN 2 -#define QNODE_PROJECT 3 -#define QNODE_AGGREGATE 4 -#define QNODE_GROUPBY 5 -#define QNODE_LIMIT 6 -#define QNODE_JOIN 7 -#define QNODE_DISTINCT 8 -#define QNODE_SORT 9 -#define QNODE_UNION 10 -#define QNODE_TIMEWINDOW 11 -#define QNODE_SESSIONWINDOW 12 -#define QNODE_STATEWINDOW 13 -#define QNODE_FILL 14 - typedef struct SFillEssInfo { int32_t fillType; // fill type int64_t *val; // fill value @@ -104,9 +89,9 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla taosArrayPush(pNode->pExpr, &pExpr[i]); } - pNode->pPrevNodes = taosArrayInit(4, POINTER_BYTES); + pNode->pChildren = taosArrayInit(4, POINTER_BYTES); for(int32_t i = 0; i < numOfPrev; ++i) { - taosArrayPush(pNode->pPrevNodes, &prev[i]); + taosArrayPush(pNode->pChildren, &prev[i]); } switch(type) { @@ -386,14 +371,14 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { tfree(pQueryNode->info.name); // dropAllExprInfo(pQueryNode->pExpr); - if (pQueryNode->pPrevNodes != NULL) { - int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pPrevNodes); + if (pQueryNode->pChildren != NULL) { + int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pChildren); for(int32_t i = 0; i < size; ++i) { - SQueryPlanNode* p = taosArrayGetP(pQueryNode->pPrevNodes, i); + SQueryPlanNode* p = taosArrayGetP(pQueryNode->pChildren, i); doDestroyQueryNode(p); } - taosArrayDestroy(pQueryNode->pPrevNodes); + taosArrayDestroy(pQueryNode->pChildren); } tfree(pQueryNode); @@ -607,8 +592,8 @@ int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) { int32_t len = doPrintPlan(buf, pQueryNode, level, totalLen); - for(int32_t i = 0; i < taosArrayGetSize(pQueryNode->pPrevNodes); ++i) { - SQueryPlanNode* p1 = taosArrayGetP(pQueryNode->pPrevNodes, i); + for(int32_t i = 0; i < taosArrayGetSize(pQueryNode->pChildren); ++i) { + SQueryPlanNode* p1 = taosArrayGetP(pQueryNode->pChildren, i); int32_t len1 = queryPlanToStringImpl(buf, p1, level + 1, len); len = len1; } From c7a7939f6896d04417b57339aae6fb2d56fa8988 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 15 Dec 2021 19:55:50 +0800 Subject: [PATCH 2/9] add tablemeta cache --- include/common/taosmsg.h | 3 +- include/libs/catalog/catalog.h | 56 +--- include/libs/query/query.h | 46 +++ source/libs/catalog/inc/catalogInt.h | 19 +- source/libs/catalog/src/catalog.c | 421 ++++++++++++++++++++++----- source/libs/parser/src/parserUtil.c | 23 -- source/libs/query/src/querymsg.c | 130 ++++++++- 7 files changed, 557 insertions(+), 141 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index caf872689c..fd6c0620e8 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -839,7 +839,7 @@ typedef struct { int32_t tversion; uint64_t tuid; uint64_t suid; - SVgroupMsg vgroup; + int32_t vgId; SSchema pSchema[]; } STableMetaMsg; @@ -867,6 +867,7 @@ typedef struct { int32_t dbVgroupVersion; int32_t dbVgroupNum; int32_t dbHashRange; + int32_t dbHashType; SVgroupInfo vgroupInfo[]; //int32_t vgIdList[]; } SUseDbRspMsg; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index f9d3b3c8c1..38965c6ba9 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -45,42 +45,10 @@ typedef struct SMetaData { SEpSet *pEpSet; // qnode epset list } SMetaData; -typedef struct STableComInfo { - uint8_t numOfTags; // the number of tags in schema - uint8_t precision; // the number of precision - int16_t numOfColumns; // the number of columns - int32_t rowSize; // row size of the schema -} STableComInfo; - -/* - * ASSERT(sizeof(SCTableMeta) == 24) - * ASSERT(tableType == TSDB_CHILD_TABLE) - * The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info. - */ -typedef struct SCTableMeta { - int32_t vgId:24; - int8_t tableType; - uint64_t uid; - uint64_t suid; -} SCTableMeta; - -/* - * Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a SCTableMeta. - */ -typedef struct STableMeta { - int32_t vgId:24; - int8_t tableType; - uint64_t uid; - uint64_t suid; - // if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta info - int16_t sversion; - int16_t tversion; - STableComInfo tableInfo; - SSchema schema[]; -} STableMeta; - typedef struct SCatalogCfg { - + bool enableVgroupCache; + uint32_t maxTblCacheNum; + uint32_t maxDBCacheNum; } SCatalogCfg; int32_t catalogInit(SCatalogCfg *cfg); @@ -96,19 +64,25 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version); -int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList); -int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup); + +/** + * get cluster vgroup list. + * @pVgroupList - hash of vgroup list, key:vgId, value:SVgroupInfo + * @return + */ +int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash); +int32_t catalogUpdateVgroupCache(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup); int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo); -int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); +int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta); -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta); -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, STableMeta* pNewTableMeta); +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta); +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta* pTableMeta); /** diff --git a/include/libs/query/query.h b/include/libs/query/query.h index 02ae708874..bfe2db6a61 100644 --- a/include/libs/query/query.h +++ b/include/libs/query/query.h @@ -24,10 +24,49 @@ extern "C" { typedef SVgroupListRspMsg SVgroupListInfo; +typedef struct STableComInfo { + uint8_t numOfTags; // the number of tags in schema + uint8_t precision; // the number of precision + int16_t numOfColumns; // the number of columns + int32_t rowSize; // row size of the schema +} STableComInfo; + +/* + * ASSERT(sizeof(SCTableMeta) == 24) + * ASSERT(tableType == TSDB_CHILD_TABLE) + * The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info. + */ +typedef struct SCTableMeta { + int32_t vgId:24; + int8_t tableType; + uint64_t uid; + uint64_t suid; +} SCTableMeta; + +/* + * Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a SCTableMeta. + */ +typedef struct STableMeta { + //BEGIN: KEEP THIS PART SAME WITH SCTableMeta + int32_t vgId:24; + int8_t tableType; + uint64_t uid; + uint64_t suid; + //END: KEEP THIS PART SAME WITH SCTableMeta + + // if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta info + int16_t sversion; + int16_t tversion; + STableComInfo tableInfo; + SSchema schema[]; +} STableMeta; + + typedef struct SDBVgroupInfo { int32_t vgroupVersion; SArray *vgId; int32_t hashRange; + int32_t hashType; } SDBVgroupInfo; typedef struct SUseDbOutput { @@ -36,6 +75,13 @@ typedef struct SUseDbOutput { SDBVgroupInfo *dbVgroup; } SUseDbOutput; +typedef struct STableMetaOutput { + int32_t metaNum; + char ctbFname[TSDB_TABLE_FNAME_LEN]; + char tbFname[TSDB_TABLE_FNAME_LEN]; + SCTableMeta ctbMeta; + STableMeta *tbMeta; +} STableMetaOutput; extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index a08b64f9a9..455c82b1bc 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -24,16 +24,16 @@ extern "C" { #include "common.h" #include "tlog.h" -#define CTG_DEFAULT_CLUSTER_NUMBER 6 -#define CTG_DEFAULT_VGROUP_NUMBER 100 -#define CTG_DEFAULT_DB_NUMBER 20 +#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6 +#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100 +#define CTG_DEFAULT_CACHE_DB_NUMBER 20 +#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000 #define CTG_DEFAULT_INVALID_VERSION (-1) typedef struct SVgroupListCache { int32_t vgroupVersion; - SHashObj *cache; // key:vgId, value:SVgroupInfo* - SArray *arrayCache; // SVgroupInfo + SHashObj *cache; // key:vgId, value:SVgroupInfo } SVgroupListCache; typedef struct SDBVgroupCache { @@ -41,20 +41,23 @@ typedef struct SDBVgroupCache { } SDBVgroupCache; typedef struct STableMetaCache { - SHashObj *cache; //key:fulltablename, value:STableMeta + SHashObj *cache; //key:fulltablename, value:STableMeta + SHashObj *stableCache; //key:suid, value:STableMeta* } STableMetaCache; typedef struct SCatalog { SVgroupListCache vgroupCache; - SDBVgroupCache dbCache; - STableMetaCache tableCache; + SDBVgroupCache dbCache; + STableMetaCache tableCache; } SCatalog; typedef struct SCatalogMgmt { void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata SHashObj *pCluster; // items cached for each cluster, the hash key is the cluster-id got from mgmt node + SCatalogCfg cfg; } SCatalogMgmt; +typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); extern int32_t ctgDebugFlag; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 92b6094529..9921a2696b 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -47,14 +47,14 @@ int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSe return TSDB_CODE_SUCCESS; } -int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t* exist) { - if (NULL == pCatalog->vgroupCache.arrayCache || pCatalog->vgroupCache.vgroupVersion < 0) { +int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SHashObj** pVgroupList, int32_t* exist) { + if (NULL == pCatalog->vgroupCache.cache || pCatalog->vgroupCache.vgroupVersion < 0) { *exist = 0; return TSDB_CODE_SUCCESS; } if (pVgroupList) { - *pVgroupList = taosArrayDup(pCatalog->vgroupCache.arrayCache); + *pVgroupList = pCatalog->vgroupCache.cache; } *exist = 1; @@ -62,7 +62,6 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t* return TSDB_CODE_SUCCESS; } - int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) { if (NULL == pCatalog->dbCache.cache) { *exist = 0; @@ -92,6 +91,7 @@ int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, const char *dbName, SDBVgrou (*dbInfo)->vgroupVersion = info->vgroupVersion; (*dbInfo)->hashRange = info->hashRange; + (*dbInfo)->hashType = info->hashType; } *exist = 1; @@ -130,17 +130,328 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp } -int32_t catalogInit(SCatalogCfg *cfg) { - ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == ctgMgmt.pCluster) { - CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER); +int32_t ctgGetTableMetaFromCache(SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) { + if (NULL == pCatalog->tableCache.cache) { + *exist = 0; + return TSDB_CODE_SUCCESS; + } + + char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1]; + + snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbName, pTableName); + + STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName)); + + if (NULL == tbMeta) { + *exist = 0; + return TSDB_CODE_SUCCESS; + } + + if (tbMeta->tableType == TSDB_CHILD_TABLE) { + STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid)); + if (NULL == stbMeta || NULL == *stbMeta) { + *exist = 0; + return TSDB_CODE_SUCCESS; + } + + if ((*stbMeta)->suid != tbMeta->suid) { + ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema); + *pTableMeta = calloc(1, metaSize); + if (NULL == *pTableMeta) { + ctgError("calloc size[%d] failed", metaSize); + return TSDB_CODE_CTG_MEM_ERROR; + } + + memcpy(*pTableMeta, tbMeta, sizeof(SCTableMeta)); + memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta)); + } else { + int32_t metaSize = sizeof(STableMeta) + (tbMeta->tableInfo.numOfTags + tbMeta->tableInfo.numOfColumns) * sizeof(SSchema); + *pTableMeta = calloc(1, metaSize); + if (NULL == *pTableMeta) { + ctgError("calloc size[%d] failed", metaSize); + return TSDB_CODE_CTG_MEM_ERROR; + } + + memcpy(*pTableMeta, tbMeta, metaSize); + } + + *exist = 1; + + return TSDB_CODE_SUCCESS; +} + +void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { + epSet->inUse = 0; + epSet->numOfEps = vgroupInfo->numOfEps; + + for (int32_t i = 0; i < vgroupInfo->numOfEps; ++i) { + memcpy(&epSet->port[i], &vgroupInfo->epAddr[i].port, sizeof(epSet->port[i])); + memcpy(&epSet->fqdn[i], &vgroupInfo->epAddr[i].fqdn, sizeof(epSet->fqdn[i])); + } +} + + +int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + + char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1]; + + snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName); + + SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName}; + char *msg = NULL; + SEpSet *pVnodeEpSet = NULL; + int32_t msgLen = 0; + + int32_t code = queryBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen); + if (code) { + return code; + } + + SRpcMsg rpcMsg = { + .msgType = TSDB_MSG_TYPE_TABLE_META, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + SEpSet epSet; + + ctgGenEpSet(&epSet, vgroupInfo); + + rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp); + + if (TSDB_CODE_SUCCESS != rpcRsp.code) { + ctgError("get table meta from mnode failed, error code:%d", rpcRsp.code); + return rpcRsp.code; + } + + code = queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META](output, rpcRsp.pCont, rpcRsp.contLen); + if (code) { + return code; } return TSDB_CODE_SUCCESS; } -int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) { +int32_t ctgGetHashFunction(int32_t hashType, tableNameHashFp *fp) { + switch (hashType) { + default: + *fp = MurmurHash3_32; + break; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { + SDBVgroupInfo *dbInfo = NULL; + int32_t code = 0; + + CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo)); + + if (NULL == dbInfo) { + ctgWarn("db[%s] vgroup info not found", pDBName); + return TSDB_CODE_TSC_DB_NOT_SELECTED; + } + + if (dbInfo->vgroupVersion < 0 || NULL == dbInfo->vgId) { + ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgId:%p", pDBName, dbInfo->vgroupVersion, dbInfo->vgId); + CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED); + } + + int32_t vgNum = taosArrayGetSize(dbInfo->vgId); + if (vgNum <= 0) { + ctgError("db[%s] vgroup cache invalid, vgroup number:%p", vgNum); + CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED); + } + + tableNameHashFp fp = NULL; + + CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashType, &fp)); + + char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1]; + + snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName); + + uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName)); + uint32_t hashUnit = dbInfo->hashRange / vgNum; + uint32_t vgId = hashValue / hashUnit; + + SHashObj *vgroupHash = NULL; + + CTG_ERR_JRET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash)); + if (NULL == vgroupHash) { + ctgError("get empty vgroup cache"); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + if (NULL == taosHashGetClone(vgroupHash, &vgId, sizeof(vgId), pVgroup)) { + ctgError("vgId[%d] not found in vgroup list", vgId); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + +_return: + if (dbInfo && dbInfo->vgId) { + taosArrayDestroy(dbInfo->vgId); + dbInfo->vgId = NULL; + } + + tfree(dbInfo); + + return code; +} + + + +STableMeta* ctgCreateSTableMeta(STableMetaMsg* pChild) { + assert(pChild != NULL); + int32_t total = pChild->numOfColumns + pChild->numOfTags; + + STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * total); + pTableMeta->tableType = TSDB_SUPER_TABLE; + pTableMeta->tableInfo.numOfTags = pChild->numOfTags; + pTableMeta->tableInfo.numOfColumns = pChild->numOfColumns; + pTableMeta->tableInfo.precision = pChild->precision; + + pTableMeta->uid = pChild->suid; + pTableMeta->tversion = pChild->tversion; + pTableMeta->sversion = pChild->sversion; + + memcpy(pTableMeta->schema, pChild->pSchema, sizeof(SSchema) * total); + + int32_t num = pTableMeta->tableInfo.numOfColumns; + for(int32_t i = 0; i < num; ++i) { + pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; + } + + return pTableMeta; +} + +int32_t ctgGetTableMetaImpl(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) { + if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + + int32_t exist = 0; + + if (!forceUpdate) { + CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist)); + + if (exist) { + return TSDB_CODE_SUCCESS; + } + } + + CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName)); + + CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist)); + + if (0 == exist) { + ctgError("get table meta from cache failed, but fetch succeed"); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgUpdateTableMetaCache(SCatalog *pCatalog, STableMetaOutput *output) { + if (output->metaNum != 1 && output->metaNum != 2) { + ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + if (NULL == output->tbMeta) { + ctgError("no valid table meta got from meta rsp"); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + if (NULL == pCatalog->tableCache.cache) { + pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == pCatalog->tableCache.cache) { + ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); + return TSDB_CODE_CTG_MEM_ERROR; + } + } + + if (NULL == pCatalog->tableCache.cache) { + pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == pCatalog->tableCache.cache) { + ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); + return TSDB_CODE_CTG_MEM_ERROR; + } + + pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); + if (NULL == pCatalog->tableCache.stableCache) { + ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); + return TSDB_CODE_CTG_MEM_ERROR; + } + } + + if (output->metaNum == 2) { + if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { + ctgError("push ctable[%s] to table cache failed", output->ctbFname); + goto error_exit; + } + + if (TSDB_SUPER_TABLE != output->tbMeta->tableType) { + ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE); + goto error_exit; + } + } + + if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, sizeof(*output->tbMeta)) != 0) { + ctgError("push table[%s] to table cache failed", output->tbFname); + goto error_exit; + } + + if (TSDB_SUPER_TABLE == output->tbMeta->tableType) { + if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &output->tbMeta, POINTER_BYTES) != 0) { + ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid); + goto error_exit; + } + } + + return TSDB_CODE_SUCCESS; + +error_exit: + if (pCatalog->vgroupCache.cache) { + taosHashCleanup(pCatalog->vgroupCache.cache); + pCatalog->vgroupCache.cache = NULL; + } + + pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; + + return TSDB_CODE_CTG_INTERNAL_ERROR; +} + + +int32_t catalogInit(SCatalogCfg *cfg) { + ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == ctgMgmt.pCluster) { + CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER); + } + + if (cfg) { + memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg)); + } else { + ctgMgmt.cfg.enableVgroupCache = true; + ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; + ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t catalogGetHandle(const char *clusterId, SCatalog** catalogHandle) { if (NULL == clusterId || NULL == catalogHandle) { return TSDB_CODE_CTG_INVALID_INPUT; } @@ -190,7 +501,7 @@ int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) { -int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) { +int32_t catalogUpdateVgroupCache(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) { if (NULL == pVgroup) { ctgError("no valid vgroup list info to update"); return TSDB_CODE_CTG_INTERNAL_ERROR; @@ -200,22 +511,11 @@ int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) ctgError("vgroup version[%d] is invalid", pVgroup->vgroupVersion); return TSDB_CODE_CTG_INVALID_INPUT; } - - - if (NULL == pCatalog->vgroupCache.arrayCache) { - pCatalog->vgroupCache.arrayCache = taosArrayInit(pVgroup->vgroupNum, sizeof(pVgroup->vgroupInfo[0])); - if (NULL == pCatalog->vgroupCache.arrayCache) { - ctgError("init array[%d] for cluster cache failed", pVgroup->vgroupNum); - return TSDB_CODE_CTG_MEM_ERROR; - } - } else { - taosArrayClear(pCatalog->vgroupCache.arrayCache); - } if (NULL == pCatalog->vgroupCache.cache) { - pCatalog->vgroupCache.cache = taosHashInit(CTG_DEFAULT_VGROUP_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + pCatalog->vgroupCache.cache = taosHashInit(CTG_DEFAULT_CACHE_VGROUP_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->vgroupCache.cache) { - ctgError("init hash[%d] for cluster cache failed", CTG_DEFAULT_VGROUP_NUMBER); + ctgError("init hash[%d] for cluster cache failed", CTG_DEFAULT_CACHE_VGROUP_NUMBER); return TSDB_CODE_CTG_MEM_ERROR; } } else { @@ -224,13 +524,7 @@ int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) SVgroupInfo *vInfo = NULL; for (int32_t i = 0; i < pVgroup->vgroupNum; ++i) { - vInfo = taosArrayPush(pCatalog->vgroupCache.arrayCache, &pVgroup->vgroupInfo[i]); - if (NULL == vInfo) { - ctgError("push to vgroup array cache failed"); - goto error_exit; - } - - if (taosHashPut(pCatalog->vgroupCache.cache, &pVgroup->vgroupInfo[i].vgId, sizeof(pVgroup->vgroupInfo[i].vgId), &vInfo, POINTER_BYTES) != 0) { + if (taosHashPut(pCatalog->vgroupCache.cache, &pVgroup->vgroupInfo[i].vgId, sizeof(pVgroup->vgroupInfo[i].vgId), &pVgroup->vgroupInfo[i], sizeof(pVgroup->vgroupInfo[i])) != 0) { ctgError("push to vgroup hash cache failed"); goto error_exit; } @@ -241,11 +535,6 @@ int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) return TSDB_CODE_SUCCESS; error_exit: - if (pCatalog->vgroupCache.arrayCache) { - taosArrayDestroy(pCatalog->vgroupCache.arrayCache); - pCatalog->vgroupCache.arrayCache = NULL; - } - if (pCatalog->vgroupCache.cache) { taosHashCleanup(pCatalog->vgroupCache.cache); pCatalog->vgroupCache.cache = NULL; @@ -256,15 +545,14 @@ error_exit: return TSDB_CODE_CTG_INTERNAL_ERROR; } - -int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList) { +int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash) { if (NULL == pCatalog || NULL == pMgmtEps || NULL == pRpc) { return TSDB_CODE_CTG_INVALID_INPUT; } int32_t exist = 0; - CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupList, &exist)); + CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupHash, &exist)); if (exist) { return TSDB_CODE_SUCCESS; @@ -274,10 +562,10 @@ int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, CTG_ERR_RET(ctgGetVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &pVgroup)); - CTG_ERR_RET(catalogUpdateVgroup(pCatalog, pVgroup)); + CTG_ERR_RET(catalogUpdateVgroupCache(pCatalog, pVgroup)); - if (pVgroupList) { - CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupList, &exist)); + if (pVgroupHash) { + CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupHash, &exist)); } if (0 == exist) { @@ -288,6 +576,7 @@ int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, return TSDB_CODE_SUCCESS; } + int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) { if (NULL == pCatalog || NULL == dbName || NULL == version) { return TSDB_CODE_CTG_INVALID_INPUT; @@ -309,7 +598,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, return TSDB_CODE_SUCCESS; } -int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { +int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) { return TSDB_CODE_CTG_INVALID_INPUT; } @@ -324,9 +613,9 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB } if (NULL == pCatalog->dbCache.cache) { - pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_CACHE_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->dbCache.cache) { - ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_DB_NUMBER); + ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER); return TSDB_CODE_CTG_MEM_ERROR; } } @@ -369,11 +658,11 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut)); if (DbOut.vgroupList) { - CTG_ERR_JRET(catalogUpdateVgroup(pCatalog, DbOut.vgroupList)); + CTG_ERR_JRET(catalogUpdateVgroupCache(pCatalog, DbOut.vgroupList)); } if (DbOut.dbVgroup) { - CTG_ERR_JRET(catalogUpdateDBVgroup(pCatalog, dbName, DbOut.dbVgroup)); + CTG_ERR_JRET(catalogUpdateDBVgroupCache(pCatalog, dbName, DbOut.dbVgroup)); } if (dbInfo) { @@ -388,37 +677,35 @@ _return: return code; } +int32_t catalogGetTableMeta(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) { + return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, false, pTableMeta); +} - -int32_t catalogGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta) { - if (NULL == pCatalog || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName) { + if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) { return TSDB_CODE_CTG_INVALID_INPUT; } - SBuildTableMetaInput bInput = {0}; - char *msg = NULL; - SEpSet *pVnodeEpSet = NULL; - int32_t msgLen = 0; + SVgroupInfo vgroupInfo = {0}; + + CTG_ERR_RET(ctgGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo)); - int32_t code = queryBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen); - if (code) { - return code; - } + STableMetaOutput output = {0}; + + CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output)); - SRpcMsg rpcMsg = { - .msgType = TSDB_MSG_TYPE_TABLE_META, - .pCont = msg, - .contLen = msgLen, - }; - - SRpcMsg rpcRsp = {0}; - - rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + CTG_ERR_RET(ctgUpdateTableMetaCache(pCatalog, &output)); + tfree(output.tbMeta); + return TSDB_CODE_SUCCESS; } -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta) { +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta* pTableMeta) { + return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta); +} + +int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList) { } diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index 28d01b9e66..7fcb7ea304 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -1464,29 +1464,6 @@ int32_t copyTagData(STagData* dst, const STagData* src) { return 0; } -STableMeta* createSuperTableMeta(STableMetaMsg* pChild) { - assert(pChild != NULL); - int32_t total = pChild->numOfColumns + pChild->numOfTags; - - STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * total); - pTableMeta->tableType = TSDB_SUPER_TABLE; - pTableMeta->tableInfo.numOfTags = pChild->numOfTags; - pTableMeta->tableInfo.numOfColumns = pChild->numOfColumns; - pTableMeta->tableInfo.precision = pChild->precision; - - pTableMeta->uid = pChild->suid; - pTableMeta->tversion = pChild->tversion; - pTableMeta->sversion = pChild->sversion; - - memcpy(pTableMeta->schema, pChild->pSchema, sizeof(SSchema) * total); - - int32_t num = pTableMeta->tableInfo.numOfColumns; - for(int32_t i = 0; i < num; ++i) { - pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; - } - - return pTableMeta; -} uint32_t getTableMetaSize(const STableMeta* pTableMeta) { assert(pTableMeta != NULL); diff --git a/source/libs/query/src/querymsg.c b/source/libs/query/src/querymsg.c index 8f35fd9c3e..c5864fd41b 100644 --- a/source/libs/query/src/querymsg.c +++ b/source/libs/query/src/querymsg.c @@ -222,6 +222,13 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { pOut->dbVgroup->vgroupVersion = pRsp->dbVgroupVersion; pOut->dbVgroup->hashRange = htonl(pRsp->dbHashRange); + pOut->dbVgroup->hashType = htonl(pRsp->dbHashType); + + if (pOut->dbVgroup->hashRange < 0) { + qError("invalid hashRange[%d] for db[%s]", pOut->dbVgroup->hashRange, pRsp->db); + code = TSDB_CODE_TSC_INVALID_INPUT; + goto _exit; + } for (int32_t i = 0; i < pRsp->dbVgroupNum; ++i) { *(vgIdList + i) = htonl(*(vgIdList + i)); @@ -244,13 +251,134 @@ _exit: return code; } +static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) { + pMetaMsg->numOfTags = htonl(pMetaMsg->numOfTags); + pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns); + pMetaMsg->sversion = htonl(pMetaMsg->sversion); + pMetaMsg->tversion = htonl(pMetaMsg->tversion); + pMetaMsg->tuid = htobe64(pMetaMsg->tuid); + pMetaMsg->suid = htobe64(pMetaMsg->suid); + pMetaMsg->vgId = htonl(pMetaMsg->vgId); + + if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) { + qError("invalid numOfTags[%d] in table meta rsp msg", pMetaMsg->numOfTags); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) { + qError("invalid numOfColumns[%d] in table meta rsp msg", pMetaMsg->numOfColumns); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (pMetaMsg->tableType != TSDB_SUPER_TABLE && pMetaMsg->tableType != TSDB_CHILD_TABLE && pMetaMsg->tableType != TSDB_NORMAL_TABLE) { + qError("invalid tableType[%d] in table meta rsp msg", pMetaMsg->tableType); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (pMetaMsg->sversion < 0) { + qError("invalid sversion[%d] in table meta rsp msg", pMetaMsg->sversion); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (pMetaMsg->tversion < 0) { + qError("invalid tversion[%d] in table meta rsp msg", pMetaMsg->tversion); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + SSchema* pSchema = pMetaMsg->pSchema; + + int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags; + for (int i = 0; i < numOfTotalCols; ++i) { + pSchema->bytes = htonl(pSchema->bytes); + pSchema->colId = htonl(pSchema->colId); + + pSchema++; + } + + if (pMetaMsg->pSchema[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) { + qError("invalid colId[%d] for the first column in table meta rsp msg", pMetaMsg->pSchema[0].colId); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STableMeta **pMeta) { + int32_t total = msg->numOfColumns + msg->numOfTags; + int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total; + + STableMeta* pTableMeta = calloc(1, metaSize); + if (NULL == pTableMeta) { + qError("calloc size[%d] failed", metaSize); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType; + pTableMeta->uid = msg->suid; + pTableMeta->suid = msg->suid; + pTableMeta->sversion = msg->sversion; + pTableMeta->tversion = msg->tversion; + + pTableMeta->tableInfo.numOfTags = msg->numOfTags; + pTableMeta->tableInfo.precision = msg->precision; + pTableMeta->tableInfo.numOfColumns = msg->numOfColumns; + + for(int32_t i = 0; i < msg->numOfColumns; ++i) { + pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; + } + + memcpy(pTableMeta->schema, msg->pSchema, sizeof(SSchema) * total); + + *pMeta = pTableMeta; + + return TSDB_CODE_SUCCESS; +} + + +int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { + STableMetaMsg *pMetaMsg = (STableMetaMsg *)msg; + int32_t code = queryConvertTableMetaMsg(pMetaMsg); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + STableMetaOutput *pOut = (STableMetaOutput *)output; + + if (!tIsValidSchema(pMetaMsg->pSchema, pMetaMsg->numOfColumns, pMetaMsg->numOfTags)) { + qError("validate table meta schema in rsp msg failed"); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (pMetaMsg->tableType == TSDB_CHILD_TABLE) { + pOut->metaNum = 2; + + memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname)); + memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname)); + + pOut->ctbMeta.vgId = pMetaMsg->vgId; + pOut->ctbMeta.tableType = pMetaMsg->tableType; + pOut->ctbMeta.uid = pMetaMsg->tuid; + pOut->ctbMeta.suid = pMetaMsg->suid; + + code = queryCreateTableMetaFromMsg(pMetaMsg, true, &pOut->tbMeta); + } else { + pOut->metaNum = 1; + + memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname)); + + code = queryCreateTableMetaFromMsg(pMetaMsg, false, &pOut->tbMeta); + } + + return code; +} + void msgInit() { queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg; queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = queryBuildVgroupListReqMsg; queryBuildMsg[TSDB_MSG_TYPE_USE_DB] = queryBuildUseDbMsg; - //tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp; + queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = queryProcessTableMetaRsp; queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = queryProcessVgroupListRsp; queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB] = queryProcessUseDBRsp; From 26a986022631f315cf39f3f7fc50d096aa547bab Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 15 Dec 2021 23:24:03 +0800 Subject: [PATCH 3/9] fix search bug --- source/libs/index/inc/index_fst.h | 4 ++ source/libs/index/src/index_fst.c | 4 ++ source/libs/index/test/indexTests.cpp | 55 +++++++++++++++++++++++++-- 3 files changed, 60 insertions(+), 3 deletions(-) diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index a1d4962e8b..20037f829a 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -30,6 +30,7 @@ extern "C" { typedef struct Fst Fst; typedef struct FstNode FstNode; +typedef struct StreamWithState StreamWithState; typedef enum { Included, Excluded, Unbounded} FstBound; @@ -283,6 +284,9 @@ Output fstEmptyFinalOutput(Fst *fst, bool *null); FstStreamBuilder *fstSearch(Fst *fst, AutomationCtx *ctx); FstStreamWithStateBuilder *fstSearchWithState(Fst *fst, AutomationCtx *ctx); +// into stream to expand later +StreamWithState* streamBuilderIntoStream(FstStreamBuilder *sb); + bool fstVerify(Fst *fst); diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 37bdcb0ecf..54058489c2 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1094,6 +1094,10 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) { FstStreamBuilder *fstSearch(Fst *fst, AutomationCtx *ctx) { return fstStreamBuilderCreate(fst, ctx); } +StreamWithState* streamBuilderIntoStream(FstStreamBuilder *sb) { + if (sb == NULL) { return NULL; } + return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max); +} FstStreamWithStateBuilder *fstSearchWithState(Fst *fst, AutomationCtx *ctx) { return fstStreamBuilderCreate(fst, ctx); } diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cpp index 0cfb0fedc3..f582536817 100644 --- a/source/libs/index/test/indexTests.cpp +++ b/source/libs/index/test/indexTests.cpp @@ -59,9 +59,22 @@ class FstReadMemory { return ok; } // add later - bool Search(const std::string &key, std::vector &result) { + bool Search(AutomationCtx *ctx, std::vector &result) { + FstStreamBuilder *sb = fstSearch(_fst, ctx); + StreamWithState *st = streamBuilderIntoStream(sb); + StreamWithStateResult *rt = NULL; + + while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { + result.push_back((uint64_t)(rt->out.out)); + } return true; } + bool SearchWithTimeCostUs(AutomationCtx *ctx, std::vector &result) { + int64_t s = taosGetTimestampUs(); + bool ok = this->Search(ctx, result); + int64_t e = taosGetTimestampUs(); + return ok; + } ~FstReadMemory() { fstCountingWriterDestroy(_w); @@ -186,11 +199,43 @@ void checkFstPerf() { printf("success to init fst read"); } Performance_fstReadRecords(m); - delete m; } +void checkFstPrefixSearch() { + FstWriter *fw = new FstWriter; + int64_t s = taosGetTimestampUs(); + int count = 2; + std::string key("ab"); + + for (int i = 0; i < count; i++) { + key[1] = key[1] + i; + fw->Put(key, i); + } + int64_t e = taosGetTimestampUs(); + + std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl; + delete fw; + FstReadMemory *m = new FstReadMemory(1024 * 64); + if (m->init() == false) { + std::cout << "init readMemory failed" << std::endl; + delete m; + return; + } + + // prefix search + std::vector result; + AutomationCtx *ctx = automCtxCreate((void *)"ab", AUTOMATION_PREFIX); + m->Search(ctx, result); + assert(result.size() == count); + for (int i = 0; i < result.size(); i++) { + assert(result[i] == i); // check result + } + + free(ctx); + delete m; +} void validateFst() { int val = 100; int count = 100; @@ -209,6 +254,8 @@ void validateFst() { FstReadMemory *m = new FstReadMemory(1024 * 64); if (m->init() == false) { std::cout << "init readMemory failed" << std::endl; + delete m; + return; } { @@ -230,10 +277,12 @@ void validateFst() { } } delete m; - } + + int main(int argc, char** argv) { checkFstPerf(); + //checkFstPrefixSearch(); return 1; } From 8f16679be418e947ed2f74f1a5cdcd611b205525 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 15 Dec 2021 23:52:23 +0800 Subject: [PATCH 4/9] fix search bug --- source/libs/index/src/index_fst.c | 2 +- source/libs/index/src/index_fst_automation.c | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 54058489c2..e40195e6e6 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1180,7 +1180,7 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData *bound) { bool fstBoundWithDataIsIncluded(FstBoundWithData *bound) { - return bound->type == Included ? true : false; + return bound->type == Excluded? false : true; } void fstBoundDestroy(FstBoundWithData *bound) { diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c index d905147654..803c95d0d8 100644 --- a/source/libs/index/src/index_fst_automation.c +++ b/source/libs/index/src/index_fst_automation.c @@ -145,7 +145,8 @@ AutomationCtx* automCtxCreate(void *data,AutomationType atype) { StartWithStateValue *sv = NULL; if (atype == AUTOMATION_PREFIX) { - sv = startWithStateValueCreate(Running, FST_INT, 0); + int val = 0; + sv = startWithStateValueCreate(Running, FST_INT, &val); ctx->stdata = (void *)sv; } else if (atype == AUTMMATION_MATCH) { From 4932afa8e2bd51fefc59cac73e9c8ce18941ee58 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 16 Dec 2021 00:15:57 +0800 Subject: [PATCH 5/9] fix search bug --- source/libs/index/src/index_fst.c | 2 +- source/libs/index/src/index_fst_automation.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index e40195e6e6..9cb4ac6836 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1123,7 +1123,7 @@ CompiledAddr fstGetRootAddr(Fst *fst) { Output fstEmptyFinalOutput(Fst *fst, bool *null) { Output res = 0; - FstNode *node = fst->root; + FstNode *node = fstGetRoot(fst); if (FST_NODE_IS_FINAL(node)) { *null = false; res = FST_NODE_FINAL_OUTPUT(node); diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c index 803c95d0d8..f70b90041b 100644 --- a/source/libs/index/src/index_fst_automation.c +++ b/source/libs/index/src/index_fst_automation.c @@ -68,7 +68,7 @@ StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv) { // prefix query, impl later static void* prefixStart(AutomationCtx *ctx) { - StartWithStateValue *data = (StartWithStateValue *)(ctx->data); + StartWithStateValue *data = (StartWithStateValue *)(ctx->stdata); return startWithStateValueDump(data); }; static bool prefixIsMatch(AutomationCtx *ctx, void *sv) { From cb1279a674deea53e2e3c35621d027b0bdb8907c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 16 Dec 2021 08:13:05 +0800 Subject: [PATCH 6/9] change get table meta api --- source/libs/parser/src/insertParser.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index fa59bc6ca7..219dd42f78 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -186,7 +186,7 @@ static int32_t buildMetaReq(SInsertParseContext* pCxt, SToken* pStname, SCatalog static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { SCatalogReq req; CHECK_CODE(buildMetaReq(pCxt, pTname, &req)); - CHECK_CODE(catalogGetTableMeta(pCxt->pCatalog, NULL, NULL, NULL, &pCxt->meta)); //TODO + CHECK_CODE(catalogGetTableMeta(pCxt->pCatalog, NULL, NULL, NULL, NULL, NULL)); //TODO pCxt->pTableMeta = (STableMeta*)taosArrayGetP(pCxt->meta.pTableMeta, 0); return TSDB_CODE_SUCCESS; } From 83bf82f077f960910179836c82c08bd1418019c5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 16 Dec 2021 09:54:03 +0800 Subject: [PATCH 7/9] support new apis --- include/libs/catalog/catalog.h | 8 +- source/common/inc/commonInt.h | 1 + source/libs/catalog/src/catalog.c | 182 +++++++++++++++++++++------ source/libs/parser/src/astValidate.c | 2 +- 4 files changed, 153 insertions(+), 40 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 38965c6ba9..6bbc4f9109 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -32,7 +32,7 @@ extern "C" { struct SCatalog; typedef struct SCatalogReq { - char clusterId[TSDB_CLUSTER_ID_LEN]; //???? + char dbName[TSDB_DB_NAME_LEN]; SArray *pTableName; // table full name SArray *pUdf; // udf name bool qNodeRequired; // valid qnode @@ -82,7 +82,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta); int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName); -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta* pTableMeta); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta); /** @@ -91,7 +91,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const * @pVgroupList - array of SVgroupInfo * @return */ -int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList); +int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList); /** @@ -103,7 +103,7 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe * @param pMetaData * @return */ -int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); +int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet); diff --git a/source/common/inc/commonInt.h b/source/common/inc/commonInt.h index e7d2dba95c..5b71f83faf 100644 --- a/source/common/inc/commonInt.h +++ b/source/common/inc/commonInt.h @@ -20,6 +20,7 @@ extern "C" { #endif +extern bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 9921a2696b..a670d9d639 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -16,6 +16,7 @@ #include "catalogInt.h" #include "trpc.h" #include "query.h" +#include "tname.h" SCatalogMgmt ctgMgmt = {0}; @@ -47,7 +48,7 @@ int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSe return TSDB_CODE_SUCCESS; } -int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SHashObj** pVgroupList, int32_t* exist) { +int32_t ctgGetVgroupFromCache(struct SCatalog* pCatalog, SHashObj** pVgroupList, int32_t* exist) { if (NULL == pCatalog->vgroupCache.cache || pCatalog->vgroupCache.vgroupVersion < 0) { *exist = 0; return TSDB_CODE_SUCCESS; @@ -62,7 +63,7 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SHashObj** pVgroupList, int32_ return TSDB_CODE_SUCCESS; } -int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) { +int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) { if (NULL == pCatalog->dbCache.cache) { *exist = 0; return TSDB_CODE_SUCCESS; @@ -130,13 +131,13 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp } -int32_t ctgGetTableMetaFromCache(SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) { +int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) { if (NULL == pCatalog->tableCache.cache) { *exist = 0; return TSDB_CODE_SUCCESS; } - char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1]; + char tbFullName[TSDB_TABLE_FNAME_LEN]; snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbName, pTableName); @@ -200,7 +201,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE return TSDB_CODE_CTG_INVALID_INPUT; } - char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1]; + char tbFullName[TSDB_TABLE_FNAME_LEN]; snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName); @@ -251,7 +252,54 @@ int32_t ctgGetHashFunction(int32_t hashType, tableNameHashFp *fp) { return TSDB_CODE_SUCCESS; } -int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { +int32_t ctgGetVgroupFromVgId(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, int32_t vgId, SVgroupInfo *pVgroup) { + SHashObj *vgroupHash = NULL; + + CTG_ERR_RET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash)); + if (NULL == vgroupHash) { + ctgError("get empty vgroup cache"); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + if (NULL == taosHashGetClone(vgroupHash, &vgId, sizeof(vgId), pVgroup)) { + ctgError("vgId[%d] not found in vgroup list", vgId); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgGetVgroupFromVgIdBatch(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SArray* vgIds, SArray* vgroupList) { + SHashObj *vgroupHash = NULL; + SVgroupInfo pVgroup = {0}; + int32_t vgIdNum = taosArrayGetSize(vgIds); + + CTG_ERR_RET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash)); + if (NULL == vgroupHash) { + ctgError("get empty vgroup cache"); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + for (int32_t i = 0; i < vgIdNum; ++i) { + int32_t *vgId = taosArrayGet(vgIds, i); + + if (NULL == taosHashGetClone(vgroupHash, vgId, sizeof(*vgId), &pVgroup)) { + ctgError("vgId[%d] not found in vgroup list", vgId); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + if (NULL == taosArrayPush(vgroupList, &pVgroup)) { + ctgError("push vgroup to array failed, idx:%d", i); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + } + + return TSDB_CODE_SUCCESS; +} + + + +int32_t ctgGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { SDBVgroupInfo *dbInfo = NULL; int32_t code = 0; @@ -269,7 +317,7 @@ int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgm int32_t vgNum = taosArrayGetSize(dbInfo->vgId); if (vgNum <= 0) { - ctgError("db[%s] vgroup cache invalid, vgroup number:%p", vgNum); + ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum); CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED); } @@ -277,7 +325,7 @@ int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgm CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashType, &fp)); - char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1]; + char tbFullName[TSDB_TABLE_FNAME_LEN]; snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName); @@ -285,18 +333,7 @@ int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgm uint32_t hashUnit = dbInfo->hashRange / vgNum; uint32_t vgId = hashValue / hashUnit; - SHashObj *vgroupHash = NULL; - - CTG_ERR_JRET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash)); - if (NULL == vgroupHash) { - ctgError("get empty vgroup cache"); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - if (NULL == taosHashGetClone(vgroupHash, &vgId, sizeof(vgId), pVgroup)) { - ctgError("vgId[%d] not found in vgroup list", vgId); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } + CTG_ERR_JRET(ctgGetVgroupFromVgId(pCatalog, pRpc, pMgmtEps, vgId, pVgroup)); _return: if (dbInfo && dbInfo->vgId) { @@ -335,7 +372,7 @@ STableMeta* ctgCreateSTableMeta(STableMetaMsg* pChild) { return pTableMeta; } -int32_t ctgGetTableMetaImpl(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) { +int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) { if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { return TSDB_CODE_CTG_INVALID_INPUT; } @@ -363,7 +400,7 @@ int32_t ctgGetTableMetaImpl(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtE } -int32_t ctgUpdateTableMetaCache(SCatalog *pCatalog, STableMetaOutput *output) { +int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) { if (output->metaNum != 1 && output->metaNum != 2) { ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum); return TSDB_CODE_CTG_INTERNAL_ERROR; @@ -433,7 +470,6 @@ error_exit: return TSDB_CODE_CTG_INTERNAL_ERROR; } - int32_t catalogInit(SCatalogCfg *cfg) { ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == ctgMgmt.pCluster) { @@ -451,7 +487,7 @@ int32_t catalogInit(SCatalogCfg *cfg) { return TSDB_CODE_SUCCESS; } -int32_t catalogGetHandle(const char *clusterId, SCatalog** catalogHandle) { +int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) { if (NULL == clusterId || NULL == catalogHandle) { return TSDB_CODE_CTG_INVALID_INPUT; } @@ -545,7 +581,7 @@ error_exit: return TSDB_CODE_CTG_INTERNAL_ERROR; } -int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash) { +int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash) { if (NULL == pCatalog || NULL == pMgmtEps || NULL == pRpc) { return TSDB_CODE_CTG_INVALID_INPUT; } @@ -677,7 +713,7 @@ _return: return code; } -int32_t catalogGetTableMeta(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) { +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) { return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, false, pTableMeta); } @@ -701,21 +737,97 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe return TSDB_CODE_SUCCESS; } -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta* pTableMeta) { +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) { return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta); } -int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList) { - -} - - -int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) { - if (NULL == pCatalog || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) { +int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) { return TSDB_CODE_CTG_INVALID_INPUT; } - return 0; + STableMeta *tbMeta = NULL; + int32_t code = 0; + SVgroupInfo vgroupInfo = {0}; + SDBVgroupInfo *dbVgroup = NULL; + + CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta)); + + if (tbMeta->tableType == TSDB_SUPER_TABLE) { + CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup)); + + CTG_ERR_JRET(ctgGetVgroupFromVgIdBatch(pCatalog, pRpc, pMgmtEps, dbVgroup->vgId, pVgroupList)); + } else { + CTG_ERR_JRET(ctgGetVgroupFromVgId(pCatalog, pRpc, pMgmtEps, tbMeta->vgId, &vgroupInfo)); + + if (NULL == taosArrayPush(pVgroupList, &vgroupInfo)) { + ctgError("push vgroupInfo to array failed"); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + } + +_return: + tfree(tbMeta); + if (dbVgroup && dbVgroup->vgId) { + taosArrayDestroy(dbVgroup->vgId); + dbVgroup->vgId = NULL; + } + + tfree(dbVgroup); + + return code; +} + + +int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + + int32_t code = 0; + + if (pReq->pTableName) { + char dbName[TSDB_FULL_DB_NAME_LEN]; + int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName); + if (tbNum > 0) { + pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES); + if (NULL == pRsp->pTableMeta) { + ctgError("taosArrayInit num[%d] failed", tbNum); + return TSDB_CODE_CTG_MEM_ERROR; + } + } + + for (int32_t i = 0; i < tbNum; ++i) { + SName *name = taosArrayGet(pReq->pTableName, i); + STableMeta *pTableMeta = NULL; + + snprintf(dbName, sizeof(dbName), "%s.%s", name->acctId, name->dbname); + + CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, dbName, name->tname, &pTableMeta)); + + if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) { + ctgError("taosArrayPush failed, idx:%d", i); + tfree(pTableMeta); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + } + } + + return TSDB_CODE_SUCCESS; + +_return: + + if (pRsp->pTableMeta) { + int32_t aSize = taosArrayGetSize(pRsp->pTableMeta); + for (int32_t i = 0; i < aSize; ++i) { + STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i); + tfree(pMeta); + } + + taosArrayDestroy(pRsp->pTableMeta); + } + + return code; } void catalogDestroy(void) { diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index d2813886b3..9f1363a9b4 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -4088,7 +4088,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer } // load the meta data from catalog - code = catalogGetAllMeta(pCatalog, NULL, &req, &data); + code = catalogGetAllMeta(pCatalog, NULL, NULL, &req, &data); if (code != TSDB_CODE_SUCCESS) { return code; } From c84b6d09e497442c3a0e940fbb410915b7791f0b Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 15 Dec 2021 22:18:51 -0500 Subject: [PATCH 8/9] TD-12035 Scan subquery physical plan code. --- include/libs/parser/parser.h | 12 +- include/libs/planner/planner.h | 104 ++++++++++++----- include/libs/planner/plannerOp.h | 48 ++++++++ include/util/tarray.h | 2 +- source/libs/parser/src/insertParser.c | 52 +++++---- source/libs/parser/test/CMakeLists.txt | 11 +- source/libs/parser/test/insertTest.cpp | 21 ++++ source/libs/planner/inc/plannerInt.h | 51 +-------- source/libs/planner/src/physicalPlan.c | 150 +++++++++++++++++-------- source/libs/planner/src/planner.c | 3 +- source/libs/scheduler/CMakeLists.txt | 2 +- source/util/src/tarray.c | 2 +- 12 files changed, 295 insertions(+), 163 deletions(-) create mode 100644 include/libs/planner/plannerOp.h diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 2f152c3e2b..d65b5ab570 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -132,13 +132,15 @@ struct SInsertStmtInfo; bool qIsInsertSql(const char* pStr, size_t length); typedef struct SParseContext { + const char* pAcctId; + const char* pDbname; + void *pRpc; + const char* pClusterId; + const SEpSet* pEpSet; + int64_t id; // query id, generated by uuid generator + int8_t schemaAttached; // denote if submit block is built with table schema or not const char* pSql; // sql string size_t sqlLen; // length of the sql string - int64_t id; // operator id, generated by uuid generator - const char* pDbname; - const SEpSet* pEpSet; - int8_t schemaAttached; // denote if submit block is built with table schema or not - char* pMsg; // extended error message if exists to help avoid the problem in sql statement. int32_t msgLen; // max length of the msg } SParseContext; diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index be00ed65f0..844757eeb5 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -20,51 +20,92 @@ extern "C" { #endif +#include "taosmsg.h" + #define QUERY_TYPE_MERGE 1 #define QUERY_TYPE_PARTIAL 2 #define QUERY_TYPE_SCAN 3 enum OPERATOR_TYPE_E { - OP_TableScan = 1, - OP_DataBlocksOptScan = 2, - OP_TableSeqScan = 3, - OP_TagScan = 4, - OP_TableBlockInfoScan= 5, - OP_Aggregate = 6, - OP_Project = 7, - OP_Groupby = 8, - OP_Limit = 9, - OP_SLimit = 10, - OP_TimeWindow = 11, - OP_SessionWindow = 12, - OP_StateWindow = 22, - OP_Fill = 13, - OP_MultiTableAggregate = 14, - OP_MultiTableTimeInterval = 15, -// OP_DummyInput = 16, //TODO remove it after fully refactor. -// OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream. -// OP_GlobalAggregate = 18, // global merge for the multi-way data sources. - OP_Filter = 19, - OP_Distinct = 20, - OP_Join = 21, - OP_AllTimeWindow = 23, - OP_AllMultiTableTimeInterval = 24, - OP_Order = 25, - OP_Exchange = 26, + OP_Unknown, +#define INCLUDE_AS_ENUM +#include "plannerOp.h" +#undef INCLUDE_AS_ENUM + OP_TotalNum }; struct SEpSet; -struct SPhyNode; struct SQueryStmtInfo; +typedef SSchema SSlotSchema; + +typedef struct SDataBlockSchema { + SSlotSchema *pSchema; + int32_t numOfCols; // number of columns +} SDataBlockSchema; + +typedef struct SQueryNodeBasicInfo { + int32_t type; // operator type + const char *name; // operator name +} SQueryNodeBasicInfo; + +typedef struct SPhyNode { + SQueryNodeBasicInfo info; + SArray *pTargets; // target list to be computed or scanned at this node + SArray *pConditions; // implicitly-ANDed qual conditions + SDataBlockSchema targetSchema; + // children plan to generated result for current node to process + // in case of join, multiple plan nodes exist. + SArray *pChildren; + struct SPhyNode *pParent; +} SPhyNode; + +typedef struct SScanPhyNode { + SPhyNode node; + uint64_t uid; // unique id of the table + int8_t tableType; +} SScanPhyNode; + +typedef SScanPhyNode SSystemTableScanPhyNode; +typedef SScanPhyNode STagScanPhyNode; + +typedef struct STableScanPhyNode { + SScanPhyNode scan; + uint8_t scanFlag; // denotes reversed scan of data or not + STimeWindow window; + SArray *pTagsConditions; // implicitly-ANDed tag qual conditions +} STableScanPhyNode; + +typedef STableScanPhyNode STableSeqScanPhyNode; + +typedef struct SProjectPhyNode { + SPhyNode node; +} SProjectPhyNode; + +typedef struct SExchangePhyNode { + SPhyNode node; + uint64_t templateId; + SArray *pSourceEpSet; // SEpSet +} SExchangePhyNode; + +typedef struct SSubplanId { + uint64_t queryId; + uint64_t templateId; + uint64_t subplanId; +} SSubplanId; + typedef struct SSubplan { - int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN - SArray *pDatasource; // the datasource subplan,from which to fetch the result - struct SPhyNode *pNode; // physical plan of current subplan + SSubplanId id; // unique id of the subplan + int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN + int32_t level; // the execution level of current subplan, starting from 0. + SEpSet execEpSet; // for the scan sub plan, the optional execution node + SArray *pChildern; // the datasource subplan,from which to fetch the result + SArray *pParents; // the data destination subplan, get data from current subplan + SPhyNode *pNode; // physical plan of current subplan } SSubplan; typedef struct SQueryDag { - SArray **pSubplans; + SArray *pSubplans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. } SQueryDag; /** @@ -74,6 +115,7 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str); + /** * Convert to subplan to string for the scheduler to send to the executor */ diff --git a/include/libs/planner/plannerOp.h b/include/libs/planner/plannerOp.h new file mode 100644 index 0000000000..27c7c534a2 --- /dev/null +++ b/include/libs/planner/plannerOp.h @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#if defined(INCLUDE_AS_ENUM) // enum define mode + #undef OP_ENUM_MACRO + #define OP_ENUM_MACRO(op) OP_##op, +#elif defined(INCLUDE_AS_NAME) // comment define mode + #undef OP_ENUM_MACRO + #define OP_ENUM_MACRO(op) #op, +#else + #error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME +#endif + +OP_ENUM_MACRO(TableScan) +OP_ENUM_MACRO(DataBlocksOptScan) +OP_ENUM_MACRO(TableSeqScan) +OP_ENUM_MACRO(TagScan) +OP_ENUM_MACRO(TableBlockInfoScan) +OP_ENUM_MACRO(Aggregate) +OP_ENUM_MACRO(Project) +OP_ENUM_MACRO(Groupby) +OP_ENUM_MACRO(Limit) +OP_ENUM_MACRO(SLimit) +OP_ENUM_MACRO(TimeWindow) +OP_ENUM_MACRO(SessionWindow) +OP_ENUM_MACRO(StateWindow) +OP_ENUM_MACRO(Fill) +OP_ENUM_MACRO(MultiTableAggregate) +OP_ENUM_MACRO(MultiTableTimeInterval) +OP_ENUM_MACRO(Filter) +OP_ENUM_MACRO(Distinct) +OP_ENUM_MACRO(Join) +OP_ENUM_MACRO(AllTimeWindow) +OP_ENUM_MACRO(AllMultiTableTimeInterval) +OP_ENUM_MACRO(Order) +OP_ENUM_MACRO(Exchange) diff --git a/include/util/tarray.h b/include/util/tarray.h index e0f14dcd25..f7c72add01 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -41,7 +41,7 @@ typedef struct SArray { * @param elemSize * @return */ -void* taosArrayInit(size_t size, size_t elemSize); +SArray* taosArrayInit(size_t size, size_t elemSize); /** * diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index fa59bc6ca7..97102c5b00 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -71,8 +71,7 @@ typedef struct SInsertParseContext { const char* pSql; SMsgBuf msg; struct SCatalog* pCatalog; - SMetaData meta; // need release - const STableMeta* pTableMeta; + STableMeta tableMeta; SHashObj* pTableBlockHashObj; // data block for each table. need release int32_t totalNum; SInsertStmtInfo* pOutput; @@ -165,29 +164,29 @@ static int32_t skipInsertInto(SInsertParseContext* pCxt) { return TSDB_CODE_SUCCESS; } -static int32_t buildTableName(SInsertParseContext* pCxt, SToken* pStname, SArray* tableNameList) { +static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) { if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) { return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z); } - SName name = {0}; - strcpy(name.dbname, pCxt->pComCxt->pDbname); - strncpy(name.tname, pStname->z, pStname->n); - taosArrayPush(tableNameList, &name); - + char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false); + if (NULL != p) { // db.table + strcpy(fullDbName, pCxt->pComCxt->pAcctId); + fullDbName[strlen(pCxt->pComCxt->pAcctId)] = TS_PATH_DELIMITER[0]; + strncpy(fullDbName, pStname->z, p - pStname->z); + strncpy(tableName, p + 1, pStname->n - (p - pStname->z) - 1); + } else { + snprintf(fullDbName, TSDB_FULL_DB_NAME_LEN, "%s.%s", pCxt->pComCxt->pAcctId, pCxt->pComCxt->pDbname); + strncpy(tableName, pStname->z, pStname->n); + } return TSDB_CODE_SUCCESS; } -static int32_t buildMetaReq(SInsertParseContext* pCxt, SToken* pStname, SCatalogReq* pMetaReq) { - pMetaReq->pTableName = taosArrayInit(4, sizeof(SName)); - return buildTableName(pCxt, pStname, pMetaReq->pTableName); -} - static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { - SCatalogReq req; - CHECK_CODE(buildMetaReq(pCxt, pTname, &req)); - CHECK_CODE(catalogGetTableMeta(pCxt->pCatalog, NULL, NULL, NULL, &pCxt->meta)); //TODO - pCxt->pTableMeta = (STableMeta*)taosArrayGetP(pCxt->meta.pTableMeta, 0); + char fullDbName[TSDB_FULL_DB_NAME_LEN] = {0}; + char tableName[TSDB_TABLE_NAME_LEN] = {0}; + CHECK_CODE(buildName(pCxt, pTname, fullDbName, tableName)); + CHECK_CODE(catalogGetTableMeta(pCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, &pCxt->tableMeta)); return TSDB_CODE_SUCCESS; } @@ -646,13 +645,13 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) NEXT_TOKEN(pCxt->pSql, sToken); CHECK_CODE(getTableMeta(pCxt, &sToken)); - if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) { + if (TSDB_SUPER_TABLE != pCxt->tableMeta.tableType) { return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); } - SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta); + SSchema* pTagsSchema = getTableTagSchema(&pCxt->tableMeta); SParsedDataColInfo spd = {0}; - setBoundColumnInfo(&spd, pTagsSchema, getNumOfTags(pCxt->pTableMeta)); + setBoundColumnInfo(&spd, pTagsSchema, getNumOfTags(&pCxt->tableMeta)); // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...) NEXT_TOKEN(pCxt->pSql, sToken); @@ -669,7 +668,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) if (TK_LP != sToken.type) { return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z); } - CHECK_CODE(parseTagsClause(pCxt, &spd, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision)); + CHECK_CODE(parseTagsClause(pCxt, &spd, pTagsSchema, getTableInfo(&pCxt->tableMeta).precision)); return TSDB_CODE_SUCCESS; } @@ -811,12 +810,12 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { } STableDataBlocks *dataBuf = NULL; - CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, - sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL)); + CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->tableMeta.uid, TSDB_DEFAULT_PAYLOAD_SIZE, + sizeof(SSubmitBlk), getTableInfo(&pCxt->tableMeta).rowSize, &pCxt->tableMeta, &dataBuf, NULL)); if (TK_LP == sToken.type) { // pSql -> field1_name, ...) - CHECK_CODE_1(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)), destroyBoundColumnInfo(&dataBuf->boundColumnInfo)); + CHECK_CODE_1(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(&pCxt->tableMeta)), destroyBoundColumnInfo(&dataBuf->boundColumnInfo)); NEXT_TOKEN(pCxt->pSql, sToken); } @@ -862,18 +861,17 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) { .pSql = pContext->pSql, .msg = {.buf = pContext->pMsg, .len = pContext->msgLen}, .pCatalog = NULL, - .pTableMeta = NULL, + .tableMeta = {0}, .pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false), .totalNum = 0, .pOutput = *pInfo }; - CHECK_CODE(catalogGetHandle(NULL, &context.pCatalog)); //TODO - if (NULL == context.pTableBlockHashObj) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } + CHECK_CODE(catalogGetHandle(pContext->pClusterId, &context.pCatalog)); CHECK_CODE(skipInsertInto(&context)); CHECK_CODE(parseInsertBody(&context)); diff --git a/source/libs/parser/test/CMakeLists.txt b/source/libs/parser/test/CMakeLists.txt index 4b9e586be3..03b76152da 100644 --- a/source/libs/parser/test/CMakeLists.txt +++ b/source/libs/parser/test/CMakeLists.txt @@ -6,13 +6,16 @@ SET(CMAKE_CXX_STANDARD 11) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(parserTest ${SOURCE_LIST}) -TARGET_LINK_LIBRARIES( - parserTest - PUBLIC os util common parser catalog transport gtest function planner query -) TARGET_INCLUDE_DIRECTORIES( parserTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/parser/" PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/parser/inc" ) + +TARGET_LINK_LIBRARIES( + parserTest + PUBLIC os util common parser catalog transport gtest function planner query +) + +TARGET_LINK_OPTIONS(parserTest PRIVATE -Wl,-wrap,malloc) diff --git a/source/libs/parser/test/insertTest.cpp b/source/libs/parser/test/insertTest.cpp index 9cf48da4eb..5877adf41c 100644 --- a/source/libs/parser/test/insertTest.cpp +++ b/source/libs/parser/test/insertTest.cpp @@ -27,6 +27,27 @@ namespace { } } +extern "C" { + +#include + +void *__real_malloc(size_t); + +void *__wrap_malloc(size_t c) { + // printf("My MALLOC called: %d\n", c); + // void *array[32]; + // int size = backtrace(array, 32); + // char **symbols = backtrace_symbols(array, size); + // for (int i = 0; i < size; ++i) { + // cout << symbols[i] << endl; + // } + // free(symbols); + + return __real_malloc(c); +} + +} + // syntax: // INSERT INTO // tb_name diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 6dcd19782c..6c65e4810d 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -23,6 +23,7 @@ extern "C" { #include "common.h" #include "tarray.h" #include "planner.h" +#include "parser.h" #include "taosmsg.h" #define QNODE_TAGSCAN 1 @@ -40,11 +41,6 @@ extern "C" { #define QNODE_STATEWINDOW 13 #define QNODE_FILL 14 -typedef struct SQueryNodeBasicInfo { - int32_t type; // operator type - char *name; // operator name -} SQueryNodeBasicInfo; - typedef struct SQueryDistPlanNodeInfo { bool stableQuery; // super table query or not int32_t phase; // merge|partial @@ -54,8 +50,9 @@ typedef struct SQueryDistPlanNodeInfo { } SQueryDistPlanNodeInfo; typedef struct SQueryTableInfo { - char *tableName; - uint64_t uid; + char *tableName; // to be deleted + uint64_t uid; // to be deleted + STableMetaInfo* pMeta; STimeWindow window; } SQueryTableInfo; @@ -72,46 +69,6 @@ typedef struct SQueryPlanNode { struct SQueryPlanNode *pParent; } SQueryPlanNode; -typedef SSchema SSlotSchema; - -typedef struct SDataBlockSchema { - int32_t index; - SSlotSchema *pSchema; - int32_t numOfCols; // number of columns -} SDataBlockSchema; - -typedef struct SPhyNode { - SQueryNodeBasicInfo info; - SArray *pTargets; // target list to be computed or scanned at this node - SArray *pConditions; // implicitly-ANDed qual conditions - SDataBlockSchema targetSchema; - // children plan to generated result for current node to process - // in case of join, multiple plan nodes exist. - SArray *pChildren; - struct SPhyNode *pParent; -} SPhyNode; - -typedef struct SScanPhyNode { - SPhyNode node; - STimeWindow window; - uint64_t uid; // unique id of the table -} SScanPhyNode; - -typedef SScanPhyNode STagScanPhyNode; - -typedef SScanPhyNode SSystemTableScanPhyNode; - -typedef struct SMultiTableScanPhyNode { - SScanPhyNode scan; - SArray *pTagsConditions; // implicitly-ANDed tag qual conditions -} SMultiTableScanPhyNode; - -typedef SMultiTableScanPhyNode SMultiTableSeqScanPhyNode; - -typedef struct SProjectPhyNode { - SPhyNode node; -} SProjectPhyNode; - /** * Optimize the query execution plan, currently not implement yet. * @param pQueryNode diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index c4564a9a09..e7acb12bc0 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -14,64 +14,107 @@ */ #include "plannerInt.h" +#include "parser.h" -// typedef struct SQueryPlanNode { -// void *pExtInfo; // additional information -// SArray *pPrevNodes; // children -// struct SQueryPlanNode *nextNode; // parent -// } SQueryPlanNode; +static const char* gOpName[] = { + "Unknown", +#define INCLUDE_AS_NAME +#include "plannerOp.h" +#undef INCLUDE_AS_NAME +}; -// typedef struct SSubplan { -// int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN -// SArray *pDatasource; // the datasource subplan,from which to fetch the result -// struct SPhyNode *pNode; // physical plan of current subplan -// } SSubplan; +typedef struct SPlanContext { + struct SCatalog* pCatalog; + struct SQueryDag* pDag; + SSubplan* pCurrentSubplan; + SSubplanId nextId; +} SPlanContext; -// typedef struct SQueryDag { -// SArray **pSubplans; -// } SQueryDag; - -// typedef struct SScanPhyNode { -// SPhyNode node; -// STimeWindow window; -// uint64_t uid; // unique id of the table -// } SScanPhyNode; - -// typedef SScanPhyNode STagScanPhyNode; - -void fillDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) { - dataBlockSchema->index = 0; // todo +static void toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) { SWAP(dataBlockSchema->pSchema, pPlanNode->pSchema, SSchema*); dataBlockSchema->numOfCols = pPlanNode->numOfCols; } -void fillPhyNode(SQueryPlanNode* pPlanNode, int32_t type, const char* name, SPhyNode* node) { +static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) { + SPhyNode* node = (SPhyNode*)calloc(1, size); node->info.type = type; - node->info.name = name; + node->info.name = gOpName[type]; SWAP(node->pTargets, pPlanNode->pExpr, SArray*); - fillDataBlockSchema(pPlanNode, &(node->targetSchema)); + toDataBlockSchema(pPlanNode, &(node->targetSchema)); } -SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) { - STagScanPhyNode* node = calloc(1, sizeof(STagScanPhyNode)); - fillPhyNode(pPlanNode, OP_TagScan, "TagScan", (SPhyNode*)node); - return (SPhyNode*)node; +static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) { + return initPhyNode(pPlanNode, OP_TagScan, sizeof(STagScanPhyNode)); } -SPhyNode* createScanNode(SQueryPlanNode* pPlanNode) { - STagScanPhyNode* node = calloc(1, sizeof(STagScanPhyNode)); - fillPhyNode(pPlanNode, OP_TableScan, "SingleTableScan", (SPhyNode*)node); - return (SPhyNode*)node; +static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { + SSubplan* subplan = calloc(1, sizeof(SSubplan)); + subplan->id = pCxt->nextId; + ++(pCxt->nextId.subplanId); + subplan->type = type; + subplan->level = 0; + if (NULL != pCxt->pCurrentSubplan) { + subplan->level = pCxt->pCurrentSubplan->level + 1; + if (NULL == pCxt->pCurrentSubplan->pChildern) { + pCxt->pCurrentSubplan->pChildern = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + } + taosArrayPush(pCxt->pCurrentSubplan->pChildern, subplan); + subplan->pParents = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + taosArrayPush(subplan->pParents, pCxt->pCurrentSubplan); + } + pCxt->pCurrentSubplan = subplan; + return subplan; } -SPhyNode* createPhyNode(SQueryPlanNode* pPlanNode) { +static uint8_t getScanFlag(SQueryPlanNode* pPlanNode) { + // todo + return MASTER_SCAN; +} + +static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { + STableScanPhyNode* node = (STableScanPhyNode*)initPhyNode(pPlanNode, OP_TableScan, sizeof(STableScanPhyNode)); + node->scan.uid = pTable->pMeta->pTableMeta->uid; + node->scan.tableType = pTable->pMeta->pTableMeta->tableType; + node->scanFlag = getScanFlag(pPlanNode); + node->window = pTable->window; + // todo tag cond +} + +static void vgroupToEpSet(const SVgroupMsg* vg, SEpSet* epSet) { + // todo +} + +static void splitSubplanBySTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { + SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList; + for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) { + SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); + vgroupToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet); + subplan->pNode = createTableScanNode(pCxt, pPlanNode, pTable); + // todo reset pCxt->pCurrentSubplan + } +} + +static SPhyNode* createExchangeNode() { + +} + +static SPhyNode* createScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { + SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo; + if (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType) { + splitSubplanBySTable(pCxt, pPlanNode, pTable); + return createExchangeNode(pCxt, pTable); + } + return createTableScanNode(pCxt, pPlanNode, pTable); +} + +static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { SPhyNode* node = NULL; switch (pPlanNode->info.type) { case QNODE_TAGSCAN: node = createTagScanNode(pPlanNode); break; case QNODE_TABLESCAN: - node = createScanNode(pPlanNode); + node = createScanNode(pCxt, pPlanNode); break; default: assert(false); @@ -80,7 +123,7 @@ SPhyNode* createPhyNode(SQueryPlanNode* pPlanNode) { node->pChildren = taosArrayInit(4, POINTER_BYTES); size_t size = taosArrayGetSize(pPlanNode->pChildren); for(int32_t i = 0; i < size; ++i) { - SPhyNode* child = createPhyNode(taosArrayGet(pPlanNode->pChildren, i)); + SPhyNode* child = createPhyNode(pCxt, taosArrayGet(pPlanNode->pChildren, i)); child->pParent = node; taosArrayPush(node->pChildren, &child); } @@ -88,13 +131,30 @@ SPhyNode* createPhyNode(SQueryPlanNode* pPlanNode) { return node; } -SSubplan* createSubplan(SQueryPlanNode* pSubquery) { - SSubplan* subplan = calloc(1, sizeof(SSubplan)); - subplan->pNode = createPhyNode(pSubquery); - // todo - return subplan; +static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { + SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE); + subplan->pNode = createPhyNode(pCxt, pRoot); + SArray* l0 = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + taosArrayPush(l0, &subplan); + taosArrayPush(pCxt->pDag->pSubplans, &l0); + // todo deal subquery } -int32_t createDag(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) { - return 0; +int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag) { + SPlanContext context = { + .pCatalog = pCatalog, + .pDag = calloc(1, sizeof(SQueryDag)), + .pCurrentSubplan = NULL + }; + if (NULL == context.pDag) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + context.pDag->pSubplans = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + createSubplanByLevel(&context, pQueryNode); + *pDag = context.pDag; + return TSDB_CODE_SUCCESS; +} + +int32_t subPlanToString(struct SSubplan *pPhyNode, char** str) { + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index b18dd29257..19aac36e78 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -95,6 +95,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla } switch(type) { + case QNODE_TAGSCAN: case QNODE_TABLESCAN: { SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo)); memcpy(info, pExtInfo, sizeof(SQueryTableInfo)); @@ -162,7 +163,7 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe SArray* pExprs, SArray* tableCols) { if (pQueryInfo->info.onlyTagQuery) { int32_t num = (int32_t) taosArrayGetSize(pExprs); - SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, NULL); + SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info); if (pQueryInfo->info.distinct) { pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL); diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index 770a6b02c2..fd00085381 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -9,5 +9,5 @@ target_include_directories( target_link_libraries( scheduler - PRIVATE os util planner + PRIVATE os util planner common ) \ No newline at end of file diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 581a797343..cc8d6646b6 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -17,7 +17,7 @@ #include "tarray.h" #include "talgo.h" -void* taosArrayInit(size_t size, size_t elemSize) { +SArray* taosArrayInit(size_t size, size_t elemSize) { assert(elemSize > 0); if (size < TARRAY_MIN_SIZE) { From 3b8725461a436bc1be525f481911a8dcfc2cae3d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 16 Dec 2021 14:01:10 +0800 Subject: [PATCH 9/9] change use db msg and remove vgroup cache --- include/common/taosmsg.h | 23 +- include/libs/catalog/catalog.h | 16 +- include/libs/query/query.h | 13 +- source/libs/catalog/CMakeLists.txt | 2 + source/libs/catalog/src/catalog.c | 324 +++++----------------- source/libs/catalog/test/CMakeLists.txt | 18 ++ source/libs/catalog/test/catalogTests.cpp | 152 ++++++++++ source/libs/query/src/querymsg.c | 174 +++--------- 8 files changed, 296 insertions(+), 426 deletions(-) create mode 100644 source/libs/catalog/test/CMakeLists.txt diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 1fe930ba7a..8abfe0ffed 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -221,8 +221,7 @@ typedef struct SBuildTableMetaInput { typedef struct SBuildUseDBInput { char db[TSDB_TABLE_FNAME_LEN]; - int32_t vgroupVersion; - int32_t dbGroupVersion; + int32_t vgVersion; } SBuildUseDBInput; @@ -627,8 +626,7 @@ typedef struct { typedef struct { char db[TSDB_TABLE_FNAME_LEN]; int8_t ignoreNotExists; - int32_t vgroupVersion; - int32_t dbGroupVersion; + int32_t vgVersion; int32_t reserve[8]; } SUseDbMsg; @@ -808,6 +806,9 @@ typedef struct SSTableVgroupMsg { typedef struct SVgroupInfo { int32_t vgId; + int32_t hashBegin; + int32_t hashEnd; + int8_t inUse; int8_t numOfEps; SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; } SVgroupInfo; @@ -863,16 +864,12 @@ typedef struct { } STagData; typedef struct { - int32_t vgroupNum; - int32_t vgroupVersion; - char db[TSDB_TABLE_FNAME_LEN]; - int32_t dbVgroupVersion; - int32_t dbVgroupNum; - int32_t dbHashRange; - int32_t dbHashType; + char db[TSDB_FULL_DB_NAME_LEN]; + int32_t vgVersion; + int32_t vgNum; + int8_t hashMethod; SVgroupInfo vgroupInfo[]; -//int32_t vgIdList[]; -} SUseDbRspMsg; +} SUseDbRsp; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 6bbc4f9109..1f2452291b 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -61,22 +61,8 @@ int32_t catalogInit(SCatalogCfg *cfg); */ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle); - - -int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version); - -/** - * get cluster vgroup list. - * @pVgroupList - hash of vgroup list, key:vgId, value:SVgroupInfo - * @return - */ -int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash); -int32_t catalogUpdateVgroupCache(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup); - - - int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo); +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo); int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); diff --git a/include/libs/query/query.h b/include/libs/query/query.h index bfe2db6a61..8720fd085c 100644 --- a/include/libs/query/query.h +++ b/include/libs/query/query.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "tarray.h" +#include "thash.h" typedef SVgroupListRspMsg SVgroupListInfo; @@ -63,16 +64,14 @@ typedef struct STableMeta { typedef struct SDBVgroupInfo { - int32_t vgroupVersion; - SArray *vgId; - int32_t hashRange; - int32_t hashType; + int32_t vgVersion; + int8_t hashMethod; + SHashObj *vgInfo; //key:vgId, value:SVgroupInfo } SDBVgroupInfo; typedef struct SUseDbOutput { - SVgroupListInfo *vgroupList; - char db[TSDB_TABLE_FNAME_LEN]; - SDBVgroupInfo *dbVgroup; + char db[TSDB_FULL_DB_NAME_LEN]; + SDBVgroupInfo dbVgroup; } SUseDbOutput; typedef struct STableMetaOutput { diff --git a/source/libs/catalog/CMakeLists.txt b/source/libs/catalog/CMakeLists.txt index 25c80d502a..e6311152d6 100644 --- a/source/libs/catalog/CMakeLists.txt +++ b/source/libs/catalog/CMakeLists.txt @@ -10,3 +10,5 @@ target_link_libraries( catalog PRIVATE os util common transport query ) + +ADD_SUBDIRECTORY(test) \ No newline at end of file diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index a670d9d639..9fdac36060 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -20,50 +20,7 @@ SCatalogMgmt ctgMgmt = {0}; -int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SVgroupListInfo** pVgroup) { - char *msg = NULL; - SEpSet *pVnodeEpSet = NULL; - int32_t msgLen = 0; - - int32_t code = queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST](NULL, &msg, 0, &msgLen); - if (code) { - return code; - } - - SRpcMsg rpcMsg = { - .msgType = TSDB_MSG_TYPE_VGROUP_LIST, - .pCont = msg, - .contLen = msgLen, - }; - - SRpcMsg rpcRsp = {0}; - - rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); - - code = queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST](pVgroup, rpcRsp.pCont, rpcRsp.contLen); - if (code) { - return code; - } - - return TSDB_CODE_SUCCESS; -} - -int32_t ctgGetVgroupFromCache(struct SCatalog* pCatalog, SHashObj** pVgroupList, int32_t* exist) { - if (NULL == pCatalog->vgroupCache.cache || pCatalog->vgroupCache.vgroupVersion < 0) { - *exist = 0; - return TSDB_CODE_SUCCESS; - } - - if (pVgroupList) { - *pVgroupList = pCatalog->vgroupCache.cache; - } - - *exist = 1; - - return TSDB_CODE_SUCCESS; -} - -int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) { +int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo *dbInfo, int32_t *exist) { if (NULL == pCatalog->dbCache.cache) { *exist = 0; return TSDB_CODE_SUCCESS; @@ -71,28 +28,13 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S SDBVgroupInfo *info = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); - if (NULL == info || info->vgroupVersion < pCatalog->vgroupCache.vgroupVersion) { + if (NULL == info) { *exist = 0; return TSDB_CODE_SUCCESS; } if (dbInfo) { - *dbInfo = calloc(1, sizeof(**dbInfo)); - if (NULL == *dbInfo) { - ctgError("calloc size[%d] failed", (int32_t)sizeof(**dbInfo)); - return TSDB_CODE_CTG_MEM_ERROR; - } - - (*dbInfo)->vgId = taosArrayDup(info->vgId); - if (NULL == (*dbInfo)->vgId) { - ctgError("taos array duplicate failed"); - tfree(*dbInfo); - return TSDB_CODE_CTG_MEM_ERROR; - } - - (*dbInfo)->vgroupVersion = info->vgroupVersion; - (*dbInfo)->hashRange = info->hashRange; - (*dbInfo)->hashType = info->hashType; + *dbInfo = *info; } *exist = 1; @@ -242,8 +184,8 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE } -int32_t ctgGetHashFunction(int32_t hashType, tableNameHashFp *fp) { - switch (hashType) { +int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) { + switch (hashMethod) { default: *fp = MurmurHash3_32; break; @@ -252,96 +194,79 @@ int32_t ctgGetHashFunction(int32_t hashType, tableNameHashFp *fp) { return TSDB_CODE_SUCCESS; } -int32_t ctgGetVgroupFromVgId(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, int32_t vgId, SVgroupInfo *pVgroup) { +int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray* vgroupList) { SHashObj *vgroupHash = NULL; - - CTG_ERR_RET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash)); - if (NULL == vgroupHash) { - ctgError("get empty vgroup cache"); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } + SVgroupInfo *vgInfo = NULL; - if (NULL == taosHashGetClone(vgroupHash, &vgId, sizeof(vgId), pVgroup)) { - ctgError("vgId[%d] not found in vgroup list", vgId); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } + void *pIter = taosHashIterate(dbInfo->vgInfo, NULL); + while (pIter) { + vgInfo = pIter; - return TSDB_CODE_SUCCESS; -} - -int32_t ctgGetVgroupFromVgIdBatch(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SArray* vgIds, SArray* vgroupList) { - SHashObj *vgroupHash = NULL; - SVgroupInfo pVgroup = {0}; - int32_t vgIdNum = taosArrayGetSize(vgIds); - - CTG_ERR_RET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash)); - if (NULL == vgroupHash) { - ctgError("get empty vgroup cache"); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } - - for (int32_t i = 0; i < vgIdNum; ++i) { - int32_t *vgId = taosArrayGet(vgIds, i); + if (NULL == taosArrayPush(vgroupList, vgInfo)) { + ctgError("taosArrayPush failed"); + break; + } - if (NULL == taosHashGetClone(vgroupHash, vgId, sizeof(*vgId), &pVgroup)) { - ctgError("vgId[%d] not found in vgroup list", vgId); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } - - if (NULL == taosArrayPush(vgroupList, &pVgroup)) { - ctgError("push vgroup to array failed, idx:%d", i); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } + pIter = taosHashIterate(dbInfo->vgInfo, pIter); + vgInfo = NULL; } return TSDB_CODE_SUCCESS; } - - -int32_t ctgGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { - SDBVgroupInfo *dbInfo = NULL; - int32_t code = 0; - - CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo)); - - if (NULL == dbInfo) { - ctgWarn("db[%s] vgroup info not found", pDBName); +int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { + int32_t vgNum = taosHashGetSize(dbInfo->vgInfo); + if (vgNum <= 0) { + ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum); return TSDB_CODE_TSC_DB_NOT_SELECTED; } - if (dbInfo->vgroupVersion < 0 || NULL == dbInfo->vgId) { - ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgId:%p", pDBName, dbInfo->vgroupVersion, dbInfo->vgId); - CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED); - } - - int32_t vgNum = taosArrayGetSize(dbInfo->vgId); - if (vgNum <= 0) { - ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum); - CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED); - } - tableNameHashFp fp = NULL; + SVgroupInfo *vgInfo = NULL; - CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashType, &fp)); + CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp)); char tbFullName[TSDB_TABLE_FNAME_LEN]; snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName); uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName)); - uint32_t hashUnit = dbInfo->hashRange / vgNum; - uint32_t vgId = hashValue / hashUnit; - CTG_ERR_JRET(ctgGetVgroupFromVgId(pCatalog, pRpc, pMgmtEps, vgId, pVgroup)); - -_return: - if (dbInfo && dbInfo->vgId) { - taosArrayDestroy(dbInfo->vgId); - dbInfo->vgId = NULL; + void *pIter = taosHashIterate(dbInfo->vgInfo, NULL); + while (pIter) { + vgInfo = pIter; + if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) { + break; + } + + pIter = taosHashIterate(dbInfo->vgInfo, pIter); + vgInfo = NULL; } - - tfree(dbInfo); + + if (NULL == vgInfo) { + ctgError("no hash range found for hashvalue[%u]", hashValue); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + *pVgroup = *vgInfo; + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { + SDBVgroupInfo dbInfo = {0}; + int32_t code = 0; + int32_t vgId = 0; + + CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo)); + + if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) { + ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo); + return TSDB_CODE_TSC_DB_NOT_SELECTED; + } + + CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup)); return code; } @@ -524,95 +449,6 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) return TSDB_CODE_SUCCESS; } - -int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) { - if (NULL == pCatalog || NULL == version) { - return TSDB_CODE_CTG_INVALID_INPUT; - } - - *version = pCatalog->vgroupCache.vgroupVersion; - - return TSDB_CODE_SUCCESS; -} - - - -int32_t catalogUpdateVgroupCache(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) { - if (NULL == pVgroup) { - ctgError("no valid vgroup list info to update"); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } - - if (pVgroup->vgroupVersion < 0) { - ctgError("vgroup version[%d] is invalid", pVgroup->vgroupVersion); - return TSDB_CODE_CTG_INVALID_INPUT; - } - - if (NULL == pCatalog->vgroupCache.cache) { - pCatalog->vgroupCache.cache = taosHashInit(CTG_DEFAULT_CACHE_VGROUP_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (NULL == pCatalog->vgroupCache.cache) { - ctgError("init hash[%d] for cluster cache failed", CTG_DEFAULT_CACHE_VGROUP_NUMBER); - return TSDB_CODE_CTG_MEM_ERROR; - } - } else { - taosHashClear(pCatalog->vgroupCache.cache); - } - - SVgroupInfo *vInfo = NULL; - for (int32_t i = 0; i < pVgroup->vgroupNum; ++i) { - if (taosHashPut(pCatalog->vgroupCache.cache, &pVgroup->vgroupInfo[i].vgId, sizeof(pVgroup->vgroupInfo[i].vgId), &pVgroup->vgroupInfo[i], sizeof(pVgroup->vgroupInfo[i])) != 0) { - ctgError("push to vgroup hash cache failed"); - goto error_exit; - } - } - - pCatalog->vgroupCache.vgroupVersion = pVgroup->vgroupVersion; - - return TSDB_CODE_SUCCESS; - -error_exit: - if (pCatalog->vgroupCache.cache) { - taosHashCleanup(pCatalog->vgroupCache.cache); - pCatalog->vgroupCache.cache = NULL; - } - - pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; - - return TSDB_CODE_CTG_INTERNAL_ERROR; -} - -int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash) { - if (NULL == pCatalog || NULL == pMgmtEps || NULL == pRpc) { - return TSDB_CODE_CTG_INVALID_INPUT; - } - - int32_t exist = 0; - - CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupHash, &exist)); - - if (exist) { - return TSDB_CODE_SUCCESS; - } - - SVgroupListInfo *pVgroup = NULL; - - CTG_ERR_RET(ctgGetVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &pVgroup)); - - CTG_ERR_RET(catalogUpdateVgroupCache(pCatalog, pVgroup)); - - if (pVgroupHash) { - CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupHash, &exist)); - } - - if (0 == exist) { - ctgError("catalog fetched but get from cache failed"); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } - - return TSDB_CODE_SUCCESS; -} - - int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) { if (NULL == pCatalog || NULL == dbName || NULL == version) { return TSDB_CODE_CTG_INVALID_INPUT; @@ -629,7 +465,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, return TSDB_CODE_SUCCESS; } - *version = dbInfo->vgroupVersion; + *version = dbInfo->vgVersion; return TSDB_CODE_SUCCESS; } @@ -639,7 +475,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName return TSDB_CODE_CTG_INVALID_INPUT; } - if (dbInfo->vgroupVersion < 0) { + if (dbInfo->vgVersion < 0) { if (pCatalog->dbCache.cache) { taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)); } @@ -654,6 +490,12 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER); return TSDB_CODE_CTG_MEM_ERROR; } + } else { + SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); + if (oldInfo && oldInfo->vgInfo) { + taosHashCleanup(oldInfo->vgInfo); + oldInfo->vgInfo = NULL; + } } if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) { @@ -667,7 +509,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) { +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) { if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) { return TSDB_CODE_CTG_INVALID_INPUT; } @@ -688,28 +530,16 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* strncpy(input.db, dbName, sizeof(input.db)); input.db[sizeof(input.db) - 1] = 0; - input.vgroupVersion = pCatalog->vgroupCache.vgroupVersion; - input.dbGroupVersion = CTG_DEFAULT_INVALID_VERSION; + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut)); - if (DbOut.vgroupList) { - CTG_ERR_JRET(catalogUpdateVgroupCache(pCatalog, DbOut.vgroupList)); - } - - if (DbOut.dbVgroup) { - CTG_ERR_JRET(catalogUpdateDBVgroupCache(pCatalog, dbName, DbOut.dbVgroup)); - } + CTG_ERR_RET(catalogUpdateDBVgroupCache(pCatalog, dbName, &DbOut.dbVgroup)); if (dbInfo) { *dbInfo = DbOut.dbVgroup; - DbOut.dbVgroup = NULL; } -_return: - tfree(DbOut.dbVgroup); - tfree(DbOut.vgroupList); - return code; } @@ -749,16 +579,20 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe STableMeta *tbMeta = NULL; int32_t code = 0; SVgroupInfo vgroupInfo = {0}; - SDBVgroupInfo *dbVgroup = NULL; + SDBVgroupInfo dbVgroup = {0}; CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta)); - if (tbMeta->tableType == TSDB_SUPER_TABLE) { - CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup)); + CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup)); - CTG_ERR_JRET(ctgGetVgroupFromVgIdBatch(pCatalog, pRpc, pMgmtEps, dbVgroup->vgId, pVgroupList)); + if (tbMeta->tableType == TSDB_SUPER_TABLE) { + CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList)); } else { - CTG_ERR_JRET(ctgGetVgroupFromVgId(pCatalog, pRpc, pMgmtEps, tbMeta->vgId, &vgroupInfo)); + int32_t vgId = tbMeta->vgId; + if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { + ctgError("vgId[%d] not found in vgroup list", vgId); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } if (NULL == taosArrayPush(pVgroupList, &vgroupInfo)) { ctgError("push vgroupInfo to array failed"); @@ -768,12 +602,6 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe _return: tfree(tbMeta); - if (dbVgroup && dbVgroup->vgId) { - taosArrayDestroy(dbVgroup->vgId); - dbVgroup->vgId = NULL; - } - - tfree(dbVgroup); return code; } diff --git a/source/libs/catalog/test/CMakeLists.txt b/source/libs/catalog/test/CMakeLists.txt new file mode 100644 index 0000000000..527156f176 --- /dev/null +++ b/source/libs/catalog/test/CMakeLists.txt @@ -0,0 +1,18 @@ + +MESSAGE(STATUS "build catalog unit test") + +# GoogleTest requires at least C++11 +SET(CMAKE_CXX_STANDARD 11) +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +ADD_EXECUTABLE(catalogTest ${SOURCE_LIST}) +TARGET_LINK_LIBRARIES( + catalogTest + PUBLIC os util common catalog transport gtest query +) + +TARGET_INCLUDE_DIRECTORIES( + catalogTest + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/catalog/" + PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/catalog/inc" +) diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index e69de29bb2..f495451091 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#pragma GCC diagnostic ignored "-Wwrite-strings" + +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#include "os.h" + +#include "taos.h" +#include "tdef.h" +#include "tvariant.h" +#include "catalog.h" + +namespace { + + +} + +TEST(testCase, normalCase) { + char *clusterId = "cluster1"; + struct SCatalog* pCtg = NULL; + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(clusterId, &pCtg); + ASSERT_EQ(code, 0); + + +} + +/* +TEST(testCase, normalCase) { + SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)"); + ASSERT_EQ(info1.valid, true); + + char msg[128] = {0}; + SMsgBuf buf; + buf.len = 128; + buf.buf = msg; + + SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.sub.node), 0); + int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); + ASSERT_EQ(code, 0); + + SCatalogReq req = {0}; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + ASSERT_EQ(ret, 0); + ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); + + SQueryStmtInfo* pQueryInfo = createQueryInfo(); + setTableMetaInfo(pQueryInfo, &req); + + SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.sub.node, 0); + ret = validateSqlNode(pSqlNode, pQueryInfo, &buf); + ASSERT_EQ(ret, 0); + + SArray* pExprList = pQueryInfo->exprList[0]; + + int32_t num = tsCompatibleModel? 2:1; + ASSERT_EQ(taosArrayGetSize(pExprList), num); + + SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1); + ASSERT_EQ(p1->base.pColumns->uid, 110); + ASSERT_EQ(p1->base.numOfParams, 1); + ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE); + ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)"); + ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_TMP); + ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)"); + ASSERT_EQ(p1->base.interBytes, 16); + + ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE); + ASSERT_STREQ(p1->pExpr->_function.functionName, "top"); + + tExprNode* pParam = p1->pExpr->_function.pChild[0]; + + ASSERT_EQ(pParam->nodeType, TEXPR_COL_NODE); + ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3); + ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2); + + struct SQueryPlanNode* n = nullptr; + code = createQueryPlan(pQueryInfo, &n); + + char* str = NULL; + queryPlanToString(n, &str); + printf("%s\n", str); + + destroyQueryInfo(pQueryInfo); + qParserClearupMetaRequestInfo(&req); + destroySqlInfo(&info1); +} + +TEST(testCase, displayPlan) { + generateLogicplan("select count(*) from `t.1abc`"); + generateLogicplan("select count(*)+ 22 from `t.1abc`"); + generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h, 20s) sliding(10m) limit 20,30"); + generateLogicplan("select count(*) from `t.1abc` group by a"); + generateLogicplan("select count(A+B) from `t.1abc` group by a"); + generateLogicplan("select count(length(a)+b) from `t.1abc` group by a"); + generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)"); + generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s) order by 1 desc "); + generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`"); + generateLogicplan("select count(*), min(a) + 99 from `t.1abc`"); + generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`"); + generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc` limit 20"); + generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)"); + generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)"); + + // order by + group by column + limit offset + generateLogicplan("select top(a, 20) k from `t.1abc` order by k asc limit 3 offset 1"); + + // fill + generateLogicplan("select min(a) from `t.1abc` where ts>now and tsdb, bInput->db, sizeof(bMsg->db)); bMsg->db[sizeof(bMsg->db) - 1] = 0; - bMsg->vgroupVersion = bInput->vgroupVersion; - bMsg->dbGroupVersion = bInput->dbGroupVersion; + bMsg->vgVersion = bInput->vgVersion; *msgLen = (int32_t)sizeof(*bMsg); @@ -90,58 +78,12 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms } - -int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) { - if (NULL == output || NULL == msg || msgSize <= 0) { - return TSDB_CODE_TSC_INVALID_INPUT; - } - - SVgroupListRspMsg *pRsp = (SVgroupListRspMsg *)msg; - - pRsp->vgroupNum = htonl(pRsp->vgroupNum); - pRsp->vgroupVersion = htonl(pRsp->vgroupVersion); - - if (pRsp->vgroupNum < 0) { - qError("vgroup number[%d] in rsp is invalid", pRsp->vgroupNum); - return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; - } - - if (pRsp->vgroupVersion < 0) { - qError("vgroup vgroupVersion[%d] in rsp is invalid", pRsp->vgroupVersion); - return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; - } - - if (msgSize != (pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp))) { - qError("vgroup list msg size mis-match, msgSize:%d, vgroup number:%d", msgSize, pRsp->vgroupNum); - return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; - } - - // keep SVgroupListInfo/SVgroupListRspMsg the same - *(SVgroupListInfo **)output = (SVgroupListInfo *)msg; - - if (pRsp->vgroupNum == 0) { - return TSDB_CODE_SUCCESS; - } - - for (int32_t i = 0; i < pRsp->vgroupNum; ++i) { - pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); - for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { - pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); - } - } - - return TSDB_CODE_SUCCESS; -} - - - - int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { if (NULL == output || NULL == msg || msgSize <= 0) { return TSDB_CODE_TSC_INVALID_INPUT; } - SUseDbRspMsg *pRsp = (SUseDbRspMsg *)msg; + SUseDbRsp *pRsp = (SUseDbRsp *)msg; SUseDbOutput *pOut = (SUseDbOutput *)output; int32_t code = 0; @@ -150,104 +92,52 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } - pRsp->vgroupVersion = htonl(pRsp->vgroupVersion); - pRsp->dbVgroupVersion = htonl(pRsp->dbVgroupVersion); + pRsp->vgVersion = htonl(pRsp->vgVersion); + pRsp->vgNum = htonl(pRsp->vgNum); - pRsp->vgroupNum = htonl(pRsp->vgroupNum); - pRsp->dbVgroupNum = htonl(pRsp->dbVgroupNum); - - if (pRsp->vgroupNum < 0) { - qError("invalid vgroup number[%d]", pRsp->vgroupNum); + if (pRsp->vgNum < 0) { + qError("invalid db[%s] vgroup number[%d]", pRsp->db, pRsp->vgNum); return TSDB_CODE_TSC_INVALID_VALUE; } - if (pRsp->dbVgroupNum < 0) { - qError("invalid db vgroup number[%d]", pRsp->dbVgroupNum); - return TSDB_CODE_TSC_INVALID_VALUE; - } - - int32_t expectSize = pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + pRsp->dbVgroupNum * sizeof(int32_t) + sizeof(*pRsp); + int32_t expectSize = pRsp->vgNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp); if (msgSize != expectSize) { - qError("vgroup list msg size mis-match, msgSize:%d, expected:%d, vgroup number:%d, db vgroup number:%d", msgSize, expectSize, pRsp->vgroupNum, pRsp->dbVgroupNum); + qError("use db rsp size mis-match, msgSize:%d, expected:%d, vgnumber:%d", msgSize, expectSize, pRsp->vgNum); return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } - if (pRsp->vgroupVersion < 0) { - qInfo("no new vgroup list info"); - if (pRsp->vgroupNum != 0) { - qError("invalid vgroup number[%d] for no new vgroup list case", pRsp->vgroupNum); - return TSDB_CODE_TSC_INVALID_VALUE; - } - } else { - int32_t s = sizeof(*pOut->vgroupList) + sizeof(pOut->vgroupList->vgroupInfo[0]) * pRsp->vgroupNum; - pOut->vgroupList = calloc(1, s); - if (NULL == pOut->vgroupList) { - qError("calloc size[%d] failed", s); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - pOut->vgroupList->vgroupNum = pRsp->vgroupNum; - pOut->vgroupList->vgroupVersion = pRsp->vgroupVersion; - - for (int32_t i = 0; i < pRsp->vgroupNum; ++i) { - pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); - for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { - pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); - } - - memcpy(&pOut->vgroupList->vgroupInfo[i], &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i])); - } + pOut->dbVgroup.vgVersion = pRsp->vgVersion; + pOut->dbVgroup.hashMethod = pRsp->hashMethod; + pOut->dbVgroup.vgInfo = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == pOut->dbVgroup.vgInfo) { + qError("hash init[%d] failed", pRsp->vgNum); + return TSDB_CODE_TSC_OUT_OF_MEMORY; } - int32_t *vgIdList = (int32_t *)((char *)pRsp->vgroupInfo + sizeof(pRsp->vgroupInfo[0]) * pRsp->vgroupNum); + for (int32_t i = 0; i < pRsp->vgNum; ++i) { + pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); + pRsp->vgroupInfo[i].hashBegin = htonl(pRsp->vgroupInfo[i].hashBegin); + pRsp->vgroupInfo[i].hashEnd = htonl(pRsp->vgroupInfo[i].hashEnd); + + for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { + pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); + } + + if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) { + qError("hash push failed"); + goto _return; + } + } memcpy(pOut->db, pRsp->db, sizeof(pOut->db)); - - if (pRsp->dbVgroupVersion < 0) { - qInfo("no new vgroup info for db[%s]", pRsp->db); - } else { - pOut->dbVgroup = calloc(1, sizeof(*pOut->dbVgroup)); - if (NULL == pOut->dbVgroup) { - qError("calloc size[%d] failed", (int32_t)sizeof(*pOut->dbVgroup)); - code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _exit; - } - - pOut->dbVgroup->vgId = taosArrayInit(pRsp->dbVgroupNum, sizeof(int32_t)); - if (NULL == pOut->dbVgroup->vgId) { - qError("taosArrayInit size[%d] failed", pRsp->dbVgroupNum); - code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _exit; - } - - pOut->dbVgroup->vgroupVersion = pRsp->dbVgroupVersion; - pOut->dbVgroup->hashRange = htonl(pRsp->dbHashRange); - pOut->dbVgroup->hashType = htonl(pRsp->dbHashType); - - if (pOut->dbVgroup->hashRange < 0) { - qError("invalid hashRange[%d] for db[%s]", pOut->dbVgroup->hashRange, pRsp->db); - code = TSDB_CODE_TSC_INVALID_INPUT; - goto _exit; - } - - for (int32_t i = 0; i < pRsp->dbVgroupNum; ++i) { - *(vgIdList + i) = htonl(*(vgIdList + i)); - - taosArrayPush(pOut->dbVgroup->vgId, vgIdList + i) ; - } - } return code; -_exit: - if (pOut->dbVgroup && pOut->dbVgroup->vgId) { - taosArrayDestroy(pOut->dbVgroup->vgId); - pOut->dbVgroup->vgId = NULL; +_return: + if (pOut) { + tfree(pOut->dbVgroup.vgInfo); } - tfree(pOut->dbVgroup); - tfree(pOut->vgroupList); - return code; } @@ -375,11 +265,9 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { void msgInit() { queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg; - queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = queryBuildVgroupListReqMsg; queryBuildMsg[TSDB_MSG_TYPE_USE_DB] = queryBuildUseDbMsg; queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = queryProcessTableMetaRsp; - queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = queryProcessVgroupListRsp; queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB] = queryProcessUseDBRsp; /*