diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h
index 1f2452291b..449064c8c6 100644
--- a/include/libs/catalog/catalog.h
+++ b/include/libs/catalog/catalog.h
@@ -77,7 +77,14 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
* @pVgroupList - array of SVgroupInfo
* @return
*/
-int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList);
+int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList);
+
+/**
+ * get a table's dst vgroup from its name's hash value.
+ * @vgInfo - SVgroupInfo
+ * @return
+ */
+int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo);
/**
diff --git a/include/libs/query/query.h b/include/libs/query/query.h
index 8720fd085c..060aef9d65 100644
--- a/include/libs/query/query.h
+++ b/include/libs/query/query.h
@@ -85,6 +85,8 @@ typedef struct STableMetaOutput {
extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
+extern void msgInit();
+
#ifdef __cplusplus
}
diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c
index 9fdac36060..3aac02c1e4 100644
--- a/source/libs/catalog/src/catalog.c
+++ b/source/libs/catalog/src/catalog.c
@@ -63,6 +63,10 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
SRpcMsg rpcRsp = {0};
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
+ if (TSDB_CODE_SUCCESS != rpcRsp.code) {
+ ctgError("error rsp for use db, code:%x", rpcRsp.code);
+ return rpcRsp.code;
+ }
code = queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB](out, rpcRsp.pCont, rpcRsp.contLen);
if (code) {
@@ -169,9 +173,9 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
ctgGenEpSet(&epSet, vgroupInfo);
rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);
-
+
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
- ctgError("get table meta from mnode failed, error code:%d", rpcRsp.code);
+ ctgError("error rsp for table meta, code:%x", rpcRsp.code);
return rpcRsp.code;
}
@@ -254,24 +258,6 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co
}
-int32_t ctgGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
- SDBVgroupInfo dbInfo = {0};
- int32_t code = 0;
- int32_t vgId = 0;
-
- CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo));
-
- if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
- ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
- return TSDB_CODE_TSC_DB_NOT_SELECTED;
- }
-
- CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup));
-
- return code;
-}
-
-
STableMeta* ctgCreateSTableMeta(STableMetaMsg* pChild) {
assert(pChild != NULL);
@@ -554,7 +540,7 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
SVgroupInfo vgroupInfo = {0};
- CTG_ERR_RET(ctgGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
+ CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
STableMetaOutput output = {0};
@@ -571,7 +557,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta);
}
-int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) {
+int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
@@ -607,6 +593,24 @@ _return:
}
+int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
+ SDBVgroupInfo dbInfo = {0};
+ int32_t code = 0;
+ int32_t vgId = 0;
+
+ CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo));
+
+ if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
+ ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
+ return TSDB_CODE_TSC_DB_NOT_SELECTED;
+ }
+
+ CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup));
+
+ return code;
+}
+
+
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
return TSDB_CODE_CTG_INVALID_INPUT;
diff --git a/source/libs/catalog/test/CMakeLists.txt b/source/libs/catalog/test/CMakeLists.txt
index 527156f176..176978cc7f 100644
--- a/source/libs/catalog/test/CMakeLists.txt
+++ b/source/libs/catalog/test/CMakeLists.txt
@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(catalogTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
catalogTest
- PUBLIC os util common catalog transport gtest query
+ PUBLIC os util common catalog transport gtest query taos
)
TARGET_INCLUDE_DIRECTORIES(
diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp
index f495451091..0493ddfe8c 100644
--- a/source/libs/catalog/test/catalogTests.cpp
+++ b/source/libs/catalog/test/catalogTests.cpp
@@ -13,7 +13,7 @@
* along with this program. If not, see .
*/
-#include
+#include
#include
#include
#pragma GCC diagnostic ignored "-Wwrite-strings"
@@ -23,130 +23,65 @@
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
-#include "taos.h"
+#include "taos.h"
#include "tdef.h"
-#include "tvariant.h"
-#include "catalog.h"
-
+#include "tvariant.h"
+#include "catalog.h"
+#include "tep.h"
+
+typedef struct SAppInstInfo {
+ int64_t numOfConns;
+ SCorEpSet mgmtEp;
+} SAppInstInfo;
+
+typedef struct STscObj {
+ char user[TSDB_USER_LEN];
+ char pass[TSDB_PASSWORD_LEN];
+ char acctId[TSDB_ACCT_ID_LEN];
+ char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
+ uint32_t connId;
+ uint64_t id; // ref ID returned by taosAddRef
+// struct SSqlObj *sqlList;
+ void *pTransporter;
+ pthread_mutex_t mutex; // used to protect the operation on db
+ int32_t numOfReqs; // number of sqlObj from this tscObj
+ SAppInstInfo *pAppInfo;
+} STscObj;
+
namespace {
-
-
+
}
-
-TEST(testCase, normalCase) {
- char *clusterId = "cluster1";
- struct SCatalog* pCtg = NULL;
-
- int32_t code = catalogInit(NULL);
- ASSERT_EQ(code, 0);
-
- code = catalogGetHandle(clusterId, &pCtg);
- ASSERT_EQ(code, 0);
-
-
-}
-
-/*
-TEST(testCase, normalCase) {
- SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)");
- ASSERT_EQ(info1.valid, true);
- char msg[128] = {0};
- SMsgBuf buf;
- buf.len = 128;
- buf.buf = msg;
+TEST(testCase, normalCase) {
+ STscObj* pConn = (STscObj *)taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
+ assert(pConn != NULL);
- SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.sub.node), 0);
- int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
+ char *clusterId = "cluster1";
+ char *dbname = "db1";
+ char *tablename = "table1";
+ struct SCatalog* pCtg = NULL;
+ void *mockPointer = (void *)0x1;
+ SVgroupInfo vgInfo = {0};
+
+ msgInit();
+
+ int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
- SCatalogReq req = {0};
- int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
- ASSERT_EQ(ret, 0);
- ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
+ code = catalogGetHandle(clusterId, &pCtg);
+ ASSERT_EQ(code, 0);
- SQueryStmtInfo* pQueryInfo = createQueryInfo();
- setTableMetaInfo(pQueryInfo, &req);
+ code = catalogGetTableHashVgroup(pCtg, pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet, dbname, tablename, &vgInfo);
+ ASSERT_EQ(code, 0);
- SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.sub.node, 0);
- ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
- ASSERT_EQ(ret, 0);
-
- SArray* pExprList = pQueryInfo->exprList[0];
-
- int32_t num = tsCompatibleModel? 2:1;
- ASSERT_EQ(taosArrayGetSize(pExprList), num);
-
- SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1);
- ASSERT_EQ(p1->base.pColumns->uid, 110);
- ASSERT_EQ(p1->base.numOfParams, 1);
- ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
- ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)");
- ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_TMP);
- ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)");
- ASSERT_EQ(p1->base.interBytes, 16);
-
- ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE);
- ASSERT_STREQ(p1->pExpr->_function.functionName, "top");
-
- tExprNode* pParam = p1->pExpr->_function.pChild[0];
-
- ASSERT_EQ(pParam->nodeType, TEXPR_COL_NODE);
- ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
- ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
-
- struct SQueryPlanNode* n = nullptr;
- code = createQueryPlan(pQueryInfo, &n);
-
- char* str = NULL;
- queryPlanToString(n, &str);
- printf("%s\n", str);
-
- destroyQueryInfo(pQueryInfo);
- qParserClearupMetaRequestInfo(&req);
- destroySqlInfo(&info1);
+ taos_close(pConn);
}
-TEST(testCase, displayPlan) {
- generateLogicplan("select count(*) from `t.1abc`");
- generateLogicplan("select count(*)+ 22 from `t.1abc`");
- generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h, 20s) sliding(10m) limit 20,30");
- generateLogicplan("select count(*) from `t.1abc` group by a");
- generateLogicplan("select count(A+B) from `t.1abc` group by a");
- generateLogicplan("select count(length(a)+b) from `t.1abc` group by a");
- generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)");
- generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s) order by 1 desc ");
- generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`");
- generateLogicplan("select count(*), min(a) + 99 from `t.1abc`");
- generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`");
- generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc` limit 20");
- generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)");
- generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)");
- // order by + group by column + limit offset
- generateLogicplan("select top(a, 20) k from `t.1abc` order by k asc limit 3 offset 1");
-
- // fill
- generateLogicplan("select min(a) from `t.1abc` where ts>now and ts