From a352b93570689b954d8514c86678b95d9ce08eb1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 14 Dec 2021 15:24:21 +0800 Subject: [PATCH] update catalog and message --- include/common/taosmsg.h | 19 +- include/common/tmessage.h | 32 +++ include/libs/catalog/catalog.h | 39 ++-- include/util/taoserror.h | 2 +- include/util/tlog.h | 2 + source/client/CMakeLists.txt | 1 + source/client/src/client.c | 3 + source/common/inc/commonInt.h | 16 +- source/common/src/taosmsg.c | 20 ++ source/common/src/tglobal.c | 1 + source/common/src/tmessage.c | 147 ++++++++------ source/dnode/mgmt/impl/src/dndTransport.c | 2 +- source/libs/catalog/inc/catalogInt.h | 26 ++- source/libs/catalog/src/catalog.c | 228 ++++++++++++++++++++-- source/libs/parser/src/astValidate.c | 2 +- source/libs/parser/src/parser.c | 7 +- source/libs/transport/src/rpcMain.c | 2 +- source/util/src/terror.c | 2 +- source/util/src/tlog.c | 2 + 19 files changed, 441 insertions(+), 112 deletions(-) create mode 100644 include/common/tmessage.h create mode 100644 source/common/src/taosmsg.c diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index fd55a11b52..906dd38c51 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -77,7 +77,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE, "create-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_VGROUP_LIST, "vgroup-list" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_STREAM, "kill-stream" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" ) @@ -216,7 +216,6 @@ extern char *taosMsg[]; typedef struct SBuildTableMetaInput { int32_t vgId; - STagData *tagData; char *tableFullName; } SBuildTableMetaInput; @@ -776,8 +775,6 @@ typedef struct { typedef struct { SMsgHead msgHead; char tableFname[TSDB_TABLE_FNAME_LEN]; - int8_t createFlag; - char tags[]; } STableInfoMsg; typedef struct { @@ -792,6 +789,20 @@ typedef struct SSTableVgroupMsg { int32_t numOfTables; } SSTableVgroupMsg, SSTableVgroupRspMsg; +typedef struct SVgroupInfo { + int32_t vgId; + int8_t numOfEps; + SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; +} SVgroupInfo; + +typedef struct SVgroupListRspMsg { + int32_t vgroupNum; + int32_t vgroupVersion; + SVgroupInfo vgroupInfo[]; +} SVgroupListRspMsg; + +typedef SVgroupListRspMsg SVgroupListInfo; + typedef struct { int32_t vgId; int8_t numOfEps; diff --git a/include/common/tmessage.h b/include/common/tmessage.h new file mode 100644 index 0000000000..c728ee026e --- /dev/null +++ b/include/common/tmessage.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_COMMON_TMESSAGE_H_ +#define _TD_COMMON_TMESSAGE_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +extern int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); +extern int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize); + + + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_COMMON_TMESSAGE_H_*/ diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 2092f53ba1..2ceb893ce4 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -30,12 +30,6 @@ extern "C" { struct SCatalog; -typedef struct SVgroupInfo { - int32_t vgId; - int8_t numOfEps; - SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; -} SVgroupInfo; - typedef struct SDBVgroupInfo { int32_t vgroupVersion; SArray *vgId; @@ -91,7 +85,11 @@ typedef struct STableMeta { SSchema schema[]; } STableMeta; -int32_t catalogInit(SCatalog *cfg); +typedef struct SCatalogCfg { + +} SCatalogCfg; + +int32_t catalogInit(SCatalogCfg *cfg); /** * Catalog service object, which is utilized to hold tableMeta (meta/vgroupInfo/udfInfo) at the client-side. @@ -101,17 +99,31 @@ int32_t catalogInit(SCatalog *cfg); */ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle); -int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version); -int32_t catalogUpdateVgroupList(struct SCatalog* pCatalog, int32_t version, SArray* vgroupList); + +int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version); +int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList); +int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup); + + int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo); +int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); -int32_t catalogGetDBVgroupInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); -int32_t catalogUpdateDBVgroupInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta); +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, STableMeta* pNewTableMeta); -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta); + +/** + * get table's vgroup list. + * @param clusterId + * @pVgroupList - array of SVgroupInfo + * @return + */ +int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList); /** @@ -125,9 +137,6 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const S */ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SCatalogRsp* pRsp); -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta); - -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, STableMeta* pNewTableMeta); int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a3a4297115..e1a1694fab 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -492,7 +492,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MON_CONNECTION_INVALID TAOS_DEF_ERROR_CODE(0, 0x2300) //"monitor invalid monitor db connection") // catalog -#define TSDB_CODE_CTG_INTERNAL_EROR TAOS_DEF_ERROR_CODE(0, 0x2400) //catalog interval error +#define TSDB_CODE_CTG_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2400) //catalog interval error #define TSDB_CODE_CTG_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2401) //invalid catalog input parameters #define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402) //catalog is not ready #define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error diff --git a/include/util/tlog.h b/include/util/tlog.h index 2ee60e4324..9b6033e7fe 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -45,6 +45,8 @@ extern int32_t sDebugFlag; extern int32_t tsdbDebugFlag; extern int32_t cqDebugFlag; extern int32_t debugFlag; +extern int32_t ctgDebugFlag; + #define DEBUG_FATAL 1U #define DEBUG_ERROR DEBUG_FATAL diff --git a/source/client/CMakeLists.txt b/source/client/CMakeLists.txt index bc0d439407..bc18e41664 100644 --- a/source/client/CMakeLists.txt +++ b/source/client/CMakeLists.txt @@ -6,5 +6,6 @@ target_include_directories( ) target_link_libraries( taos + PRIVATE common INTERFACE api ) diff --git a/source/client/src/client.c b/source/client/src/client.c index b1663239e6..258fd91565 100644 --- a/source/client/src/client.c +++ b/source/client/src/client.c @@ -18,6 +18,9 @@ //TAOS_RES *taos_query(TAOS *taos, const char *sql) { // //} +#include "taosmsg.h" int taos_init() { return 0; } void taos_cleanup(void) {} + + diff --git a/source/common/inc/commonInt.h b/source/common/inc/commonInt.h index b8be8899f3..c1baa2a453 100644 --- a/source/common/inc/commonInt.h +++ b/source/common/inc/commonInt.h @@ -20,8 +20,22 @@ extern "C" { #endif + +#include "tlog.h" + +extern int32_t cDebugFlag; +extern int8_t tscEmbedded; + +#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0) +#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0) +#define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0) +#define tscInfo(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0) +#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) +#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) +#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) + #ifdef __cplusplus } #endif -#endif /*_TD_COMMON_INT_H_*/ \ No newline at end of file +#endif /*_TD_COMMON_INT_H_*/ diff --git a/source/common/src/taosmsg.c b/source/common/src/taosmsg.c new file mode 100644 index 0000000000..b35e3f1478 --- /dev/null +++ b/source/common/src/taosmsg.c @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define TAOS_MESSAGE_C + +#include "taosmsg.h" + + diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3cf5e52c44..60e8415392 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1715,3 +1715,4 @@ bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *d return true; } + \ No newline at end of file diff --git a/source/common/src/tmessage.c b/source/common/src/tmessage.c index 0e732caa26..07adf1d599 100644 --- a/source/common/src/tmessage.c +++ b/source/common/src/tmessage.c @@ -13,21 +13,103 @@ * along with this program. If not, see . */ -#define TAOS_MESSAGE_C - #include "taosmsg.h" +#include "commonint.h" + int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0}; int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0}; +int32_t tscBuildVgroupListReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + *msgLen = 0; + + return TSDB_CODE_SUCCESS; +} + +int32_t tscBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == input || NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input; + + int32_t estimateSize = sizeof(STableInfoMsg); + if (NULL == *msg || msgSize < estimateSize) { + tfree(*msg); + *msg = calloc(1, estimateSize); + if (NULL == *msg) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + STableInfoMsg *bMsg = (STableInfoMsg *)*msg; + + bMsg->msgHead.vgId = bInput->vgId; + + strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); + bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; + + *msgLen = (int32_t)sizeof(*bMsg); + + return TSDB_CODE_SUCCESS; +} + + +int32_t tscProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) { + if (NULL == output || NULL == msg || msgSize <= 0) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SVgroupListRspMsg *pRsp = (SVgroupListRspMsg *)msg; + + pRsp->vgroupNum = htonl(pRsp->vgroupNum); + pRsp->vgroupVersion = htonl(pRsp->vgroupVersion); + + if (pRsp->vgroupNum < 0) { + tscError("vgroup number[%d] in rsp is invalid", pRsp->vgroupNum); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + + if (pRsp->vgroupVersion < 0) { + tscError("vgroup vgroupVersion[%d] in rsp is invalid", pRsp->vgroupVersion); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + + if (msgSize != (pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp))) { + tscError("vgroup list msg size mis-match, msgSize:%d, vgroup number:%d", msgSize, pRsp->vgroupNum); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + + // keep SVgroupListInfo/SVgroupListRspMsg the same + *(SVgroupListInfo **)output = (SVgroupListInfo *)msg; + + if (pRsp->vgroupNum == 0) { + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < pRsp->vgroupNum; ++i) { + pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); + for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { + pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); + } + } + + return TSDB_CODE_SUCCESS; +} + void msgInit() { tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = tscBuildTableMetaReqMsg; + tscBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = tscBuildVgroupListReqMsg; - - tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = ; + //tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp; + tscProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = tscProcessVgroupListRsp; /* tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; @@ -106,63 +188,6 @@ void msgInit() { } -char* msgSerializeTagData(STagData* pTagData, char* pMsg) { - int32_t n = (int32_t) strlen(pTagData->name); - *(int32_t*) pMsg = htonl(n); - pMsg += sizeof(n); - - memcpy(pMsg, pTagData->name, n); - pMsg += n; - - *(int32_t*)pMsg = htonl(pTagData->dataLen); - pMsg += sizeof(int32_t); - - memcpy(pMsg, pTagData->data, pTagData->dataLen); - pMsg += pTagData->dataLen; - - return pMsg; -} - - -int32_t tscBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { - if (NULL == input || NULL == msg || NULL == msgLen) { - return TSDB_CODE_TSC_INVALID_INPUT; - } - - SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input; - - int32_t estimateSize = sizeof(STableInfoMsg) + (bInput->tagData ? (sizeof(*bInput->tagData) + bInput->tagData->dataLen) : 0); - if (NULL == *msg || msgSize < estimateSize) { - tfree(*msg); - *msg = calloc(1, estimateSize); - if (NULL == *msg) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - } - - STableInfoMsg *bMsg = (STableInfoMsg *)*msg; - - bMsg->msgHead.vgId = bInput->vgId; - - strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); - bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; - - int32_t autoCreate = (bInput->tagData && bInput->tagData->dataLen > 0); - - bMsg->createFlag = htons(autoCreate ? 1 : 0); - - char *pMsg = NULL; - - // tag data exists - if (autoCreate) { - pMsg = msgSerializeTagData(bInput->tagData, (char *)bMsg->tags); - } - - *msgLen = (int32_t)(pMsg - (char*)bMsg); - - return TSDB_CODE_SUCCESS; -} - diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index a36c56ea3d..0f8281a3a6 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -72,7 +72,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_VGROUP_LIST] = dndProcessMnodeReadMsg; pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_KILL_STREAM] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 82d7d9c571..720f197782 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -21,13 +21,18 @@ extern "C" { #endif #include "catalog.h" +#include "common.h" +#include "tlog.h" -#define CTG_DEFAULT_CLUSTER_NUMBER 3 +#define CTG_DEFAULT_CLUSTER_NUMBER 6 +#define CTG_DEFAULT_VGROUP_NUMBER 100 + +#define CTG_DEFAULT_INVALID_VERSION (-1) typedef struct SVgroupListCache { - int32_t vgroupNum; int32_t vgroupVersion; - SHashObj *cache; //key:vgId, value:SVgroupInfo + SHashObj *cache; // key:vgId, value:SVgroupInfo* + SArray *arrayCache; // SVgroupInfo } SVgroupListCache; typedef struct SDBVgroupCache { @@ -50,13 +55,16 @@ typedef struct SCatalogMgmt { } SCatalogMgmt; +extern int32_t ctgDebugFlag; + +#define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define ctgInfo(...) do { if (ctgDebugFlag & DEBUG_INFO) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define ctgDebug(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) -#define ctgFatal(...) tscFatal(__VA_ARGS__) -#define ctgError(...) tscError(__VA_ARGS__) -#define ctgWarn(...) tscWarn(__VA_ARGS__) -#define ctgInfo(...) tscInfo(__VA_ARGS__) -#define ctgDebug(...) tscDebug(__VA_ARGS__) -#define ctgTrace(...) tscTrace(__VA_ARGS__) #define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0) #define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); return _code; } } while (0) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index e8b79bae4b..3837e5408a 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -14,53 +14,248 @@ */ #include "catalogInt.h" +#include "trpc.h" +#include "tmessage.h" SCatalogMgmt ctgMgmt = {0}; +int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SVgroupListInfo** pVgroup) { + char *msg = NULL; + SEpSet *pVnodeEpSet = NULL; + int32_t msgLen = 0; -int32_t catalogInit(SCatalog *cfg) { + int32_t code = tscBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST](NULL, &msg, 0, &msgLen); + if (code) { + return code; + } + + SRpcMsg rpcMsg = { + .msgType = TSDB_MSG_TYPE_VGROUP_LIST, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + + rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + + code = tscProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST](pVgroup, rpcRsp.pCont, rpcRsp.contLen); + if (code) { + return code; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t* exist) { + if (NULL == pCatalog->vgroupCache.arrayCache || pCatalog->vgroupCache.vgroupVersion < 0) { + *exist = 0; + return TSDB_CODE_SUCCESS; + } + + if (pVgroupList) { + *pVgroupList = taosArrayDup(pCatalog->vgroupCache.arrayCache); + } + + *exist = 1; + + return TSDB_CODE_SUCCESS; +} + + + + + +int32_t catalogInit(SCatalogCfg *cfg) { ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == ctgMgmt.pCluster) { - CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_EROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER); + CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER); } return TSDB_CODE_SUCCESS; } -struct SCatalog* catalogGetHandle(const char *clusterId) { - if (NULL == clusterId) { - return NULL; +int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) { + if (NULL == clusterId || NULL == catalogHandle) { + return TSDB_CODE_CTG_INVALID_INPUT; } if (NULL == ctgMgmt.pCluster) { ctgError("cluster cache are not ready"); - return NULL; + return TSDB_CODE_CTG_NOT_READY; } size_t clen = strlen(clusterId); SCatalog *clusterCtg = (SCatalog *)taosHashGet(ctgMgmt.pCluster, clusterId, clen); if (clusterCtg) { - return clusterCtg; + *catalogHandle = clusterCtg; + return TSDB_CODE_SUCCESS; } clusterCtg = calloc(1, sizeof(*clusterCtg)); if (NULL == clusterCtg) { - ctgError("calloc %d failed", sizeof(*clusterCtg)); - return NULL; + ctgError("calloc %d failed", (int32_t)sizeof(*clusterCtg)); + return TSDB_CODE_CTG_MEM_ERROR; } + clusterCtg->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; + if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) { ctgError("put cluster %s cache to hash failed", clusterId); tfree(clusterCtg); - return NULL; + return TSDB_CODE_CTG_INTERNAL_ERROR; } + + *catalogHandle = clusterCtg; - return clusterCtg; + return TSDB_CODE_SUCCESS; } -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta) { + +int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) { + if (NULL == pCatalog || NULL == version) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + + *version = pCatalog->vgroupCache.vgroupVersion; + + return TSDB_CODE_SUCCESS; +} + + + +int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) { + if (NULL == pVgroup) { + ctgError("vgroup get from mnode succeed, but no output"); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + if (pVgroup->vgroupVersion < 0) { + ctgError("vgroup version[%d] is invalid", pVgroup->vgroupVersion); + return TSDB_CODE_CTG_INVALID_INPUT; + } + + + if (NULL == pCatalog->vgroupCache.arrayCache) { + pCatalog->vgroupCache.arrayCache = taosArrayInit(pVgroup->vgroupNum, sizeof(pVgroup->vgroupInfo[0])); + if (NULL == pCatalog->vgroupCache.arrayCache) { + ctgError("init array[%d] for cluster cache failed", pVgroup->vgroupNum); + return TSDB_CODE_CTG_MEM_ERROR; + } + } else { + taosArrayClear(pCatalog->vgroupCache.arrayCache); + } + + if (NULL == pCatalog->vgroupCache.cache) { + pCatalog->vgroupCache.cache = taosHashInit(CTG_DEFAULT_VGROUP_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == pCatalog->vgroupCache.cache) { + ctgError("init hash[%d] for cluster cache failed", CTG_DEFAULT_VGROUP_NUMBER); + return TSDB_CODE_CTG_MEM_ERROR; + } + } else { + taosHashClear(pCatalog->vgroupCache.cache); + } + + SVgroupInfo *vInfo = NULL; + for (int32_t i = 0; i < pVgroup->vgroupNum; ++i) { + vInfo = taosArrayPush(pCatalog->vgroupCache.arrayCache, &pVgroup->vgroupInfo[i]); + if (NULL == vInfo) { + ctgError("push to vgroup array cache failed"); + goto error_exit; + } + + if (taosHashPut(pCatalog->vgroupCache.cache, &pVgroup->vgroupInfo[i].vgId, sizeof(pVgroup->vgroupInfo[i].vgId), &vInfo, POINTER_BYTES) != 0) { + ctgError("push to vgroup hash cache failed"); + goto error_exit; + } + } + + pCatalog->vgroupCache.vgroupVersion = pVgroup->vgroupVersion; + + return TSDB_CODE_SUCCESS; + +error_exit: + if (pCatalog->vgroupCache.arrayCache) { + taosArrayDestroy(pCatalog->vgroupCache.arrayCache); + pCatalog->vgroupCache.arrayCache = NULL; + } + + if (pCatalog->vgroupCache.cache) { + taosHashCleanup(pCatalog->vgroupCache.cache); + pCatalog->vgroupCache.cache = NULL; + } + + pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; + + return TSDB_CODE_CTG_INTERNAL_ERROR; +} + + +int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList) { + if (NULL == pCatalog || NULL == pMgmtEps || NULL == pRpc) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + + int32_t exist = 0; + + CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupList, &exist)); + + if (exist) { + return TSDB_CODE_SUCCESS; + } + + SVgroupListInfo *pVgroup = NULL; + + CTG_ERR_RET(ctgGetVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &pVgroup)); + + CTG_ERR_RET(catalogUpdateVgroup(pCatalog, pVgroup)); + + if (pVgroupList) { + CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupList, &exist)); + } + + if (0 == exist) { + ctgError("catalog fetched but get from cache failed"); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) { + if (NULL == pCatalog || NULL == dbName || NULL == version) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + + if (NULL == pCatalog->dbCache.cache) { + *version = CTG_DEFAULT_INVALID_VERSION; + return TSDB_CODE_SUCCESS; + } + + SDBVgroupInfo * dbInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); + if (NULL == dbInfo) { + *version = CTG_DEFAULT_INVALID_VERSION; + return TSDB_CODE_SUCCESS; + } + + *version = dbInfo->vgroupVersion; + + return TSDB_CODE_SUCCESS; +} + +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) { + +} + +int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { + +} + + + +int32_t catalogGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta) { if (NULL == pCatalog || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { return TSDB_CODE_CTG_INVALID_INPUT; } @@ -79,12 +274,13 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const S .msgType = TSDB_MSG_TYPE_TABLE_META, .pCont = msg, .contLen = msgLen, - .ahandle = (void*)pSql->self, - .handle = NULL, - .code = 0 }; - rpcSendRequest(pRpcObj->pDnodeConn, pVnodeEpSet, &rpcMsg, &pSql->rpcRid); + SRpcMsg rpcRsp = {0}; + + rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index 758b83d820..ce47e50c1e 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -4081,7 +4081,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer SCatalogRsp data = {0}; // TODO: check if the qnode info has been cached already - req.qNodeEpset = true; + req.qNodeRequired = true; code = qParserExtractRequestedMetaInfo(pInfo, &req, msgBuf, msgBufLen); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 3faa06720b..548ab1b8df 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -42,7 +42,12 @@ int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo** return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; } - struct SCatalog* pCatalog = catalogGetHandle(NULL); + struct SCatalog* pCatalog = NULL; + int32_t code = catalogGetHandle(NULL, &pCatalog); + if (code) { + return code; + } + return qParserValidateSqlNode(pCatalog, &info, *pQueryInfo, id, msg, msgLen); } diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 911e8472ab..71e08ea76f 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -406,7 +406,7 @@ void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t // for TDengine, all the query, show commands shall have TCP connection char type = pMsg->msgType; if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_SHOW_RETRIEVE - || type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_STABLE_VGROUP + || type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_VGROUP_LIST || type == TSDB_MSG_TYPE_TABLES_META || type == TSDB_MSG_TYPE_TABLE_META || type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_STATUS || type == TSDB_MSG_TYPE_ALTER_TABLE) pContext->connType = RPC_CONN_TCPC; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a75ce747b2..db94d26518 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -498,7 +498,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, "tfs invalid level") TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk") // catalog -TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_EROR, "catalog interval error") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog interval error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error") diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index e6cc3a53af..23b66af0c8 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -99,6 +99,8 @@ int32_t wDebugFlag = 135; int32_t tsdbDebugFlag = 131; int32_t cqDebugFlag = 131; int32_t fsDebugFlag = 135; +int32_t ctgDebugFlag = 131; + int64_t dbgEmptyW = 0; int64_t dbgWN = 0;