From d962a2715f70cc3e1022586d31b14ef936a3c192 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 11 Dec 2021 11:18:15 +0800 Subject: [PATCH 1/4] catalog init version --- include/libs/catalog/catalog.h | 33 ++++++++++++++++------- source/libs/catalog/inc/catalogInt.h | 8 ++++-- source/libs/catalog/src/catalog.c | 12 +++++++-- source/libs/parser/inc/parserInt.h | 4 +-- source/libs/parser/src/astValidate.c | 6 ++--- source/libs/parser/src/parser.c | 6 ++--- source/libs/parser/test/parserTests.cpp | 24 ++++++++--------- source/libs/parser/test/plannerTest.cpp | 6 ++--- source/libs/parser/test/tokenizerTest.cpp | 2 +- 9 files changed, 63 insertions(+), 38 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 050b9c904f..b04b4f5c8d 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -30,19 +30,19 @@ extern "C" { struct SCatalog; -typedef struct SMetaReq { +typedef struct SCatalogReq { char clusterId[TSDB_CLUSTER_ID_LEN]; SArray *pTableName; // table full name SArray *pUdf; // udf name bool qNodeEpset; // valid qnode -} SMetaReq; +} SCatalogReq; -typedef struct SMetaData { +typedef struct SCatalogRsp { SArray *pTableMeta; // tableMeta SArray *pVgroupInfo; // vgroupInfo list SArray *pUdfList; // udf info list SEpSet *pEpSet; // qnode epset list -} SMetaData; +} SCatalogRsp; typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema @@ -78,32 +78,45 @@ typedef struct STableMeta { SSchema schema[]; } STableMeta; +typedef struct SCatalogCfg { + +} SCatalogCfg; + + +int32_t catalogInit(SCatalog *cfg); + /** * Catalog service object, which is utilized to hold tableMeta (meta/vgroupInfo/udfInfo) at the client-side. * There is ONLY one SCatalog object for one process space, and this function returns a singleton. - * @param pMgmtEps + * @param clusterId * @return */ -struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps); +struct SCatalog* catalogGetHandle(const char *clusterId); /** * Get the required meta data from mnode. * Note that this is a synchronized API and is also thread-safety. * @param pCatalog + * @param pMgmtEps * @param pMetaReq * @param pMetaData * @return */ -int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData); +int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pCatalogReq, SCatalogRsp* pCatalogData); + +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta); + +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, SCatalogRsp* pCatalogData); + /** - * Destroy catalog service handle + * Destroy catalog and relase all resources * @param pCatalog */ -void destroyCatalog(struct SCatalog* pCatalog); +void catalogDestroy(void); #ifdef __cplusplus } #endif -#endif /*_TD_CATALOG_H_*/ \ No newline at end of file +#endif /*_TD_CATALOG_H_*/ diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 5b50bbff4c..60cc1771e2 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -23,10 +23,14 @@ extern "C" { #include "catalog.h" typedef struct SCatalog { - void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata - SHashObj *pData; // items cached for each cluster, the hash key is the cluster-id, returned by mgmt node + } SCatalog; +typedef struct SCatalogMgmt { + void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata + SHashObj *pMeta; // items cached for each cluster, the hash key is the cluster-id, returned by mgmt node +} SCatalogMgmt; + #ifdef __cplusplus } #endif diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 08e2172adb..cd6e357f43 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -15,10 +15,18 @@ #include "catalogInt.h" -struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps) { +SCatalogMgmt ctgMgmt = {0}; + + +int32_t catalogInit(SCatalog *cfg) { + ctgMgmt = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); +} + + +struct SCatalog* catalogGetHandle(const char *clusterId) { return (struct SCatalog*) 0x1; } -int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) { +int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pMetaReq, SCatalogRsp* pMetaData) { return 0; } diff --git a/source/libs/parser/inc/parserInt.h b/source/libs/parser/inc/parserInt.h index ca02165382..e040c21879 100644 --- a/source/libs/parser/inc/parserInt.h +++ b/source/libs/parser/inc/parserInt.h @@ -87,13 +87,13 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf); * @param msgBufLen * @return */ -int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMetaInfo, char* msg, int32_t msgBufLen); +int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen); /** * Destroy the meta data request structure. * @param pMetaInfo */ -void qParserClearupMetaRequestInfo(SMetaReq* pMetaInfo); +void qParserClearupMetaRequestInfo(SCatalogReq* pMetaInfo); #ifdef __cplusplus } diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index 7b6c423fb6..758b83d820 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -4077,8 +4077,8 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer } #endif - SMetaReq req = {0}; - SMetaData data = {0}; + SCatalogReq req = {0}; + SCatalogRsp data = {0}; // TODO: check if the qnode info has been cached already req.qNodeEpset = true; @@ -4088,7 +4088,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer } // load the meta data from catalog - code = catalogGetMetaData(pCatalog, &req, &data); + code = catalogGetAllMeta(pCatalog, NULL, &req, &data); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 29561f7f54..3faa06720b 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -42,7 +42,7 @@ int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo** return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; } - struct SCatalog* pCatalog = getCatalogHandle(NULL); + struct SCatalog* pCatalog = catalogGetHandle(NULL); return qParserValidateSqlNode(pCatalog, &info, *pQueryInfo, id, msg, msgLen); } @@ -131,7 +131,7 @@ static void freePtrElem(void* p) { tfree(*(char**)p); } -int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMetaInfo, char* msg, int32_t msgBufLen) { +int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen) { int32_t code = TSDB_CODE_SUCCESS; SMsgBuf msgBuf = {.buf = msg, .len = msgBufLen}; @@ -188,7 +188,7 @@ int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMet return code; } -void qParserClearupMetaRequestInfo(SMetaReq* pMetaReq) { +void qParserClearupMetaRequestInfo(SCatalogReq* pMetaReq) { if (pMetaReq == NULL) { return; } diff --git a/source/libs/parser/test/parserTests.cpp b/source/libs/parser/test/parserTests.cpp index 2193a44604..6a402cd620 100644 --- a/source/libs/parser/test/parserTests.cpp +++ b/source/libs/parser/test/parserTests.cpp @@ -38,7 +38,7 @@ void setSchema(SSchema* p, int32_t type, int32_t bytes, const char* name, int32_ strcpy(p->name, name); } -void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq* req) { +void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SCatalogReq* req) { pQueryInfo->numOfTables = 1; pQueryInfo->pTableMetaInfo = (STableMetaInfo**)calloc(1, POINTER_BYTES); @@ -80,7 +80,7 @@ void sqlCheck(const char* sql, bool valid) { int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -117,7 +117,7 @@ TEST(testCase, validateAST_test) { int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -175,7 +175,7 @@ TEST(testCase, function_Test) { int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -221,7 +221,7 @@ TEST(testCase, function_Test2) { int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -267,7 +267,7 @@ TEST(testCase, function_Test3) { int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -312,7 +312,7 @@ TEST(testCase, function_Test4) { int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -360,7 +360,7 @@ TEST(testCase, function_Test5) { int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -445,7 +445,7 @@ TEST(testCase, function_Test6) { int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -523,7 +523,7 @@ TEST(testCase, function_Test6) { int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -585,7 +585,7 @@ TEST(testCase, function_Test6) { int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -664,7 +664,7 @@ TEST(testCase, function_Test6) { int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); diff --git a/source/libs/parser/test/plannerTest.cpp b/source/libs/parser/test/plannerTest.cpp index c86e687664..bb9271a3c8 100644 --- a/source/libs/parser/test/plannerTest.cpp +++ b/source/libs/parser/test/plannerTest.cpp @@ -39,7 +39,7 @@ void setSchema(SSchema* p, int32_t type, int32_t bytes, const char* name, int32_ strcpy(p->name, name); } -void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) { +void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SCatalogReq *req) { pQueryInfo->numOfTables = 1; pQueryInfo->pTableMetaInfo = (STableMetaInfo**)calloc(1, POINTER_BYTES); @@ -79,7 +79,7 @@ void generateLogicplan(const char* sql) { int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -119,7 +119,7 @@ TEST(testCase, planner_test) { int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); diff --git a/source/libs/parser/test/tokenizerTest.cpp b/source/libs/parser/test/tokenizerTest.cpp index 07ba46427f..54372ae133 100644 --- a/source/libs/parser/test/tokenizerTest.cpp +++ b/source/libs/parser/test/tokenizerTest.cpp @@ -714,7 +714,7 @@ TEST(testCase, extractMeta_test) { ASSERT_EQ(info1.valid, true); char msg[128] = {0}; - SMetaReq req = {0}; + SCatalogReq req = {0}; int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); From 3d5f9a244dafd4e1eca20fa1da5a63b14196c8bb Mon Sep 17 00:00:00 2001 From: dapan Date: Mon, 13 Dec 2021 08:10:06 +0800 Subject: [PATCH 2/4] catalog init --- include/libs/catalog/catalog.h | 14 +++-- include/util/taoserror.h | 7 +++ source/common/src/tmessage.c | 89 ++++++++++++++++++++++++++++ source/libs/catalog/inc/catalogInt.h | 20 ++++++- source/libs/catalog/src/catalog.c | 63 +++++++++++++++++++- source/util/src/terror.c | 11 +++- 6 files changed, 194 insertions(+), 10 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index b04b4f5c8d..8aacede5fe 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -31,10 +31,10 @@ extern "C" { struct SCatalog; typedef struct SCatalogReq { - char clusterId[TSDB_CLUSTER_ID_LEN]; + char clusterId[TSDB_CLUSTER_ID_LEN]; //???? SArray *pTableName; // table full name SArray *pUdf; // udf name - bool qNodeEpset; // valid qnode + bool qNodeRequired; // valid qnode } SCatalogReq; typedef struct SCatalogRsp { @@ -93,6 +93,9 @@ int32_t catalogInit(SCatalog *cfg); */ struct SCatalog* catalogGetHandle(const char *clusterId); +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta); + + /** * Get the required meta data from mnode. * Note that this is a synchronized API and is also thread-safety. @@ -102,11 +105,14 @@ struct SCatalog* catalogGetHandle(const char *clusterId); * @param pMetaData * @return */ -int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pCatalogReq, SCatalogRsp* pCatalogData); +int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SCatalogRsp* pRsp); int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta); -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, SCatalogRsp* pCatalogData); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, STableMeta* pNewTableMeta); + +int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet); + /** diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 36301466f8..065f1ee0ab 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -490,6 +490,13 @@ int32_t* taosGetErrno(); // monitor #define TSDB_CODE_MON_CONNECTION_INVALID TAOS_DEF_ERROR_CODE(0, 0x2300) //"monitor invalid monitor db connection") +// catalog +#define TSDB_CODE_CTG_INTERNAL_EROR TAOS_DEF_ERROR_CODE(0, 0x2400) //catalog interval error +#define TSDB_CODE_CTG_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2401) //invalid catalog input parameters +#define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402) //catalog is not ready +#define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error +#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) //catalog system error + #ifdef __cplusplus } #endif diff --git a/source/common/src/tmessage.c b/source/common/src/tmessage.c index 0b6dbfdb51..8609e8f09a 100644 --- a/source/common/src/tmessage.c +++ b/source/common/src/tmessage.c @@ -16,3 +16,92 @@ #define TAOS_MESSAGE_C #include "taosmsg.h" + +int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize) = {0}; + +int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0}; + + +void msgInit() { + tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = buildTableMetaReqMsg; + + tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = ; + +/* + tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; + tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg; + tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg; + + tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg; + tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg; + tscBuildMsg[TSDB_SQL_CREATE_FUNCTION] = tscBuildCreateFuncMsg; + + tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg; + tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg; + + tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg; + tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg; + tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg; + tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg; + tscBuildMsg[TSDB_SQL_DROP_FUNCTION] = tscBuildDropFuncMsg; + tscBuildMsg[TSDB_SQL_SYNC_DB_REPLICA] = tscBuildSyncDbReplicaMsg; + tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg; + tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg; + tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg; + tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg; + tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg; + tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg; + tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg; + tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg; + tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg; + + tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg; + tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg; + tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg; + tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg; + + tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg; + tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg; + tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg; + tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg; + tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg; + tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg; + + tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp; + tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode; + + tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp; + tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp; + tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp; + tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp; + tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp; + tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp; + tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiTableMetaRsp; + tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp; + + tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp; + tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function. + tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp; + + tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp; + tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp; + tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp; + tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessLocalRetrieveRsp; + tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessLocalRetrieveRsp; + + tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp; + + tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp; + + tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp; + tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp; + tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp; + + tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp; + tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp; + tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp; +*/ +} + + + diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 60cc1771e2..8703f1f0ce 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -22,17 +22,33 @@ extern "C" { #include "catalog.h" +#define CTG_DEFAULT_CLUSTER_NUMBER 3 + typedef struct SCatalog { } SCatalog; typedef struct SCatalogMgmt { void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata - SHashObj *pMeta; // items cached for each cluster, the hash key is the cluster-id, returned by mgmt node + SHashObj *pCluster; // items cached for each cluster, the hash key is the cluster-id got from mgmt node } SCatalogMgmt; + + +#define ctgFatal(...) tscFatal(__VA_ARGS__) +#define ctgError(...) tscError(__VA_ARGS__) +#define ctgWarn(...) tscWarn(__VA_ARGS__) +#define ctgInfo(...) tscInfo(__VA_ARGS__) +#define ctgDebug(...) tscDebug(__VA_ARGS__) +#define ctgTrace(...) tscTrace(__VA_ARGS__) + +#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0) +#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); return _code; } } while (0) +#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0) + + #ifdef __cplusplus } #endif -#endif /*_TD_CATALOG_INT_H_*/ \ No newline at end of file +#endif /*_TD_CATALOG_INT_H_*/ diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index cd6e357f43..a305df05d0 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -19,14 +19,71 @@ SCatalogMgmt ctgMgmt = {0}; int32_t catalogInit(SCatalog *cfg) { - ctgMgmt = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == ctgMgmt.pCluster) { + CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_EROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER); + } + + ctgGetVnodeInfo(); + + return TSDB_CODE_SUCCESS; } struct SCatalog* catalogGetHandle(const char *clusterId) { - return (struct SCatalog*) 0x1; + if (NULL == clusterId) { + return NULL; + } + + if (NULL == ctgMgmt.pCluster) { + ctgError("cluster cache are not ready"); + return NULL; + } + + size_t clen = strlen(clusterId); + SCatalog *clusterCtg = (SCatalog *)taosHashGet(ctgMgmt.pCluster, clusterId, clen); + + if (clusterCtg) { + return clusterCtg; + } + + clusterCtg = calloc(1, sizeof(*clusterCtg)); + if (NULL == clusterCtg) { + ctgError("calloc %d failed", sizeof(*clusterCtg)); + return NULL; + } + + if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) { + ctgError("put cluster %s cache to hash failed", clusterId); + tfree(clusterCtg); + return NULL; + } + + return clusterCtg; } -int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pMetaReq, SCatalogRsp* pMetaData) { +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta) { + if (NULL == pCatalog || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + +} + + +int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SCatalogRsp* pRsp) { + if (NULL == pCatalog || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + return 0; } + +void catalogDestroy(void) { + if (ctgMgmt.pCluster) { + taosHashCleanup(ctgMgmt.pCluster); //TBD + ctgMgmt.pCluster = NULL; + } +} + + + diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 5110c8ba22..c4ca44f1d2 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -496,6 +496,15 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_FILE_ALREADY_EXISTS, "tfs file already exis TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, "tfs invalid level") TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk") +// catalog +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_EROR, "catalog interval error") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error") + + + #ifdef TAOS_ERROR_C }; #endif @@ -544,4 +553,4 @@ const char* tstrerror(int32_t err) { return ""; } -const char* terrstr() { return tstrerror(terrno); } \ No newline at end of file +const char* terrstr() { return tstrerror(terrno); } From 5c7332c2fcfd2989415a3674f1b046856d620423 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 13 Dec 2021 17:10:31 +0800 Subject: [PATCH 3/4] catalog update --- include/common/taosmsg.h | 7 +++ include/libs/catalog/catalog.h | 36 +++++++++++---- include/util/taoserror.h | 1 + source/common/src/tmessage.c | 67 ++++++++++++++++++++++++++-- source/libs/catalog/inc/catalogInt.h | 18 +++++++- source/libs/catalog/src/catalog.c | 24 ++++++++-- source/libs/parser/src/parserUtil.c | 17 ------- source/util/src/terror.c | 1 + 8 files changed, 138 insertions(+), 33 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 2ce6da9806..fd55a11b52 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -214,6 +214,12 @@ typedef enum _mgmt_table { extern char *taosMsg[]; +typedef struct SBuildTableMetaInput { + int32_t vgId; + STagData *tagData; + char *tableFullName; +} SBuildTableMetaInput; + #pragma pack(push, 1) // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta @@ -768,6 +774,7 @@ typedef struct { } SStableInfoMsg; typedef struct { + SMsgHead msgHead; char tableFname[TSDB_TABLE_FNAME_LEN]; int8_t createFlag; char tags[]; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 8aacede5fe..2092f53ba1 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -30,6 +30,19 @@ extern "C" { struct SCatalog; +typedef struct SVgroupInfo { + int32_t vgId; + int8_t numOfEps; + SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; +} SVgroupInfo; + +typedef struct SDBVgroupInfo { + int32_t vgroupVersion; + SArray *vgId; + int32_t hashRange; + int32_t hashNum; +} SDBVgroupInfo; + typedef struct SCatalogReq { char clusterId[TSDB_CLUSTER_ID_LEN]; //???? SArray *pTableName; // table full name @@ -38,8 +51,8 @@ typedef struct SCatalogReq { } SCatalogReq; typedef struct SCatalogRsp { - SArray *pTableMeta; // tableMeta - SArray *pVgroupInfo; // vgroupInfo list + SArray *pTableMeta; // STableMeta array + SArray *pVgroupInfo; // SVgroupInfo list SArray *pUdfList; // udf info list SEpSet *pEpSet; // qnode epset list } SCatalogRsp; @@ -78,11 +91,6 @@ typedef struct STableMeta { SSchema schema[]; } STableMeta; -typedef struct SCatalogCfg { - -} SCatalogCfg; - - int32_t catalogInit(SCatalog *cfg); /** @@ -91,9 +99,19 @@ int32_t catalogInit(SCatalog *cfg); * @param clusterId * @return */ -struct SCatalog* catalogGetHandle(const char *clusterId); +int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle); -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta); +int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version); + +int32_t catalogUpdateVgroupList(struct SCatalog* pCatalog, int32_t version, SArray* vgroupList); + +int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); + +int32_t catalogGetDBVgroupInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); + +int32_t catalogUpdateDBVgroupInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); + +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta); /** diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 065f1ee0ab..a3a4297115 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -117,6 +117,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_INVALID_JSON TAOS_DEF_ERROR_CODE(0, 0x0221) //"Invalid JSON format") #define TSDB_CODE_TSC_INVALID_JSON_TYPE TAOS_DEF_ERROR_CODE(0, 0x0222) //"Invalid JSON data type") #define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223) //"Value out of range") +#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) //"Invalid tsc input") // mnode #define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) diff --git a/source/common/src/tmessage.c b/source/common/src/tmessage.c index 8609e8f09a..0e732caa26 100644 --- a/source/common/src/tmessage.c +++ b/source/common/src/tmessage.c @@ -17,14 +17,16 @@ #include "taosmsg.h" -int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize) = {0}; +int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0}; int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0}; void msgInit() { - tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = buildTableMetaReqMsg; - + tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = tscBuildTableMetaReqMsg; + + + tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = ; /* @@ -104,4 +106,63 @@ void msgInit() { } +char* msgSerializeTagData(STagData* pTagData, char* pMsg) { + int32_t n = (int32_t) strlen(pTagData->name); + *(int32_t*) pMsg = htonl(n); + pMsg += sizeof(n); + + memcpy(pMsg, pTagData->name, n); + pMsg += n; + + *(int32_t*)pMsg = htonl(pTagData->dataLen); + pMsg += sizeof(int32_t); + + memcpy(pMsg, pTagData->data, pTagData->dataLen); + pMsg += pTagData->dataLen; + + return pMsg; +} + + +int32_t tscBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == input || NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input; + + int32_t estimateSize = sizeof(STableInfoMsg) + (bInput->tagData ? (sizeof(*bInput->tagData) + bInput->tagData->dataLen) : 0); + if (NULL == *msg || msgSize < estimateSize) { + tfree(*msg); + *msg = calloc(1, estimateSize); + if (NULL == *msg) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + STableInfoMsg *bMsg = (STableInfoMsg *)*msg; + + bMsg->msgHead.vgId = bInput->vgId; + + strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); + bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; + + int32_t autoCreate = (bInput->tagData && bInput->tagData->dataLen > 0); + + bMsg->createFlag = htons(autoCreate ? 1 : 0); + + char *pMsg = NULL; + + // tag data exists + if (autoCreate) { + pMsg = msgSerializeTagData(bInput->tagData, (char *)bMsg->tags); + } + + *msgLen = (int32_t)(pMsg - (char*)bMsg); + + return TSDB_CODE_SUCCESS; +} + + + diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 8703f1f0ce..82d7d9c571 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -24,8 +24,24 @@ extern "C" { #define CTG_DEFAULT_CLUSTER_NUMBER 3 -typedef struct SCatalog { +typedef struct SVgroupListCache { + int32_t vgroupNum; + int32_t vgroupVersion; + SHashObj *cache; //key:vgId, value:SVgroupInfo +} SVgroupListCache; +typedef struct SDBVgroupCache { + SHashObj *cache; //key:dbname, value:SDBVgroupInfo +} SDBVgroupCache; + +typedef struct STableMetaCache { + SHashObj *cache; //key:fulltablename, value:STableMeta +} STableMetaCache; + +typedef struct SCatalog { + SVgroupListCache vgroupCache; + SDBVgroupCache dbCache; + STableMetaCache tableCache; } SCatalog; typedef struct SCatalogMgmt { diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index a305df05d0..e8b79bae4b 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -24,8 +24,6 @@ int32_t catalogInit(SCatalog *cfg) { CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_EROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER); } - ctgGetVnodeInfo(); - return TSDB_CODE_SUCCESS; } @@ -62,11 +60,31 @@ struct SCatalog* catalogGetHandle(const char *clusterId) { return clusterCtg; } -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta) { +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta) { if (NULL == pCatalog || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { return TSDB_CODE_CTG_INVALID_INPUT; } + SBuildTableMetaInput bInput = {0}; + char *msg = NULL; + SEpSet *pVnodeEpSet = NULL; + int32_t msgLen = 0; + + int32_t code = tscBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen); + if (code) { + return code; + } + + SRpcMsg rpcMsg = { + .msgType = TSDB_MSG_TYPE_TABLE_META, + .pCont = msg, + .contLen = msgLen, + .ahandle = (void*)pSql->self, + .handle = NULL, + .code = 0 + }; + + rpcSendRequest(pRpcObj->pDnodeConn, pVnodeEpSet, &rpcMsg, &pSql->rpcRid); } diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index 3e83381a76..f154599aae 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -1433,23 +1433,6 @@ void* vgroupInfoClear(SVgroupsInfo *vgroupList) { return NULL; } -char* serializeTagData(STagData* pTagData, char* pMsg) { - int32_t n = (int32_t) strlen(pTagData->name); - *(int32_t*) pMsg = htonl(n); - pMsg += sizeof(n); - - memcpy(pMsg, pTagData->name, n); - pMsg += n; - - *(int32_t*)pMsg = htonl(pTagData->dataLen); - pMsg += sizeof(int32_t); - - memcpy(pMsg, pTagData->data, pTagData->dataLen); - pMsg += pTagData->dataLen; - - return pMsg; -} - int32_t copyTagData(STagData* dst, const STagData* src) { dst->dataLen = src->dataLen; tstrncpy(dst->name, src->name, tListLen(dst->name)); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c4ca44f1d2..a75ce747b2 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -127,6 +127,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DUP_TAG_NAMES, "duplicated tag names" TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON, "Invalid JSON format") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON_TYPE, "Invalid JSON data type") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input") // mnode TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed") From a352b93570689b954d8514c86678b95d9ce08eb1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 14 Dec 2021 15:24:21 +0800 Subject: [PATCH 4/4] update catalog and message --- include/common/taosmsg.h | 19 +- include/common/tmessage.h | 32 +++ include/libs/catalog/catalog.h | 39 ++-- include/util/taoserror.h | 2 +- include/util/tlog.h | 2 + source/client/CMakeLists.txt | 1 + source/client/src/client.c | 3 + source/common/inc/commonInt.h | 16 +- source/common/src/taosmsg.c | 20 ++ source/common/src/tglobal.c | 1 + source/common/src/tmessage.c | 147 ++++++++------ source/dnode/mgmt/impl/src/dndTransport.c | 2 +- source/libs/catalog/inc/catalogInt.h | 26 ++- source/libs/catalog/src/catalog.c | 228 ++++++++++++++++++++-- source/libs/parser/src/astValidate.c | 2 +- source/libs/parser/src/parser.c | 7 +- source/libs/transport/src/rpcMain.c | 2 +- source/util/src/terror.c | 2 +- source/util/src/tlog.c | 2 + 19 files changed, 441 insertions(+), 112 deletions(-) create mode 100644 include/common/tmessage.h create mode 100644 source/common/src/taosmsg.c diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index fd55a11b52..906dd38c51 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -77,7 +77,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE, "create-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_VGROUP_LIST, "vgroup-list" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_STREAM, "kill-stream" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" ) @@ -216,7 +216,6 @@ extern char *taosMsg[]; typedef struct SBuildTableMetaInput { int32_t vgId; - STagData *tagData; char *tableFullName; } SBuildTableMetaInput; @@ -776,8 +775,6 @@ typedef struct { typedef struct { SMsgHead msgHead; char tableFname[TSDB_TABLE_FNAME_LEN]; - int8_t createFlag; - char tags[]; } STableInfoMsg; typedef struct { @@ -792,6 +789,20 @@ typedef struct SSTableVgroupMsg { int32_t numOfTables; } SSTableVgroupMsg, SSTableVgroupRspMsg; +typedef struct SVgroupInfo { + int32_t vgId; + int8_t numOfEps; + SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; +} SVgroupInfo; + +typedef struct SVgroupListRspMsg { + int32_t vgroupNum; + int32_t vgroupVersion; + SVgroupInfo vgroupInfo[]; +} SVgroupListRspMsg; + +typedef SVgroupListRspMsg SVgroupListInfo; + typedef struct { int32_t vgId; int8_t numOfEps; diff --git a/include/common/tmessage.h b/include/common/tmessage.h new file mode 100644 index 0000000000..c728ee026e --- /dev/null +++ b/include/common/tmessage.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_COMMON_TMESSAGE_H_ +#define _TD_COMMON_TMESSAGE_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +extern int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); +extern int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize); + + + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_COMMON_TMESSAGE_H_*/ diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 2092f53ba1..2ceb893ce4 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -30,12 +30,6 @@ extern "C" { struct SCatalog; -typedef struct SVgroupInfo { - int32_t vgId; - int8_t numOfEps; - SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; -} SVgroupInfo; - typedef struct SDBVgroupInfo { int32_t vgroupVersion; SArray *vgId; @@ -91,7 +85,11 @@ typedef struct STableMeta { SSchema schema[]; } STableMeta; -int32_t catalogInit(SCatalog *cfg); +typedef struct SCatalogCfg { + +} SCatalogCfg; + +int32_t catalogInit(SCatalogCfg *cfg); /** * Catalog service object, which is utilized to hold tableMeta (meta/vgroupInfo/udfInfo) at the client-side. @@ -101,17 +99,31 @@ int32_t catalogInit(SCatalog *cfg); */ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle); -int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version); -int32_t catalogUpdateVgroupList(struct SCatalog* pCatalog, int32_t version, SArray* vgroupList); + +int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version); +int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList); +int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup); + + int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo); +int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); -int32_t catalogGetDBVgroupInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); -int32_t catalogUpdateDBVgroupInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta); +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, STableMeta* pNewTableMeta); -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta); + +/** + * get table's vgroup list. + * @param clusterId + * @pVgroupList - array of SVgroupInfo + * @return + */ +int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList); /** @@ -125,9 +137,6 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const S */ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SCatalogRsp* pRsp); -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta); - -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, STableMeta* pNewTableMeta); int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a3a4297115..e1a1694fab 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -492,7 +492,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MON_CONNECTION_INVALID TAOS_DEF_ERROR_CODE(0, 0x2300) //"monitor invalid monitor db connection") // catalog -#define TSDB_CODE_CTG_INTERNAL_EROR TAOS_DEF_ERROR_CODE(0, 0x2400) //catalog interval error +#define TSDB_CODE_CTG_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2400) //catalog interval error #define TSDB_CODE_CTG_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2401) //invalid catalog input parameters #define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402) //catalog is not ready #define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error diff --git a/include/util/tlog.h b/include/util/tlog.h index 2ee60e4324..9b6033e7fe 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -45,6 +45,8 @@ extern int32_t sDebugFlag; extern int32_t tsdbDebugFlag; extern int32_t cqDebugFlag; extern int32_t debugFlag; +extern int32_t ctgDebugFlag; + #define DEBUG_FATAL 1U #define DEBUG_ERROR DEBUG_FATAL diff --git a/source/client/CMakeLists.txt b/source/client/CMakeLists.txt index bc0d439407..bc18e41664 100644 --- a/source/client/CMakeLists.txt +++ b/source/client/CMakeLists.txt @@ -6,5 +6,6 @@ target_include_directories( ) target_link_libraries( taos + PRIVATE common INTERFACE api ) diff --git a/source/client/src/client.c b/source/client/src/client.c index b1663239e6..258fd91565 100644 --- a/source/client/src/client.c +++ b/source/client/src/client.c @@ -18,6 +18,9 @@ //TAOS_RES *taos_query(TAOS *taos, const char *sql) { // //} +#include "taosmsg.h" int taos_init() { return 0; } void taos_cleanup(void) {} + + diff --git a/source/common/inc/commonInt.h b/source/common/inc/commonInt.h index b8be8899f3..c1baa2a453 100644 --- a/source/common/inc/commonInt.h +++ b/source/common/inc/commonInt.h @@ -20,8 +20,22 @@ extern "C" { #endif + +#include "tlog.h" + +extern int32_t cDebugFlag; +extern int8_t tscEmbedded; + +#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0) +#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0) +#define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0) +#define tscInfo(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0) +#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) +#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) +#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) + #ifdef __cplusplus } #endif -#endif /*_TD_COMMON_INT_H_*/ \ No newline at end of file +#endif /*_TD_COMMON_INT_H_*/ diff --git a/source/common/src/taosmsg.c b/source/common/src/taosmsg.c new file mode 100644 index 0000000000..b35e3f1478 --- /dev/null +++ b/source/common/src/taosmsg.c @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define TAOS_MESSAGE_C + +#include "taosmsg.h" + + diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3cf5e52c44..60e8415392 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1715,3 +1715,4 @@ bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *d return true; } + \ No newline at end of file diff --git a/source/common/src/tmessage.c b/source/common/src/tmessage.c index 0e732caa26..07adf1d599 100644 --- a/source/common/src/tmessage.c +++ b/source/common/src/tmessage.c @@ -13,21 +13,103 @@ * along with this program. If not, see . */ -#define TAOS_MESSAGE_C - #include "taosmsg.h" +#include "commonint.h" + int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0}; int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0}; +int32_t tscBuildVgroupListReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + *msgLen = 0; + + return TSDB_CODE_SUCCESS; +} + +int32_t tscBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == input || NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input; + + int32_t estimateSize = sizeof(STableInfoMsg); + if (NULL == *msg || msgSize < estimateSize) { + tfree(*msg); + *msg = calloc(1, estimateSize); + if (NULL == *msg) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + STableInfoMsg *bMsg = (STableInfoMsg *)*msg; + + bMsg->msgHead.vgId = bInput->vgId; + + strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); + bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; + + *msgLen = (int32_t)sizeof(*bMsg); + + return TSDB_CODE_SUCCESS; +} + + +int32_t tscProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) { + if (NULL == output || NULL == msg || msgSize <= 0) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SVgroupListRspMsg *pRsp = (SVgroupListRspMsg *)msg; + + pRsp->vgroupNum = htonl(pRsp->vgroupNum); + pRsp->vgroupVersion = htonl(pRsp->vgroupVersion); + + if (pRsp->vgroupNum < 0) { + tscError("vgroup number[%d] in rsp is invalid", pRsp->vgroupNum); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + + if (pRsp->vgroupVersion < 0) { + tscError("vgroup vgroupVersion[%d] in rsp is invalid", pRsp->vgroupVersion); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + + if (msgSize != (pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp))) { + tscError("vgroup list msg size mis-match, msgSize:%d, vgroup number:%d", msgSize, pRsp->vgroupNum); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + + // keep SVgroupListInfo/SVgroupListRspMsg the same + *(SVgroupListInfo **)output = (SVgroupListInfo *)msg; + + if (pRsp->vgroupNum == 0) { + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < pRsp->vgroupNum; ++i) { + pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); + for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { + pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); + } + } + + return TSDB_CODE_SUCCESS; +} + void msgInit() { tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = tscBuildTableMetaReqMsg; + tscBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = tscBuildVgroupListReqMsg; - - tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = ; + //tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp; + tscProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = tscProcessVgroupListRsp; /* tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; @@ -106,63 +188,6 @@ void msgInit() { } -char* msgSerializeTagData(STagData* pTagData, char* pMsg) { - int32_t n = (int32_t) strlen(pTagData->name); - *(int32_t*) pMsg = htonl(n); - pMsg += sizeof(n); - - memcpy(pMsg, pTagData->name, n); - pMsg += n; - - *(int32_t*)pMsg = htonl(pTagData->dataLen); - pMsg += sizeof(int32_t); - - memcpy(pMsg, pTagData->data, pTagData->dataLen); - pMsg += pTagData->dataLen; - - return pMsg; -} - - -int32_t tscBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { - if (NULL == input || NULL == msg || NULL == msgLen) { - return TSDB_CODE_TSC_INVALID_INPUT; - } - - SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input; - - int32_t estimateSize = sizeof(STableInfoMsg) + (bInput->tagData ? (sizeof(*bInput->tagData) + bInput->tagData->dataLen) : 0); - if (NULL == *msg || msgSize < estimateSize) { - tfree(*msg); - *msg = calloc(1, estimateSize); - if (NULL == *msg) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - } - - STableInfoMsg *bMsg = (STableInfoMsg *)*msg; - - bMsg->msgHead.vgId = bInput->vgId; - - strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); - bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; - - int32_t autoCreate = (bInput->tagData && bInput->tagData->dataLen > 0); - - bMsg->createFlag = htons(autoCreate ? 1 : 0); - - char *pMsg = NULL; - - // tag data exists - if (autoCreate) { - pMsg = msgSerializeTagData(bInput->tagData, (char *)bMsg->tags); - } - - *msgLen = (int32_t)(pMsg - (char*)bMsg); - - return TSDB_CODE_SUCCESS; -} - diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index a36c56ea3d..0f8281a3a6 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -72,7 +72,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_VGROUP_LIST] = dndProcessMnodeReadMsg; pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_KILL_STREAM] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 82d7d9c571..720f197782 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -21,13 +21,18 @@ extern "C" { #endif #include "catalog.h" +#include "common.h" +#include "tlog.h" -#define CTG_DEFAULT_CLUSTER_NUMBER 3 +#define CTG_DEFAULT_CLUSTER_NUMBER 6 +#define CTG_DEFAULT_VGROUP_NUMBER 100 + +#define CTG_DEFAULT_INVALID_VERSION (-1) typedef struct SVgroupListCache { - int32_t vgroupNum; int32_t vgroupVersion; - SHashObj *cache; //key:vgId, value:SVgroupInfo + SHashObj *cache; // key:vgId, value:SVgroupInfo* + SArray *arrayCache; // SVgroupInfo } SVgroupListCache; typedef struct SDBVgroupCache { @@ -50,13 +55,16 @@ typedef struct SCatalogMgmt { } SCatalogMgmt; +extern int32_t ctgDebugFlag; + +#define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define ctgInfo(...) do { if (ctgDebugFlag & DEBUG_INFO) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define ctgDebug(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) -#define ctgFatal(...) tscFatal(__VA_ARGS__) -#define ctgError(...) tscError(__VA_ARGS__) -#define ctgWarn(...) tscWarn(__VA_ARGS__) -#define ctgInfo(...) tscInfo(__VA_ARGS__) -#define ctgDebug(...) tscDebug(__VA_ARGS__) -#define ctgTrace(...) tscTrace(__VA_ARGS__) #define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0) #define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); return _code; } } while (0) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index e8b79bae4b..3837e5408a 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -14,53 +14,248 @@ */ #include "catalogInt.h" +#include "trpc.h" +#include "tmessage.h" SCatalogMgmt ctgMgmt = {0}; +int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SVgroupListInfo** pVgroup) { + char *msg = NULL; + SEpSet *pVnodeEpSet = NULL; + int32_t msgLen = 0; -int32_t catalogInit(SCatalog *cfg) { + int32_t code = tscBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST](NULL, &msg, 0, &msgLen); + if (code) { + return code; + } + + SRpcMsg rpcMsg = { + .msgType = TSDB_MSG_TYPE_VGROUP_LIST, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + + rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + + code = tscProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST](pVgroup, rpcRsp.pCont, rpcRsp.contLen); + if (code) { + return code; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t* exist) { + if (NULL == pCatalog->vgroupCache.arrayCache || pCatalog->vgroupCache.vgroupVersion < 0) { + *exist = 0; + return TSDB_CODE_SUCCESS; + } + + if (pVgroupList) { + *pVgroupList = taosArrayDup(pCatalog->vgroupCache.arrayCache); + } + + *exist = 1; + + return TSDB_CODE_SUCCESS; +} + + + + + +int32_t catalogInit(SCatalogCfg *cfg) { ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == ctgMgmt.pCluster) { - CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_EROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER); + CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER); } return TSDB_CODE_SUCCESS; } -struct SCatalog* catalogGetHandle(const char *clusterId) { - if (NULL == clusterId) { - return NULL; +int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) { + if (NULL == clusterId || NULL == catalogHandle) { + return TSDB_CODE_CTG_INVALID_INPUT; } if (NULL == ctgMgmt.pCluster) { ctgError("cluster cache are not ready"); - return NULL; + return TSDB_CODE_CTG_NOT_READY; } size_t clen = strlen(clusterId); SCatalog *clusterCtg = (SCatalog *)taosHashGet(ctgMgmt.pCluster, clusterId, clen); if (clusterCtg) { - return clusterCtg; + *catalogHandle = clusterCtg; + return TSDB_CODE_SUCCESS; } clusterCtg = calloc(1, sizeof(*clusterCtg)); if (NULL == clusterCtg) { - ctgError("calloc %d failed", sizeof(*clusterCtg)); - return NULL; + ctgError("calloc %d failed", (int32_t)sizeof(*clusterCtg)); + return TSDB_CODE_CTG_MEM_ERROR; } + clusterCtg->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; + if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) { ctgError("put cluster %s cache to hash failed", clusterId); tfree(clusterCtg); - return NULL; + return TSDB_CODE_CTG_INTERNAL_ERROR; } + + *catalogHandle = clusterCtg; - return clusterCtg; + return TSDB_CODE_SUCCESS; } -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta) { + +int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) { + if (NULL == pCatalog || NULL == version) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + + *version = pCatalog->vgroupCache.vgroupVersion; + + return TSDB_CODE_SUCCESS; +} + + + +int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) { + if (NULL == pVgroup) { + ctgError("vgroup get from mnode succeed, but no output"); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + if (pVgroup->vgroupVersion < 0) { + ctgError("vgroup version[%d] is invalid", pVgroup->vgroupVersion); + return TSDB_CODE_CTG_INVALID_INPUT; + } + + + if (NULL == pCatalog->vgroupCache.arrayCache) { + pCatalog->vgroupCache.arrayCache = taosArrayInit(pVgroup->vgroupNum, sizeof(pVgroup->vgroupInfo[0])); + if (NULL == pCatalog->vgroupCache.arrayCache) { + ctgError("init array[%d] for cluster cache failed", pVgroup->vgroupNum); + return TSDB_CODE_CTG_MEM_ERROR; + } + } else { + taosArrayClear(pCatalog->vgroupCache.arrayCache); + } + + if (NULL == pCatalog->vgroupCache.cache) { + pCatalog->vgroupCache.cache = taosHashInit(CTG_DEFAULT_VGROUP_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == pCatalog->vgroupCache.cache) { + ctgError("init hash[%d] for cluster cache failed", CTG_DEFAULT_VGROUP_NUMBER); + return TSDB_CODE_CTG_MEM_ERROR; + } + } else { + taosHashClear(pCatalog->vgroupCache.cache); + } + + SVgroupInfo *vInfo = NULL; + for (int32_t i = 0; i < pVgroup->vgroupNum; ++i) { + vInfo = taosArrayPush(pCatalog->vgroupCache.arrayCache, &pVgroup->vgroupInfo[i]); + if (NULL == vInfo) { + ctgError("push to vgroup array cache failed"); + goto error_exit; + } + + if (taosHashPut(pCatalog->vgroupCache.cache, &pVgroup->vgroupInfo[i].vgId, sizeof(pVgroup->vgroupInfo[i].vgId), &vInfo, POINTER_BYTES) != 0) { + ctgError("push to vgroup hash cache failed"); + goto error_exit; + } + } + + pCatalog->vgroupCache.vgroupVersion = pVgroup->vgroupVersion; + + return TSDB_CODE_SUCCESS; + +error_exit: + if (pCatalog->vgroupCache.arrayCache) { + taosArrayDestroy(pCatalog->vgroupCache.arrayCache); + pCatalog->vgroupCache.arrayCache = NULL; + } + + if (pCatalog->vgroupCache.cache) { + taosHashCleanup(pCatalog->vgroupCache.cache); + pCatalog->vgroupCache.cache = NULL; + } + + pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; + + return TSDB_CODE_CTG_INTERNAL_ERROR; +} + + +int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList) { + if (NULL == pCatalog || NULL == pMgmtEps || NULL == pRpc) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + + int32_t exist = 0; + + CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupList, &exist)); + + if (exist) { + return TSDB_CODE_SUCCESS; + } + + SVgroupListInfo *pVgroup = NULL; + + CTG_ERR_RET(ctgGetVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &pVgroup)); + + CTG_ERR_RET(catalogUpdateVgroup(pCatalog, pVgroup)); + + if (pVgroupList) { + CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupList, &exist)); + } + + if (0 == exist) { + ctgError("catalog fetched but get from cache failed"); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) { + if (NULL == pCatalog || NULL == dbName || NULL == version) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + + if (NULL == pCatalog->dbCache.cache) { + *version = CTG_DEFAULT_INVALID_VERSION; + return TSDB_CODE_SUCCESS; + } + + SDBVgroupInfo * dbInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); + if (NULL == dbInfo) { + *version = CTG_DEFAULT_INVALID_VERSION; + return TSDB_CODE_SUCCESS; + } + + *version = dbInfo->vgroupVersion; + + return TSDB_CODE_SUCCESS; +} + +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) { + +} + +int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { + +} + + + +int32_t catalogGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta) { if (NULL == pCatalog || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { return TSDB_CODE_CTG_INVALID_INPUT; } @@ -79,12 +274,13 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const S .msgType = TSDB_MSG_TYPE_TABLE_META, .pCont = msg, .contLen = msgLen, - .ahandle = (void*)pSql->self, - .handle = NULL, - .code = 0 }; - rpcSendRequest(pRpcObj->pDnodeConn, pVnodeEpSet, &rpcMsg, &pSql->rpcRid); + SRpcMsg rpcRsp = {0}; + + rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index 758b83d820..ce47e50c1e 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -4081,7 +4081,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer SCatalogRsp data = {0}; // TODO: check if the qnode info has been cached already - req.qNodeEpset = true; + req.qNodeRequired = true; code = qParserExtractRequestedMetaInfo(pInfo, &req, msgBuf, msgBufLen); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 3faa06720b..548ab1b8df 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -42,7 +42,12 @@ int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo** return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; } - struct SCatalog* pCatalog = catalogGetHandle(NULL); + struct SCatalog* pCatalog = NULL; + int32_t code = catalogGetHandle(NULL, &pCatalog); + if (code) { + return code; + } + return qParserValidateSqlNode(pCatalog, &info, *pQueryInfo, id, msg, msgLen); } diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 911e8472ab..71e08ea76f 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -406,7 +406,7 @@ void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t // for TDengine, all the query, show commands shall have TCP connection char type = pMsg->msgType; if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_SHOW_RETRIEVE - || type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_STABLE_VGROUP + || type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_VGROUP_LIST || type == TSDB_MSG_TYPE_TABLES_META || type == TSDB_MSG_TYPE_TABLE_META || type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_STATUS || type == TSDB_MSG_TYPE_ALTER_TABLE) pContext->connType = RPC_CONN_TCPC; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a75ce747b2..db94d26518 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -498,7 +498,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, "tfs invalid level") TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk") // catalog -TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_EROR, "catalog interval error") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog interval error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error") diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index e6cc3a53af..23b66af0c8 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -99,6 +99,8 @@ int32_t wDebugFlag = 135; int32_t tsdbDebugFlag = 131; int32_t cqDebugFlag = 131; int32_t fsDebugFlag = 135; +int32_t ctgDebugFlag = 131; + int64_t dbgEmptyW = 0; int64_t dbgWN = 0;