catalog init version
This commit is contained in:
parent
a9f9194a31
commit
d962a2715f
|
@ -30,19 +30,19 @@ extern "C" {
|
|||
|
||||
struct SCatalog;
|
||||
|
||||
typedef struct SMetaReq {
|
||||
typedef struct SCatalogReq {
|
||||
char clusterId[TSDB_CLUSTER_ID_LEN];
|
||||
SArray *pTableName; // table full name
|
||||
SArray *pUdf; // udf name
|
||||
bool qNodeEpset; // valid qnode
|
||||
} SMetaReq;
|
||||
} SCatalogReq;
|
||||
|
||||
typedef struct SMetaData {
|
||||
typedef struct SCatalogRsp {
|
||||
SArray *pTableMeta; // tableMeta
|
||||
SArray *pVgroupInfo; // vgroupInfo list
|
||||
SArray *pUdfList; // udf info list
|
||||
SEpSet *pEpSet; // qnode epset list
|
||||
} SMetaData;
|
||||
} SCatalogRsp;
|
||||
|
||||
typedef struct STableComInfo {
|
||||
uint8_t numOfTags; // the number of tags in schema
|
||||
|
@ -78,32 +78,45 @@ typedef struct STableMeta {
|
|||
SSchema schema[];
|
||||
} STableMeta;
|
||||
|
||||
typedef struct SCatalogCfg {
|
||||
|
||||
} SCatalogCfg;
|
||||
|
||||
|
||||
int32_t catalogInit(SCatalog *cfg);
|
||||
|
||||
/**
|
||||
* Catalog service object, which is utilized to hold tableMeta (meta/vgroupInfo/udfInfo) at the client-side.
|
||||
* There is ONLY one SCatalog object for one process space, and this function returns a singleton.
|
||||
* @param pMgmtEps
|
||||
* @param clusterId
|
||||
* @return
|
||||
*/
|
||||
struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps);
|
||||
struct SCatalog* catalogGetHandle(const char *clusterId);
|
||||
|
||||
/**
|
||||
* Get the required meta data from mnode.
|
||||
* Note that this is a synchronized API and is also thread-safety.
|
||||
* @param pCatalog
|
||||
* @param pMgmtEps
|
||||
* @param pMetaReq
|
||||
* @param pMetaData
|
||||
* @return
|
||||
*/
|
||||
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData);
|
||||
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pCatalogReq, SCatalogRsp* pCatalogData);
|
||||
|
||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta);
|
||||
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, SCatalogRsp* pCatalogData);
|
||||
|
||||
|
||||
/**
|
||||
* Destroy catalog service handle
|
||||
* Destroy catalog and relase all resources
|
||||
* @param pCatalog
|
||||
*/
|
||||
void destroyCatalog(struct SCatalog* pCatalog);
|
||||
void catalogDestroy(void);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_CATALOG_H_*/
|
||||
#endif /*_TD_CATALOG_H_*/
|
||||
|
|
|
@ -23,10 +23,14 @@ extern "C" {
|
|||
#include "catalog.h"
|
||||
|
||||
typedef struct SCatalog {
|
||||
void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata
|
||||
SHashObj *pData; // items cached for each cluster, the hash key is the cluster-id, returned by mgmt node
|
||||
|
||||
} SCatalog;
|
||||
|
||||
typedef struct SCatalogMgmt {
|
||||
void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata
|
||||
SHashObj *pMeta; // items cached for each cluster, the hash key is the cluster-id, returned by mgmt node
|
||||
} SCatalogMgmt;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -15,10 +15,18 @@
|
|||
|
||||
#include "catalogInt.h"
|
||||
|
||||
struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps) {
|
||||
SCatalogMgmt ctgMgmt = {0};
|
||||
|
||||
|
||||
int32_t catalogInit(SCatalog *cfg) {
|
||||
ctgMgmt = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
}
|
||||
|
||||
|
||||
struct SCatalog* catalogGetHandle(const char *clusterId) {
|
||||
return (struct SCatalog*) 0x1;
|
||||
}
|
||||
|
||||
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) {
|
||||
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pMetaReq, SCatalogRsp* pMetaData) {
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -87,13 +87,13 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf);
|
|||
* @param msgBufLen
|
||||
* @return
|
||||
*/
|
||||
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMetaInfo, char* msg, int32_t msgBufLen);
|
||||
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen);
|
||||
|
||||
/**
|
||||
* Destroy the meta data request structure.
|
||||
* @param pMetaInfo
|
||||
*/
|
||||
void qParserClearupMetaRequestInfo(SMetaReq* pMetaInfo);
|
||||
void qParserClearupMetaRequestInfo(SCatalogReq* pMetaInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -4077,8 +4077,8 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
|
|||
}
|
||||
#endif
|
||||
|
||||
SMetaReq req = {0};
|
||||
SMetaData data = {0};
|
||||
SCatalogReq req = {0};
|
||||
SCatalogRsp data = {0};
|
||||
|
||||
// TODO: check if the qnode info has been cached already
|
||||
req.qNodeEpset = true;
|
||||
|
@ -4088,7 +4088,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
|
|||
}
|
||||
|
||||
// load the meta data from catalog
|
||||
code = catalogGetMetaData(pCatalog, &req, &data);
|
||||
code = catalogGetAllMeta(pCatalog, NULL, &req, &data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo**
|
|||
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
||||
}
|
||||
|
||||
struct SCatalog* pCatalog = getCatalogHandle(NULL);
|
||||
struct SCatalog* pCatalog = catalogGetHandle(NULL);
|
||||
return qParserValidateSqlNode(pCatalog, &info, *pQueryInfo, id, msg, msgLen);
|
||||
}
|
||||
|
||||
|
@ -131,7 +131,7 @@ static void freePtrElem(void* p) {
|
|||
tfree(*(char**)p);
|
||||
}
|
||||
|
||||
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMetaInfo, char* msg, int32_t msgBufLen) {
|
||||
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SMsgBuf msgBuf = {.buf = msg, .len = msgBufLen};
|
||||
|
||||
|
@ -188,7 +188,7 @@ int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMet
|
|||
return code;
|
||||
}
|
||||
|
||||
void qParserClearupMetaRequestInfo(SMetaReq* pMetaReq) {
|
||||
void qParserClearupMetaRequestInfo(SCatalogReq* pMetaReq) {
|
||||
if (pMetaReq == NULL) {
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ void setSchema(SSchema* p, int32_t type, int32_t bytes, const char* name, int32_
|
|||
strcpy(p->name, name);
|
||||
}
|
||||
|
||||
void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq* req) {
|
||||
void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SCatalogReq* req) {
|
||||
pQueryInfo->numOfTables = 1;
|
||||
|
||||
pQueryInfo->pTableMetaInfo = (STableMetaInfo**)calloc(1, POINTER_BYTES);
|
||||
|
@ -80,7 +80,7 @@ void sqlCheck(const char* sql, bool valid) {
|
|||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
@ -117,7 +117,7 @@ TEST(testCase, validateAST_test) {
|
|||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
@ -175,7 +175,7 @@ TEST(testCase, function_Test) {
|
|||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
@ -221,7 +221,7 @@ TEST(testCase, function_Test2) {
|
|||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
@ -267,7 +267,7 @@ TEST(testCase, function_Test3) {
|
|||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
@ -312,7 +312,7 @@ TEST(testCase, function_Test4) {
|
|||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
@ -360,7 +360,7 @@ TEST(testCase, function_Test5) {
|
|||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
@ -445,7 +445,7 @@ TEST(testCase, function_Test6) {
|
|||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
@ -523,7 +523,7 @@ TEST(testCase, function_Test6) {
|
|||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
@ -585,7 +585,7 @@ TEST(testCase, function_Test6) {
|
|||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
@ -664,7 +664,7 @@ TEST(testCase, function_Test6) {
|
|||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
|
|
@ -39,7 +39,7 @@ void setSchema(SSchema* p, int32_t type, int32_t bytes, const char* name, int32_
|
|||
strcpy(p->name, name);
|
||||
}
|
||||
|
||||
void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
|
||||
void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SCatalogReq *req) {
|
||||
pQueryInfo->numOfTables = 1;
|
||||
|
||||
pQueryInfo->pTableMetaInfo = (STableMetaInfo**)calloc(1, POINTER_BYTES);
|
||||
|
@ -79,7 +79,7 @@ void generateLogicplan(const char* sql) {
|
|||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
@ -119,7 +119,7 @@ TEST(testCase, planner_test) {
|
|||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
|
|
@ -714,7 +714,7 @@ TEST(testCase, extractMeta_test) {
|
|||
ASSERT_EQ(info1.valid, true);
|
||||
|
||||
char msg[128] = {0};
|
||||
SMetaReq req = {0};
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
|
Loading…
Reference in New Issue