diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 1fe930ba7a..8abfe0ffed 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -221,8 +221,7 @@ typedef struct SBuildTableMetaInput { typedef struct SBuildUseDBInput { char db[TSDB_TABLE_FNAME_LEN]; - int32_t vgroupVersion; - int32_t dbGroupVersion; + int32_t vgVersion; } SBuildUseDBInput; @@ -627,8 +626,7 @@ typedef struct { typedef struct { char db[TSDB_TABLE_FNAME_LEN]; int8_t ignoreNotExists; - int32_t vgroupVersion; - int32_t dbGroupVersion; + int32_t vgVersion; int32_t reserve[8]; } SUseDbMsg; @@ -808,6 +806,9 @@ typedef struct SSTableVgroupMsg { typedef struct SVgroupInfo { int32_t vgId; + int32_t hashBegin; + int32_t hashEnd; + int8_t inUse; int8_t numOfEps; SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; } SVgroupInfo; @@ -863,16 +864,12 @@ typedef struct { } STagData; typedef struct { - int32_t vgroupNum; - int32_t vgroupVersion; - char db[TSDB_TABLE_FNAME_LEN]; - int32_t dbVgroupVersion; - int32_t dbVgroupNum; - int32_t dbHashRange; - int32_t dbHashType; + char db[TSDB_FULL_DB_NAME_LEN]; + int32_t vgVersion; + int32_t vgNum; + int8_t hashMethod; SVgroupInfo vgroupInfo[]; -//int32_t vgIdList[]; -} SUseDbRspMsg; +} SUseDbRsp; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 6bbc4f9109..1f2452291b 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -61,22 +61,8 @@ int32_t catalogInit(SCatalogCfg *cfg); */ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle); - - -int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version); - -/** - * get cluster vgroup list. - * @pVgroupList - hash of vgroup list, key:vgId, value:SVgroupInfo - * @return - */ -int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash); -int32_t catalogUpdateVgroupCache(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup); - - - int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo); +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo); int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); diff --git a/include/libs/query/query.h b/include/libs/query/query.h index bfe2db6a61..8720fd085c 100644 --- a/include/libs/query/query.h +++ b/include/libs/query/query.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "tarray.h" +#include "thash.h" typedef SVgroupListRspMsg SVgroupListInfo; @@ -63,16 +64,14 @@ typedef struct STableMeta { typedef struct SDBVgroupInfo { - int32_t vgroupVersion; - SArray *vgId; - int32_t hashRange; - int32_t hashType; + int32_t vgVersion; + int8_t hashMethod; + SHashObj *vgInfo; //key:vgId, value:SVgroupInfo } SDBVgroupInfo; typedef struct SUseDbOutput { - SVgroupListInfo *vgroupList; - char db[TSDB_TABLE_FNAME_LEN]; - SDBVgroupInfo *dbVgroup; + char db[TSDB_FULL_DB_NAME_LEN]; + SDBVgroupInfo dbVgroup; } SUseDbOutput; typedef struct STableMetaOutput { diff --git a/source/libs/catalog/CMakeLists.txt b/source/libs/catalog/CMakeLists.txt index 25c80d502a..e6311152d6 100644 --- a/source/libs/catalog/CMakeLists.txt +++ b/source/libs/catalog/CMakeLists.txt @@ -10,3 +10,5 @@ target_link_libraries( catalog PRIVATE os util common transport query ) + +ADD_SUBDIRECTORY(test) \ No newline at end of file diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index a670d9d639..9fdac36060 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -20,50 +20,7 @@ SCatalogMgmt ctgMgmt = {0}; -int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SVgroupListInfo** pVgroup) { - char *msg = NULL; - SEpSet *pVnodeEpSet = NULL; - int32_t msgLen = 0; - - int32_t code = queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST](NULL, &msg, 0, &msgLen); - if (code) { - return code; - } - - SRpcMsg rpcMsg = { - .msgType = TSDB_MSG_TYPE_VGROUP_LIST, - .pCont = msg, - .contLen = msgLen, - }; - - SRpcMsg rpcRsp = {0}; - - rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); - - code = queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST](pVgroup, rpcRsp.pCont, rpcRsp.contLen); - if (code) { - return code; - } - - return TSDB_CODE_SUCCESS; -} - -int32_t ctgGetVgroupFromCache(struct SCatalog* pCatalog, SHashObj** pVgroupList, int32_t* exist) { - if (NULL == pCatalog->vgroupCache.cache || pCatalog->vgroupCache.vgroupVersion < 0) { - *exist = 0; - return TSDB_CODE_SUCCESS; - } - - if (pVgroupList) { - *pVgroupList = pCatalog->vgroupCache.cache; - } - - *exist = 1; - - return TSDB_CODE_SUCCESS; -} - -int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) { +int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo *dbInfo, int32_t *exist) { if (NULL == pCatalog->dbCache.cache) { *exist = 0; return TSDB_CODE_SUCCESS; @@ -71,28 +28,13 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S SDBVgroupInfo *info = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); - if (NULL == info || info->vgroupVersion < pCatalog->vgroupCache.vgroupVersion) { + if (NULL == info) { *exist = 0; return TSDB_CODE_SUCCESS; } if (dbInfo) { - *dbInfo = calloc(1, sizeof(**dbInfo)); - if (NULL == *dbInfo) { - ctgError("calloc size[%d] failed", (int32_t)sizeof(**dbInfo)); - return TSDB_CODE_CTG_MEM_ERROR; - } - - (*dbInfo)->vgId = taosArrayDup(info->vgId); - if (NULL == (*dbInfo)->vgId) { - ctgError("taos array duplicate failed"); - tfree(*dbInfo); - return TSDB_CODE_CTG_MEM_ERROR; - } - - (*dbInfo)->vgroupVersion = info->vgroupVersion; - (*dbInfo)->hashRange = info->hashRange; - (*dbInfo)->hashType = info->hashType; + *dbInfo = *info; } *exist = 1; @@ -242,8 +184,8 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE } -int32_t ctgGetHashFunction(int32_t hashType, tableNameHashFp *fp) { - switch (hashType) { +int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) { + switch (hashMethod) { default: *fp = MurmurHash3_32; break; @@ -252,96 +194,79 @@ int32_t ctgGetHashFunction(int32_t hashType, tableNameHashFp *fp) { return TSDB_CODE_SUCCESS; } -int32_t ctgGetVgroupFromVgId(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, int32_t vgId, SVgroupInfo *pVgroup) { +int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray* vgroupList) { SHashObj *vgroupHash = NULL; - - CTG_ERR_RET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash)); - if (NULL == vgroupHash) { - ctgError("get empty vgroup cache"); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } + SVgroupInfo *vgInfo = NULL; - if (NULL == taosHashGetClone(vgroupHash, &vgId, sizeof(vgId), pVgroup)) { - ctgError("vgId[%d] not found in vgroup list", vgId); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } + void *pIter = taosHashIterate(dbInfo->vgInfo, NULL); + while (pIter) { + vgInfo = pIter; - return TSDB_CODE_SUCCESS; -} - -int32_t ctgGetVgroupFromVgIdBatch(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SArray* vgIds, SArray* vgroupList) { - SHashObj *vgroupHash = NULL; - SVgroupInfo pVgroup = {0}; - int32_t vgIdNum = taosArrayGetSize(vgIds); - - CTG_ERR_RET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash)); - if (NULL == vgroupHash) { - ctgError("get empty vgroup cache"); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } - - for (int32_t i = 0; i < vgIdNum; ++i) { - int32_t *vgId = taosArrayGet(vgIds, i); + if (NULL == taosArrayPush(vgroupList, vgInfo)) { + ctgError("taosArrayPush failed"); + break; + } - if (NULL == taosHashGetClone(vgroupHash, vgId, sizeof(*vgId), &pVgroup)) { - ctgError("vgId[%d] not found in vgroup list", vgId); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } - - if (NULL == taosArrayPush(vgroupList, &pVgroup)) { - ctgError("push vgroup to array failed, idx:%d", i); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } + pIter = taosHashIterate(dbInfo->vgInfo, pIter); + vgInfo = NULL; } return TSDB_CODE_SUCCESS; } - - -int32_t ctgGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { - SDBVgroupInfo *dbInfo = NULL; - int32_t code = 0; - - CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo)); - - if (NULL == dbInfo) { - ctgWarn("db[%s] vgroup info not found", pDBName); +int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { + int32_t vgNum = taosHashGetSize(dbInfo->vgInfo); + if (vgNum <= 0) { + ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum); return TSDB_CODE_TSC_DB_NOT_SELECTED; } - if (dbInfo->vgroupVersion < 0 || NULL == dbInfo->vgId) { - ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgId:%p", pDBName, dbInfo->vgroupVersion, dbInfo->vgId); - CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED); - } - - int32_t vgNum = taosArrayGetSize(dbInfo->vgId); - if (vgNum <= 0) { - ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum); - CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED); - } - tableNameHashFp fp = NULL; + SVgroupInfo *vgInfo = NULL; - CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashType, &fp)); + CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp)); char tbFullName[TSDB_TABLE_FNAME_LEN]; snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName); uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName)); - uint32_t hashUnit = dbInfo->hashRange / vgNum; - uint32_t vgId = hashValue / hashUnit; - CTG_ERR_JRET(ctgGetVgroupFromVgId(pCatalog, pRpc, pMgmtEps, vgId, pVgroup)); - -_return: - if (dbInfo && dbInfo->vgId) { - taosArrayDestroy(dbInfo->vgId); - dbInfo->vgId = NULL; + void *pIter = taosHashIterate(dbInfo->vgInfo, NULL); + while (pIter) { + vgInfo = pIter; + if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) { + break; + } + + pIter = taosHashIterate(dbInfo->vgInfo, pIter); + vgInfo = NULL; } - - tfree(dbInfo); + + if (NULL == vgInfo) { + ctgError("no hash range found for hashvalue[%u]", hashValue); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + *pVgroup = *vgInfo; + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { + SDBVgroupInfo dbInfo = {0}; + int32_t code = 0; + int32_t vgId = 0; + + CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo)); + + if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) { + ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo); + return TSDB_CODE_TSC_DB_NOT_SELECTED; + } + + CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup)); return code; } @@ -524,95 +449,6 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) return TSDB_CODE_SUCCESS; } - -int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) { - if (NULL == pCatalog || NULL == version) { - return TSDB_CODE_CTG_INVALID_INPUT; - } - - *version = pCatalog->vgroupCache.vgroupVersion; - - return TSDB_CODE_SUCCESS; -} - - - -int32_t catalogUpdateVgroupCache(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) { - if (NULL == pVgroup) { - ctgError("no valid vgroup list info to update"); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } - - if (pVgroup->vgroupVersion < 0) { - ctgError("vgroup version[%d] is invalid", pVgroup->vgroupVersion); - return TSDB_CODE_CTG_INVALID_INPUT; - } - - if (NULL == pCatalog->vgroupCache.cache) { - pCatalog->vgroupCache.cache = taosHashInit(CTG_DEFAULT_CACHE_VGROUP_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (NULL == pCatalog->vgroupCache.cache) { - ctgError("init hash[%d] for cluster cache failed", CTG_DEFAULT_CACHE_VGROUP_NUMBER); - return TSDB_CODE_CTG_MEM_ERROR; - } - } else { - taosHashClear(pCatalog->vgroupCache.cache); - } - - SVgroupInfo *vInfo = NULL; - for (int32_t i = 0; i < pVgroup->vgroupNum; ++i) { - if (taosHashPut(pCatalog->vgroupCache.cache, &pVgroup->vgroupInfo[i].vgId, sizeof(pVgroup->vgroupInfo[i].vgId), &pVgroup->vgroupInfo[i], sizeof(pVgroup->vgroupInfo[i])) != 0) { - ctgError("push to vgroup hash cache failed"); - goto error_exit; - } - } - - pCatalog->vgroupCache.vgroupVersion = pVgroup->vgroupVersion; - - return TSDB_CODE_SUCCESS; - -error_exit: - if (pCatalog->vgroupCache.cache) { - taosHashCleanup(pCatalog->vgroupCache.cache); - pCatalog->vgroupCache.cache = NULL; - } - - pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; - - return TSDB_CODE_CTG_INTERNAL_ERROR; -} - -int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash) { - if (NULL == pCatalog || NULL == pMgmtEps || NULL == pRpc) { - return TSDB_CODE_CTG_INVALID_INPUT; - } - - int32_t exist = 0; - - CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupHash, &exist)); - - if (exist) { - return TSDB_CODE_SUCCESS; - } - - SVgroupListInfo *pVgroup = NULL; - - CTG_ERR_RET(ctgGetVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &pVgroup)); - - CTG_ERR_RET(catalogUpdateVgroupCache(pCatalog, pVgroup)); - - if (pVgroupHash) { - CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupHash, &exist)); - } - - if (0 == exist) { - ctgError("catalog fetched but get from cache failed"); - return TSDB_CODE_CTG_INTERNAL_ERROR; - } - - return TSDB_CODE_SUCCESS; -} - - int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) { if (NULL == pCatalog || NULL == dbName || NULL == version) { return TSDB_CODE_CTG_INVALID_INPUT; @@ -629,7 +465,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, return TSDB_CODE_SUCCESS; } - *version = dbInfo->vgroupVersion; + *version = dbInfo->vgVersion; return TSDB_CODE_SUCCESS; } @@ -639,7 +475,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName return TSDB_CODE_CTG_INVALID_INPUT; } - if (dbInfo->vgroupVersion < 0) { + if (dbInfo->vgVersion < 0) { if (pCatalog->dbCache.cache) { taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)); } @@ -654,6 +490,12 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER); return TSDB_CODE_CTG_MEM_ERROR; } + } else { + SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); + if (oldInfo && oldInfo->vgInfo) { + taosHashCleanup(oldInfo->vgInfo); + oldInfo->vgInfo = NULL; + } } if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) { @@ -667,7 +509,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) { +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) { if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) { return TSDB_CODE_CTG_INVALID_INPUT; } @@ -688,28 +530,16 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* strncpy(input.db, dbName, sizeof(input.db)); input.db[sizeof(input.db) - 1] = 0; - input.vgroupVersion = pCatalog->vgroupCache.vgroupVersion; - input.dbGroupVersion = CTG_DEFAULT_INVALID_VERSION; + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut)); - if (DbOut.vgroupList) { - CTG_ERR_JRET(catalogUpdateVgroupCache(pCatalog, DbOut.vgroupList)); - } - - if (DbOut.dbVgroup) { - CTG_ERR_JRET(catalogUpdateDBVgroupCache(pCatalog, dbName, DbOut.dbVgroup)); - } + CTG_ERR_RET(catalogUpdateDBVgroupCache(pCatalog, dbName, &DbOut.dbVgroup)); if (dbInfo) { *dbInfo = DbOut.dbVgroup; - DbOut.dbVgroup = NULL; } -_return: - tfree(DbOut.dbVgroup); - tfree(DbOut.vgroupList); - return code; } @@ -749,16 +579,20 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe STableMeta *tbMeta = NULL; int32_t code = 0; SVgroupInfo vgroupInfo = {0}; - SDBVgroupInfo *dbVgroup = NULL; + SDBVgroupInfo dbVgroup = {0}; CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta)); - if (tbMeta->tableType == TSDB_SUPER_TABLE) { - CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup)); + CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup)); - CTG_ERR_JRET(ctgGetVgroupFromVgIdBatch(pCatalog, pRpc, pMgmtEps, dbVgroup->vgId, pVgroupList)); + if (tbMeta->tableType == TSDB_SUPER_TABLE) { + CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList)); } else { - CTG_ERR_JRET(ctgGetVgroupFromVgId(pCatalog, pRpc, pMgmtEps, tbMeta->vgId, &vgroupInfo)); + int32_t vgId = tbMeta->vgId; + if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { + ctgError("vgId[%d] not found in vgroup list", vgId); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } if (NULL == taosArrayPush(pVgroupList, &vgroupInfo)) { ctgError("push vgroupInfo to array failed"); @@ -768,12 +602,6 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe _return: tfree(tbMeta); - if (dbVgroup && dbVgroup->vgId) { - taosArrayDestroy(dbVgroup->vgId); - dbVgroup->vgId = NULL; - } - - tfree(dbVgroup); return code; } diff --git a/source/libs/catalog/test/CMakeLists.txt b/source/libs/catalog/test/CMakeLists.txt new file mode 100644 index 0000000000..527156f176 --- /dev/null +++ b/source/libs/catalog/test/CMakeLists.txt @@ -0,0 +1,18 @@ + +MESSAGE(STATUS "build catalog unit test") + +# GoogleTest requires at least C++11 +SET(CMAKE_CXX_STANDARD 11) +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +ADD_EXECUTABLE(catalogTest ${SOURCE_LIST}) +TARGET_LINK_LIBRARIES( + catalogTest + PUBLIC os util common catalog transport gtest query +) + +TARGET_INCLUDE_DIRECTORIES( + catalogTest + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/catalog/" + PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/catalog/inc" +) diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index e69de29bb2..f495451091 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#pragma GCC diagnostic ignored "-Wwrite-strings" + +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#include "os.h" + +#include "taos.h" +#include "tdef.h" +#include "tvariant.h" +#include "catalog.h" + +namespace { + + +} + +TEST(testCase, normalCase) { + char *clusterId = "cluster1"; + struct SCatalog* pCtg = NULL; + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(clusterId, &pCtg); + ASSERT_EQ(code, 0); + + +} + +/* +TEST(testCase, normalCase) { + SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)"); + ASSERT_EQ(info1.valid, true); + + char msg[128] = {0}; + SMsgBuf buf; + buf.len = 128; + buf.buf = msg; + + SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.sub.node), 0); + int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); + ASSERT_EQ(code, 0); + + SCatalogReq req = {0}; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + ASSERT_EQ(ret, 0); + ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); + + SQueryStmtInfo* pQueryInfo = createQueryInfo(); + setTableMetaInfo(pQueryInfo, &req); + + SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.sub.node, 0); + ret = validateSqlNode(pSqlNode, pQueryInfo, &buf); + ASSERT_EQ(ret, 0); + + SArray* pExprList = pQueryInfo->exprList[0]; + + int32_t num = tsCompatibleModel? 2:1; + ASSERT_EQ(taosArrayGetSize(pExprList), num); + + SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1); + ASSERT_EQ(p1->base.pColumns->uid, 110); + ASSERT_EQ(p1->base.numOfParams, 1); + ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE); + ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)"); + ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_TMP); + ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)"); + ASSERT_EQ(p1->base.interBytes, 16); + + ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE); + ASSERT_STREQ(p1->pExpr->_function.functionName, "top"); + + tExprNode* pParam = p1->pExpr->_function.pChild[0]; + + ASSERT_EQ(pParam->nodeType, TEXPR_COL_NODE); + ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3); + ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2); + + struct SQueryPlanNode* n = nullptr; + code = createQueryPlan(pQueryInfo, &n); + + char* str = NULL; + queryPlanToString(n, &str); + printf("%s\n", str); + + destroyQueryInfo(pQueryInfo); + qParserClearupMetaRequestInfo(&req); + destroySqlInfo(&info1); +} + +TEST(testCase, displayPlan) { + generateLogicplan("select count(*) from `t.1abc`"); + generateLogicplan("select count(*)+ 22 from `t.1abc`"); + generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h, 20s) sliding(10m) limit 20,30"); + generateLogicplan("select count(*) from `t.1abc` group by a"); + generateLogicplan("select count(A+B) from `t.1abc` group by a"); + generateLogicplan("select count(length(a)+b) from `t.1abc` group by a"); + generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)"); + generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s) order by 1 desc "); + generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`"); + generateLogicplan("select count(*), min(a) + 99 from `t.1abc`"); + generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`"); + generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc` limit 20"); + generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)"); + generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)"); + + // order by + group by column + limit offset + generateLogicplan("select top(a, 20) k from `t.1abc` order by k asc limit 3 offset 1"); + + // fill + generateLogicplan("select min(a) from `t.1abc` where ts>now and tsdb, bInput->db, sizeof(bMsg->db)); bMsg->db[sizeof(bMsg->db) - 1] = 0; - bMsg->vgroupVersion = bInput->vgroupVersion; - bMsg->dbGroupVersion = bInput->dbGroupVersion; + bMsg->vgVersion = bInput->vgVersion; *msgLen = (int32_t)sizeof(*bMsg); @@ -90,58 +78,12 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms } - -int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) { - if (NULL == output || NULL == msg || msgSize <= 0) { - return TSDB_CODE_TSC_INVALID_INPUT; - } - - SVgroupListRspMsg *pRsp = (SVgroupListRspMsg *)msg; - - pRsp->vgroupNum = htonl(pRsp->vgroupNum); - pRsp->vgroupVersion = htonl(pRsp->vgroupVersion); - - if (pRsp->vgroupNum < 0) { - qError("vgroup number[%d] in rsp is invalid", pRsp->vgroupNum); - return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; - } - - if (pRsp->vgroupVersion < 0) { - qError("vgroup vgroupVersion[%d] in rsp is invalid", pRsp->vgroupVersion); - return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; - } - - if (msgSize != (pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp))) { - qError("vgroup list msg size mis-match, msgSize:%d, vgroup number:%d", msgSize, pRsp->vgroupNum); - return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; - } - - // keep SVgroupListInfo/SVgroupListRspMsg the same - *(SVgroupListInfo **)output = (SVgroupListInfo *)msg; - - if (pRsp->vgroupNum == 0) { - return TSDB_CODE_SUCCESS; - } - - for (int32_t i = 0; i < pRsp->vgroupNum; ++i) { - pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); - for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { - pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); - } - } - - return TSDB_CODE_SUCCESS; -} - - - - int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { if (NULL == output || NULL == msg || msgSize <= 0) { return TSDB_CODE_TSC_INVALID_INPUT; } - SUseDbRspMsg *pRsp = (SUseDbRspMsg *)msg; + SUseDbRsp *pRsp = (SUseDbRsp *)msg; SUseDbOutput *pOut = (SUseDbOutput *)output; int32_t code = 0; @@ -150,104 +92,52 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } - pRsp->vgroupVersion = htonl(pRsp->vgroupVersion); - pRsp->dbVgroupVersion = htonl(pRsp->dbVgroupVersion); + pRsp->vgVersion = htonl(pRsp->vgVersion); + pRsp->vgNum = htonl(pRsp->vgNum); - pRsp->vgroupNum = htonl(pRsp->vgroupNum); - pRsp->dbVgroupNum = htonl(pRsp->dbVgroupNum); - - if (pRsp->vgroupNum < 0) { - qError("invalid vgroup number[%d]", pRsp->vgroupNum); + if (pRsp->vgNum < 0) { + qError("invalid db[%s] vgroup number[%d]", pRsp->db, pRsp->vgNum); return TSDB_CODE_TSC_INVALID_VALUE; } - if (pRsp->dbVgroupNum < 0) { - qError("invalid db vgroup number[%d]", pRsp->dbVgroupNum); - return TSDB_CODE_TSC_INVALID_VALUE; - } - - int32_t expectSize = pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + pRsp->dbVgroupNum * sizeof(int32_t) + sizeof(*pRsp); + int32_t expectSize = pRsp->vgNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp); if (msgSize != expectSize) { - qError("vgroup list msg size mis-match, msgSize:%d, expected:%d, vgroup number:%d, db vgroup number:%d", msgSize, expectSize, pRsp->vgroupNum, pRsp->dbVgroupNum); + qError("use db rsp size mis-match, msgSize:%d, expected:%d, vgnumber:%d", msgSize, expectSize, pRsp->vgNum); return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } - if (pRsp->vgroupVersion < 0) { - qInfo("no new vgroup list info"); - if (pRsp->vgroupNum != 0) { - qError("invalid vgroup number[%d] for no new vgroup list case", pRsp->vgroupNum); - return TSDB_CODE_TSC_INVALID_VALUE; - } - } else { - int32_t s = sizeof(*pOut->vgroupList) + sizeof(pOut->vgroupList->vgroupInfo[0]) * pRsp->vgroupNum; - pOut->vgroupList = calloc(1, s); - if (NULL == pOut->vgroupList) { - qError("calloc size[%d] failed", s); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - pOut->vgroupList->vgroupNum = pRsp->vgroupNum; - pOut->vgroupList->vgroupVersion = pRsp->vgroupVersion; - - for (int32_t i = 0; i < pRsp->vgroupNum; ++i) { - pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); - for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { - pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); - } - - memcpy(&pOut->vgroupList->vgroupInfo[i], &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i])); - } + pOut->dbVgroup.vgVersion = pRsp->vgVersion; + pOut->dbVgroup.hashMethod = pRsp->hashMethod; + pOut->dbVgroup.vgInfo = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == pOut->dbVgroup.vgInfo) { + qError("hash init[%d] failed", pRsp->vgNum); + return TSDB_CODE_TSC_OUT_OF_MEMORY; } - int32_t *vgIdList = (int32_t *)((char *)pRsp->vgroupInfo + sizeof(pRsp->vgroupInfo[0]) * pRsp->vgroupNum); + for (int32_t i = 0; i < pRsp->vgNum; ++i) { + pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); + pRsp->vgroupInfo[i].hashBegin = htonl(pRsp->vgroupInfo[i].hashBegin); + pRsp->vgroupInfo[i].hashEnd = htonl(pRsp->vgroupInfo[i].hashEnd); + + for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { + pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); + } + + if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) { + qError("hash push failed"); + goto _return; + } + } memcpy(pOut->db, pRsp->db, sizeof(pOut->db)); - - if (pRsp->dbVgroupVersion < 0) { - qInfo("no new vgroup info for db[%s]", pRsp->db); - } else { - pOut->dbVgroup = calloc(1, sizeof(*pOut->dbVgroup)); - if (NULL == pOut->dbVgroup) { - qError("calloc size[%d] failed", (int32_t)sizeof(*pOut->dbVgroup)); - code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _exit; - } - - pOut->dbVgroup->vgId = taosArrayInit(pRsp->dbVgroupNum, sizeof(int32_t)); - if (NULL == pOut->dbVgroup->vgId) { - qError("taosArrayInit size[%d] failed", pRsp->dbVgroupNum); - code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _exit; - } - - pOut->dbVgroup->vgroupVersion = pRsp->dbVgroupVersion; - pOut->dbVgroup->hashRange = htonl(pRsp->dbHashRange); - pOut->dbVgroup->hashType = htonl(pRsp->dbHashType); - - if (pOut->dbVgroup->hashRange < 0) { - qError("invalid hashRange[%d] for db[%s]", pOut->dbVgroup->hashRange, pRsp->db); - code = TSDB_CODE_TSC_INVALID_INPUT; - goto _exit; - } - - for (int32_t i = 0; i < pRsp->dbVgroupNum; ++i) { - *(vgIdList + i) = htonl(*(vgIdList + i)); - - taosArrayPush(pOut->dbVgroup->vgId, vgIdList + i) ; - } - } return code; -_exit: - if (pOut->dbVgroup && pOut->dbVgroup->vgId) { - taosArrayDestroy(pOut->dbVgroup->vgId); - pOut->dbVgroup->vgId = NULL; +_return: + if (pOut) { + tfree(pOut->dbVgroup.vgInfo); } - tfree(pOut->dbVgroup); - tfree(pOut->vgroupList); - return code; } @@ -375,11 +265,9 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { void msgInit() { queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg; - queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = queryBuildVgroupListReqMsg; queryBuildMsg[TSDB_MSG_TYPE_USE_DB] = queryBuildUseDbMsg; queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = queryProcessTableMetaRsp; - queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = queryProcessVgroupListRsp; queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB] = queryProcessUseDBRsp; /*