Merge remote-tracking branch 'origin/3.0' into feature/3.0_wxy
This commit is contained in:
commit
e6d2e2e4d1
|
@ -219,6 +219,13 @@ typedef struct SBuildTableMetaInput {
|
||||||
char *tableFullName;
|
char *tableFullName;
|
||||||
} SBuildTableMetaInput;
|
} SBuildTableMetaInput;
|
||||||
|
|
||||||
|
typedef struct SBuildUseDBInput {
|
||||||
|
char db[TSDB_TABLE_FNAME_LEN];
|
||||||
|
int32_t vgroupVersion;
|
||||||
|
int32_t dbGroupVersion;
|
||||||
|
} SBuildUseDBInput;
|
||||||
|
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
|
|
||||||
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
|
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
|
||||||
|
@ -617,9 +624,12 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char db[TSDB_TABLE_FNAME_LEN];
|
char db[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t ignoreNotExists;
|
int8_t ignoreNotExists;
|
||||||
|
int32_t vgroupVersion;
|
||||||
|
int32_t dbGroupVersion;
|
||||||
int32_t reserve[8];
|
int32_t reserve[8];
|
||||||
} SUseDbMsg;
|
} SUseDbMsg;
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char db[TSDB_TABLE_FNAME_LEN];
|
char db[TSDB_TABLE_FNAME_LEN];
|
||||||
int32_t reserve[8];
|
int32_t reserve[8];
|
||||||
|
@ -806,8 +816,6 @@ typedef struct SVgroupListRspMsg {
|
||||||
SVgroupInfo vgroupInfo[];
|
SVgroupInfo vgroupInfo[];
|
||||||
} SVgroupListRspMsg;
|
} SVgroupListRspMsg;
|
||||||
|
|
||||||
typedef SVgroupListRspMsg SVgroupListInfo;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int8_t numOfEps;
|
int8_t numOfEps;
|
||||||
|
@ -852,6 +860,19 @@ typedef struct {
|
||||||
char *data;
|
char *data;
|
||||||
} STagData;
|
} STagData;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t vgroupNum;
|
||||||
|
int32_t vgroupVersion;
|
||||||
|
char db[TSDB_TABLE_FNAME_LEN];
|
||||||
|
int32_t dbVgroupVersion;
|
||||||
|
int32_t dbVgroupNum;
|
||||||
|
int32_t dbHashRange;
|
||||||
|
SVgroupInfo vgroupInfo[];
|
||||||
|
//int32_t vgIdList[];
|
||||||
|
} SUseDbRspMsg;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* sql: show tables like '%a_%'
|
* sql: show tables like '%a_%'
|
||||||
* payload is the query condition, e.g., '%a_%'
|
* payload is the query condition, e.g., '%a_%'
|
||||||
|
|
|
@ -27,16 +27,10 @@ extern "C" {
|
||||||
#include "transport.h"
|
#include "transport.h"
|
||||||
#include "common.h"
|
#include "common.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
|
#include "query.h"
|
||||||
|
|
||||||
struct SCatalog;
|
struct SCatalog;
|
||||||
|
|
||||||
typedef struct SDBVgroupInfo {
|
|
||||||
int32_t vgroupVersion;
|
|
||||||
SArray *vgId;
|
|
||||||
int32_t hashRange;
|
|
||||||
int32_t hashNum;
|
|
||||||
} SDBVgroupInfo;
|
|
||||||
|
|
||||||
typedef struct SCatalogReq {
|
typedef struct SCatalogReq {
|
||||||
char clusterId[TSDB_CLUSTER_ID_LEN]; //????
|
char clusterId[TSDB_CLUSTER_ID_LEN]; //????
|
||||||
SArray *pTableName; // table full name
|
SArray *pTableName; // table full name
|
||||||
|
|
|
@ -20,6 +20,22 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include "tarray.h"
|
||||||
|
|
||||||
|
typedef SVgroupListRspMsg SVgroupListInfo;
|
||||||
|
|
||||||
|
typedef struct SDBVgroupInfo {
|
||||||
|
int32_t vgroupVersion;
|
||||||
|
SArray *vgId;
|
||||||
|
int32_t hashRange;
|
||||||
|
} SDBVgroupInfo;
|
||||||
|
|
||||||
|
typedef struct SUseDbOutput {
|
||||||
|
SVgroupListInfo *vgroupList;
|
||||||
|
char db[TSDB_TABLE_FNAME_LEN];
|
||||||
|
SDBVgroupInfo *dbVgroup;
|
||||||
|
} SUseDbOutput;
|
||||||
|
|
||||||
|
|
||||||
extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
|
extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
|
||||||
extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
|
extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
|
||||||
|
|
|
@ -23,8 +23,8 @@ extern "C" {
|
||||||
#define tfree(x) \
|
#define tfree(x) \
|
||||||
do { \
|
do { \
|
||||||
if (x) { \
|
if (x) { \
|
||||||
free((void *)x); \
|
free((void *)(x)); \
|
||||||
x = 0; \
|
(x) = 0; \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@ extern "C" {
|
||||||
|
|
||||||
#define CTG_DEFAULT_CLUSTER_NUMBER 6
|
#define CTG_DEFAULT_CLUSTER_NUMBER 6
|
||||||
#define CTG_DEFAULT_VGROUP_NUMBER 100
|
#define CTG_DEFAULT_VGROUP_NUMBER 100
|
||||||
|
#define CTG_DEFAULT_DB_NUMBER 20
|
||||||
|
|
||||||
#define CTG_DEFAULT_INVALID_VERSION (-1)
|
#define CTG_DEFAULT_INVALID_VERSION (-1)
|
||||||
|
|
||||||
|
|
|
@ -63,21 +63,69 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t*
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) {
|
int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) {
|
||||||
/*
|
|
||||||
if (NULL == pCatalog->dbCache.cache) {
|
if (NULL == pCatalog->dbCache.cache) {
|
||||||
*exist = 0;
|
*exist = 0;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashGet(SHashObj * pHashObj, const void * key, size_t keyLen)
|
SDBVgroupInfo *info = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||||
|
|
||||||
|
if (NULL == info || info->vgroupVersion < pCatalog->vgroupCache.vgroupVersion) {
|
||||||
|
*exist = 0;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
if (dbInfo) {
|
if (dbInfo) {
|
||||||
*pVgroupList = taosArrayDup(pCatalog->vgroupCache.arrayCache);
|
*dbInfo = calloc(1, sizeof(**dbInfo));
|
||||||
|
if (NULL == *dbInfo) {
|
||||||
|
ctgError("calloc size[%d] failed", (int32_t)sizeof(**dbInfo));
|
||||||
|
return TSDB_CODE_CTG_MEM_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*dbInfo)->vgId = taosArrayDup(info->vgId);
|
||||||
|
if (NULL == (*dbInfo)->vgId) {
|
||||||
|
ctgError("taos array duplicate failed");
|
||||||
|
tfree(*dbInfo);
|
||||||
|
return TSDB_CODE_CTG_MEM_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*dbInfo)->vgroupVersion = info->vgroupVersion;
|
||||||
|
(*dbInfo)->hashRange = info->hashRange;
|
||||||
}
|
}
|
||||||
|
|
||||||
*exist = 1;
|
*exist = 1;
|
||||||
*/
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
|
||||||
|
char *msg = NULL;
|
||||||
|
SEpSet *pVnodeEpSet = NULL;
|
||||||
|
int32_t msgLen = 0;
|
||||||
|
|
||||||
|
int32_t code = queryBuildMsg[TSDB_MSG_TYPE_USE_DB](input, &msg, 0, &msgLen);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.msgType = TSDB_MSG_TYPE_USE_DB,
|
||||||
|
.pCont = msg,
|
||||||
|
.contLen = msgLen,
|
||||||
|
};
|
||||||
|
|
||||||
|
SRpcMsg rpcRsp = {0};
|
||||||
|
|
||||||
|
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
|
||||||
|
|
||||||
|
code = queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB](out, rpcRsp.pCont, rpcRsp.contLen);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -144,7 +192,7 @@ int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) {
|
||||||
|
|
||||||
int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) {
|
int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) {
|
||||||
if (NULL == pVgroup) {
|
if (NULL == pVgroup) {
|
||||||
ctgError("vgroup get from mnode succeed, but no output");
|
ctgError("no valid vgroup list info to update");
|
||||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,7 +310,33 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
||||||
|
if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
|
||||||
|
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dbInfo->vgroupVersion < 0) {
|
||||||
|
if (pCatalog->dbCache.cache) {
|
||||||
|
taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||||
|
}
|
||||||
|
|
||||||
|
ctgWarn("remove db [%s] from cache", dbName);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == pCatalog->dbCache.cache) {
|
||||||
|
pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
|
if (NULL == pCatalog->dbCache.cache) {
|
||||||
|
ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_DB_NUMBER);
|
||||||
|
return TSDB_CODE_CTG_MEM_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) {
|
||||||
|
ctgError("push to vgroup hash cache failed");
|
||||||
|
return TSDB_CODE_CTG_MEM_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -273,8 +347,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
|
||||||
return TSDB_CODE_CTG_INVALID_INPUT;
|
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
int32_t exist = 0;
|
int32_t exist = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
if (0 == forceUpdate) {
|
if (0 == forceUpdate) {
|
||||||
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));
|
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));
|
||||||
|
@ -284,18 +358,34 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SDBVgroupInfo* newDbInfo = NULL;
|
SUseDbOutput DbOut = {0};
|
||||||
|
SBuildUseDBInput input = {0};
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, dbName, &newDbInfo));
|
strncpy(input.db, dbName, sizeof(input.db));
|
||||||
|
input.db[sizeof(input.db) - 1] = 0;
|
||||||
|
input.vgroupVersion = pCatalog->vgroupCache.vgroupVersion;
|
||||||
|
input.dbGroupVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||||
|
|
||||||
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, newDbInfo));
|
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
|
||||||
|
|
||||||
|
if (DbOut.vgroupList) {
|
||||||
|
CTG_ERR_JRET(catalogUpdateVgroup(pCatalog, DbOut.vgroupList));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (DbOut.dbVgroup) {
|
||||||
|
CTG_ERR_JRET(catalogUpdateDBVgroup(pCatalog, dbName, DbOut.dbVgroup));
|
||||||
|
}
|
||||||
|
|
||||||
if (dbInfo) {
|
if (dbInfo) {
|
||||||
*dbInfo = newDbInfo;
|
*dbInfo = DbOut.dbVgroup;
|
||||||
|
DbOut.dbVgroup = NULL;
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
_return:
|
||||||
|
tfree(DbOut.dbVgroup);
|
||||||
|
tfree(DbOut.vgroupList);
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@ target_include_directories(
|
||||||
|
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
parser
|
parser
|
||||||
PRIVATE os util common catalog function transport
|
PRIVATE os util common catalog function transport query
|
||||||
)
|
)
|
||||||
|
|
||||||
ADD_SUBDIRECTORY(test)
|
ADD_SUBDIRECTORY(test)
|
|
@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||||
ADD_EXECUTABLE(parserTest ${SOURCE_LIST})
|
ADD_EXECUTABLE(parserTest ${SOURCE_LIST})
|
||||||
TARGET_LINK_LIBRARIES(
|
TARGET_LINK_LIBRARIES(
|
||||||
parserTest
|
parserTest
|
||||||
PUBLIC os util common parser catalog transport gtest function planner
|
PUBLIC os util common parser catalog transport gtest function planner query
|
||||||
)
|
)
|
||||||
|
|
||||||
TARGET_INCLUDE_DIRECTORIES(
|
TARGET_INCLUDE_DIRECTORIES(
|
||||||
|
|
|
@ -8,7 +8,7 @@ target_include_directories(
|
||||||
|
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
planner
|
planner
|
||||||
PRIVATE os util common catalog parser transport function
|
PRIVATE os util common catalog parser transport function query
|
||||||
)
|
)
|
||||||
|
|
||||||
ADD_SUBDIRECTORY(test)
|
ADD_SUBDIRECTORY(test)
|
|
@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||||
ADD_EXECUTABLE(plannerTest ${SOURCE_LIST})
|
ADD_EXECUTABLE(plannerTest ${SOURCE_LIST})
|
||||||
TARGET_LINK_LIBRARIES(
|
TARGET_LINK_LIBRARIES(
|
||||||
plannerTest
|
plannerTest
|
||||||
PUBLIC os util common planner parser catalog transport gtest function
|
PUBLIC os util common planner parser catalog transport gtest function query
|
||||||
)
|
)
|
||||||
|
|
||||||
TARGET_INCLUDE_DIRECTORIES(
|
TARGET_INCLUDE_DIRECTORIES(
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "queryInt.h"
|
#include "queryInt.h"
|
||||||
|
#include "query.h"
|
||||||
|
|
||||||
int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
|
int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
|
||||||
|
|
||||||
|
@ -60,6 +60,36 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
||||||
|
if (NULL == input || NULL == msg || NULL == msgLen) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
SBuildUseDBInput* bInput = (SBuildUseDBInput *)input;
|
||||||
|
|
||||||
|
int32_t estimateSize = sizeof(SUseDbMsg);
|
||||||
|
if (NULL == *msg || msgSize < estimateSize) {
|
||||||
|
tfree(*msg);
|
||||||
|
*msg = calloc(1, estimateSize);
|
||||||
|
if (NULL == *msg) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SUseDbMsg *bMsg = (SUseDbMsg *)*msg;
|
||||||
|
|
||||||
|
strncpy(bMsg->db, bInput->db, sizeof(bMsg->db));
|
||||||
|
bMsg->db[sizeof(bMsg->db) - 1] = 0;
|
||||||
|
|
||||||
|
bMsg->vgroupVersion = bInput->vgroupVersion;
|
||||||
|
bMsg->dbGroupVersion = bInput->dbGroupVersion;
|
||||||
|
|
||||||
|
*msgLen = (int32_t)sizeof(*bMsg);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) {
|
int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) {
|
||||||
if (NULL == output || NULL == msg || msgSize <= 0) {
|
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||||
|
@ -103,12 +133,126 @@ int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
|
||||||
|
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
SUseDbRspMsg *pRsp = (SUseDbRspMsg *)msg;
|
||||||
|
SUseDbOutput *pOut = (SUseDbOutput *)output;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (msgSize <= sizeof(*pRsp)) {
|
||||||
|
qError("invalid use db rsp msg size, msgSize:%d", msgSize);
|
||||||
|
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRsp->vgroupVersion = htonl(pRsp->vgroupVersion);
|
||||||
|
pRsp->dbVgroupVersion = htonl(pRsp->dbVgroupVersion);
|
||||||
|
|
||||||
|
pRsp->vgroupNum = htonl(pRsp->vgroupNum);
|
||||||
|
pRsp->dbVgroupNum = htonl(pRsp->dbVgroupNum);
|
||||||
|
|
||||||
|
if (pRsp->vgroupNum < 0) {
|
||||||
|
qError("invalid vgroup number[%d]", pRsp->vgroupNum);
|
||||||
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRsp->dbVgroupNum < 0) {
|
||||||
|
qError("invalid db vgroup number[%d]", pRsp->dbVgroupNum);
|
||||||
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t expectSize = pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + pRsp->dbVgroupNum * sizeof(int32_t) + sizeof(*pRsp);
|
||||||
|
if (msgSize != expectSize) {
|
||||||
|
qError("vgroup list msg size mis-match, msgSize:%d, expected:%d, vgroup number:%d, db vgroup number:%d", msgSize, expectSize, pRsp->vgroupNum, pRsp->dbVgroupNum);
|
||||||
|
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRsp->vgroupVersion < 0) {
|
||||||
|
qInfo("no new vgroup list info");
|
||||||
|
if (pRsp->vgroupNum != 0) {
|
||||||
|
qError("invalid vgroup number[%d] for no new vgroup list case", pRsp->vgroupNum);
|
||||||
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
int32_t s = sizeof(*pOut->vgroupList) + sizeof(pOut->vgroupList->vgroupInfo[0]) * pRsp->vgroupNum;
|
||||||
|
pOut->vgroupList = calloc(1, s);
|
||||||
|
if (NULL == pOut->vgroupList) {
|
||||||
|
qError("calloc size[%d] failed", s);
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOut->vgroupList->vgroupNum = pRsp->vgroupNum;
|
||||||
|
pOut->vgroupList->vgroupVersion = pRsp->vgroupVersion;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pRsp->vgroupNum; ++i) {
|
||||||
|
pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId);
|
||||||
|
for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) {
|
||||||
|
pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port);
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(&pOut->vgroupList->vgroupInfo[i], &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t *vgIdList = (int32_t *)((char *)pRsp->vgroupInfo + sizeof(pRsp->vgroupInfo[0]) * pRsp->vgroupNum);
|
||||||
|
|
||||||
|
memcpy(pOut->db, pRsp->db, sizeof(pOut->db));
|
||||||
|
|
||||||
|
if (pRsp->dbVgroupVersion < 0) {
|
||||||
|
qInfo("no new vgroup info for db[%s]", pRsp->db);
|
||||||
|
} else {
|
||||||
|
pOut->dbVgroup = calloc(1, sizeof(*pOut->dbVgroup));
|
||||||
|
if (NULL == pOut->dbVgroup) {
|
||||||
|
qError("calloc size[%d] failed", (int32_t)sizeof(*pOut->dbVgroup));
|
||||||
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOut->dbVgroup->vgId = taosArrayInit(pRsp->dbVgroupNum, sizeof(int32_t));
|
||||||
|
if (NULL == pOut->dbVgroup->vgId) {
|
||||||
|
qError("taosArrayInit size[%d] failed", pRsp->dbVgroupNum);
|
||||||
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOut->dbVgroup->vgroupVersion = pRsp->dbVgroupVersion;
|
||||||
|
pOut->dbVgroup->hashRange = htonl(pRsp->dbHashRange);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pRsp->dbVgroupNum; ++i) {
|
||||||
|
*(vgIdList + i) = htonl(*(vgIdList + i));
|
||||||
|
|
||||||
|
taosArrayPush(pOut->dbVgroup->vgId, vgIdList + i) ;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (pOut->dbVgroup && pOut->dbVgroup->vgId) {
|
||||||
|
taosArrayDestroy(pOut->dbVgroup->vgId);
|
||||||
|
pOut->dbVgroup->vgId = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
tfree(pOut->dbVgroup);
|
||||||
|
tfree(pOut->vgroupList);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void msgInit() {
|
void msgInit() {
|
||||||
queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg;
|
queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg;
|
||||||
queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = queryBuildVgroupListReqMsg;
|
queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = queryBuildVgroupListReqMsg;
|
||||||
|
queryBuildMsg[TSDB_MSG_TYPE_USE_DB] = queryBuildUseDbMsg;
|
||||||
|
|
||||||
//tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp;
|
//tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp;
|
||||||
queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = queryProcessVgroupListRsp;
|
queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = queryProcessVgroupListRsp;
|
||||||
|
queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB] = queryProcessUseDBRsp;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
|
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
|
||||||
|
|
Loading…
Reference in New Issue