commit
39de3fe0a5
|
@ -221,8 +221,7 @@ typedef struct SBuildTableMetaInput {
|
|||
|
||||
typedef struct SBuildUseDBInput {
|
||||
char db[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t vgroupVersion;
|
||||
int32_t dbGroupVersion;
|
||||
int32_t vgVersion;
|
||||
} SBuildUseDBInput;
|
||||
|
||||
|
||||
|
@ -627,8 +626,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
char db[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t ignoreNotExists;
|
||||
int32_t vgroupVersion;
|
||||
int32_t dbGroupVersion;
|
||||
int32_t vgVersion;
|
||||
int32_t reserve[8];
|
||||
} SUseDbMsg;
|
||||
|
||||
|
@ -808,6 +806,9 @@ typedef struct SSTableVgroupMsg {
|
|||
|
||||
typedef struct SVgroupInfo {
|
||||
int32_t vgId;
|
||||
int32_t hashBegin;
|
||||
int32_t hashEnd;
|
||||
int8_t inUse;
|
||||
int8_t numOfEps;
|
||||
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
|
||||
} SVgroupInfo;
|
||||
|
@ -863,16 +864,12 @@ typedef struct {
|
|||
} STagData;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgroupNum;
|
||||
int32_t vgroupVersion;
|
||||
char db[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t dbVgroupVersion;
|
||||
int32_t dbVgroupNum;
|
||||
int32_t dbHashRange;
|
||||
int32_t dbHashType;
|
||||
char db[TSDB_FULL_DB_NAME_LEN];
|
||||
int32_t vgVersion;
|
||||
int32_t vgNum;
|
||||
int8_t hashMethod;
|
||||
SVgroupInfo vgroupInfo[];
|
||||
//int32_t vgIdList[];
|
||||
} SUseDbRspMsg;
|
||||
} SUseDbRsp;
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -61,22 +61,8 @@ int32_t catalogInit(SCatalogCfg *cfg);
|
|||
*/
|
||||
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
|
||||
|
||||
|
||||
|
||||
int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version);
|
||||
|
||||
/**
|
||||
* get cluster vgroup list.
|
||||
* @pVgroupList - hash of vgroup list, key:vgId, value:SVgroupInfo
|
||||
* @return
|
||||
*/
|
||||
int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash);
|
||||
int32_t catalogUpdateVgroupCache(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup);
|
||||
|
||||
|
||||
|
||||
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
|
||||
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo);
|
||||
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo);
|
||||
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
|
||||
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
#include "tarray.h"
|
||||
#include "thash.h"
|
||||
|
||||
typedef SVgroupListRspMsg SVgroupListInfo;
|
||||
|
||||
|
@ -63,16 +64,14 @@ typedef struct STableMeta {
|
|||
|
||||
|
||||
typedef struct SDBVgroupInfo {
|
||||
int32_t vgroupVersion;
|
||||
SArray *vgId;
|
||||
int32_t hashRange;
|
||||
int32_t hashType;
|
||||
int32_t vgVersion;
|
||||
int8_t hashMethod;
|
||||
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
|
||||
} SDBVgroupInfo;
|
||||
|
||||
typedef struct SUseDbOutput {
|
||||
SVgroupListInfo *vgroupList;
|
||||
char db[TSDB_TABLE_FNAME_LEN];
|
||||
SDBVgroupInfo *dbVgroup;
|
||||
char db[TSDB_FULL_DB_NAME_LEN];
|
||||
SDBVgroupInfo dbVgroup;
|
||||
} SUseDbOutput;
|
||||
|
||||
typedef struct STableMetaOutput {
|
||||
|
|
|
@ -10,3 +10,5 @@ target_link_libraries(
|
|||
catalog
|
||||
PRIVATE os util common transport query
|
||||
)
|
||||
|
||||
ADD_SUBDIRECTORY(test)
|
|
@ -20,50 +20,7 @@
|
|||
|
||||
SCatalogMgmt ctgMgmt = {0};
|
||||
|
||||
int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SVgroupListInfo** pVgroup) {
|
||||
char *msg = NULL;
|
||||
SEpSet *pVnodeEpSet = NULL;
|
||||
int32_t msgLen = 0;
|
||||
|
||||
int32_t code = queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST](NULL, &msg, 0, &msgLen);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TSDB_MSG_TYPE_VGROUP_LIST,
|
||||
.pCont = msg,
|
||||
.contLen = msgLen,
|
||||
};
|
||||
|
||||
SRpcMsg rpcRsp = {0};
|
||||
|
||||
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
|
||||
|
||||
code = queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST](pVgroup, rpcRsp.pCont, rpcRsp.contLen);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetVgroupFromCache(struct SCatalog* pCatalog, SHashObj** pVgroupList, int32_t* exist) {
|
||||
if (NULL == pCatalog->vgroupCache.cache || pCatalog->vgroupCache.vgroupVersion < 0) {
|
||||
*exist = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pVgroupList) {
|
||||
*pVgroupList = pCatalog->vgroupCache.cache;
|
||||
}
|
||||
|
||||
*exist = 1;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) {
|
||||
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo *dbInfo, int32_t *exist) {
|
||||
if (NULL == pCatalog->dbCache.cache) {
|
||||
*exist = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -71,28 +28,13 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S
|
|||
|
||||
SDBVgroupInfo *info = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||
|
||||
if (NULL == info || info->vgroupVersion < pCatalog->vgroupCache.vgroupVersion) {
|
||||
if (NULL == info) {
|
||||
*exist = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (dbInfo) {
|
||||
*dbInfo = calloc(1, sizeof(**dbInfo));
|
||||
if (NULL == *dbInfo) {
|
||||
ctgError("calloc size[%d] failed", (int32_t)sizeof(**dbInfo));
|
||||
return TSDB_CODE_CTG_MEM_ERROR;
|
||||
}
|
||||
|
||||
(*dbInfo)->vgId = taosArrayDup(info->vgId);
|
||||
if (NULL == (*dbInfo)->vgId) {
|
||||
ctgError("taos array duplicate failed");
|
||||
tfree(*dbInfo);
|
||||
return TSDB_CODE_CTG_MEM_ERROR;
|
||||
}
|
||||
|
||||
(*dbInfo)->vgroupVersion = info->vgroupVersion;
|
||||
(*dbInfo)->hashRange = info->hashRange;
|
||||
(*dbInfo)->hashType = info->hashType;
|
||||
*dbInfo = *info;
|
||||
}
|
||||
|
||||
*exist = 1;
|
||||
|
@ -242,8 +184,8 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
|
|||
}
|
||||
|
||||
|
||||
int32_t ctgGetHashFunction(int32_t hashType, tableNameHashFp *fp) {
|
||||
switch (hashType) {
|
||||
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
|
||||
switch (hashMethod) {
|
||||
default:
|
||||
*fp = MurmurHash3_32;
|
||||
break;
|
||||
|
@ -252,96 +194,79 @@ int32_t ctgGetHashFunction(int32_t hashType, tableNameHashFp *fp) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetVgroupFromVgId(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, int32_t vgId, SVgroupInfo *pVgroup) {
|
||||
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray* vgroupList) {
|
||||
SHashObj *vgroupHash = NULL;
|
||||
|
||||
CTG_ERR_RET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash));
|
||||
if (NULL == vgroupHash) {
|
||||
ctgError("get empty vgroup cache");
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
SVgroupInfo *vgInfo = NULL;
|
||||
|
||||
if (NULL == taosHashGetClone(vgroupHash, &vgId, sizeof(vgId), pVgroup)) {
|
||||
ctgError("vgId[%d] not found in vgroup list", vgId);
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
|
||||
while (pIter) {
|
||||
vgInfo = pIter;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetVgroupFromVgIdBatch(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SArray* vgIds, SArray* vgroupList) {
|
||||
SHashObj *vgroupHash = NULL;
|
||||
SVgroupInfo pVgroup = {0};
|
||||
int32_t vgIdNum = taosArrayGetSize(vgIds);
|
||||
|
||||
CTG_ERR_RET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash));
|
||||
if (NULL == vgroupHash) {
|
||||
ctgError("get empty vgroup cache");
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < vgIdNum; ++i) {
|
||||
int32_t *vgId = taosArrayGet(vgIds, i);
|
||||
if (NULL == taosArrayPush(vgroupList, vgInfo)) {
|
||||
ctgError("taosArrayPush failed");
|
||||
break;
|
||||
}
|
||||
|
||||
if (NULL == taosHashGetClone(vgroupHash, vgId, sizeof(*vgId), &pVgroup)) {
|
||||
ctgError("vgId[%d] not found in vgroup list", vgId);
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
if (NULL == taosArrayPush(vgroupList, &pVgroup)) {
|
||||
ctgError("push vgroup to array failed, idx:%d", i);
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
pIter = taosHashIterate(dbInfo->vgInfo, pIter);
|
||||
vgInfo = NULL;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t ctgGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
|
||||
SDBVgroupInfo *dbInfo = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo));
|
||||
|
||||
if (NULL == dbInfo) {
|
||||
ctgWarn("db[%s] vgroup info not found", pDBName);
|
||||
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
|
||||
int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
|
||||
if (vgNum <= 0) {
|
||||
ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum);
|
||||
return TSDB_CODE_TSC_DB_NOT_SELECTED;
|
||||
}
|
||||
|
||||
if (dbInfo->vgroupVersion < 0 || NULL == dbInfo->vgId) {
|
||||
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgId:%p", pDBName, dbInfo->vgroupVersion, dbInfo->vgId);
|
||||
CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED);
|
||||
}
|
||||
|
||||
int32_t vgNum = taosArrayGetSize(dbInfo->vgId);
|
||||
if (vgNum <= 0) {
|
||||
ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum);
|
||||
CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED);
|
||||
}
|
||||
|
||||
tableNameHashFp fp = NULL;
|
||||
SVgroupInfo *vgInfo = NULL;
|
||||
|
||||
CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashType, &fp));
|
||||
CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
|
||||
|
||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
|
||||
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);
|
||||
|
||||
uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));
|
||||
uint32_t hashUnit = dbInfo->hashRange / vgNum;
|
||||
uint32_t vgId = hashValue / hashUnit;
|
||||
|
||||
CTG_ERR_JRET(ctgGetVgroupFromVgId(pCatalog, pRpc, pMgmtEps, vgId, pVgroup));
|
||||
|
||||
_return:
|
||||
if (dbInfo && dbInfo->vgId) {
|
||||
taosArrayDestroy(dbInfo->vgId);
|
||||
dbInfo->vgId = NULL;
|
||||
void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
|
||||
while (pIter) {
|
||||
vgInfo = pIter;
|
||||
if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
|
||||
break;
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(dbInfo->vgInfo, pIter);
|
||||
vgInfo = NULL;
|
||||
}
|
||||
|
||||
tfree(dbInfo);
|
||||
|
||||
if (NULL == vgInfo) {
|
||||
ctgError("no hash range found for hashvalue[%u]", hashValue);
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
*pVgroup = *vgInfo;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
|
||||
SDBVgroupInfo dbInfo = {0};
|
||||
int32_t code = 0;
|
||||
int32_t vgId = 0;
|
||||
|
||||
CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo));
|
||||
|
||||
if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
|
||||
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
|
||||
return TSDB_CODE_TSC_DB_NOT_SELECTED;
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup));
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -524,95 +449,6 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) {
|
||||
if (NULL == pCatalog || NULL == version) {
|
||||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
}
|
||||
|
||||
*version = pCatalog->vgroupCache.vgroupVersion;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t catalogUpdateVgroupCache(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) {
|
||||
if (NULL == pVgroup) {
|
||||
ctgError("no valid vgroup list info to update");
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
if (pVgroup->vgroupVersion < 0) {
|
||||
ctgError("vgroup version[%d] is invalid", pVgroup->vgroupVersion);
|
||||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
}
|
||||
|
||||
if (NULL == pCatalog->vgroupCache.cache) {
|
||||
pCatalog->vgroupCache.cache = taosHashInit(CTG_DEFAULT_CACHE_VGROUP_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == pCatalog->vgroupCache.cache) {
|
||||
ctgError("init hash[%d] for cluster cache failed", CTG_DEFAULT_CACHE_VGROUP_NUMBER);
|
||||
return TSDB_CODE_CTG_MEM_ERROR;
|
||||
}
|
||||
} else {
|
||||
taosHashClear(pCatalog->vgroupCache.cache);
|
||||
}
|
||||
|
||||
SVgroupInfo *vInfo = NULL;
|
||||
for (int32_t i = 0; i < pVgroup->vgroupNum; ++i) {
|
||||
if (taosHashPut(pCatalog->vgroupCache.cache, &pVgroup->vgroupInfo[i].vgId, sizeof(pVgroup->vgroupInfo[i].vgId), &pVgroup->vgroupInfo[i], sizeof(pVgroup->vgroupInfo[i])) != 0) {
|
||||
ctgError("push to vgroup hash cache failed");
|
||||
goto error_exit;
|
||||
}
|
||||
}
|
||||
|
||||
pCatalog->vgroupCache.vgroupVersion = pVgroup->vgroupVersion;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
error_exit:
|
||||
if (pCatalog->vgroupCache.cache) {
|
||||
taosHashCleanup(pCatalog->vgroupCache.cache);
|
||||
pCatalog->vgroupCache.cache = NULL;
|
||||
}
|
||||
|
||||
pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash) {
|
||||
if (NULL == pCatalog || NULL == pMgmtEps || NULL == pRpc) {
|
||||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
}
|
||||
|
||||
int32_t exist = 0;
|
||||
|
||||
CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupHash, &exist));
|
||||
|
||||
if (exist) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SVgroupListInfo *pVgroup = NULL;
|
||||
|
||||
CTG_ERR_RET(ctgGetVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &pVgroup));
|
||||
|
||||
CTG_ERR_RET(catalogUpdateVgroupCache(pCatalog, pVgroup));
|
||||
|
||||
if (pVgroupHash) {
|
||||
CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupHash, &exist));
|
||||
}
|
||||
|
||||
if (0 == exist) {
|
||||
ctgError("catalog fetched but get from cache failed");
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
|
||||
if (NULL == pCatalog || NULL == dbName || NULL == version) {
|
||||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
|
@ -629,7 +465,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*version = dbInfo->vgroupVersion;
|
||||
*version = dbInfo->vgVersion;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -639,7 +475,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
|
|||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
}
|
||||
|
||||
if (dbInfo->vgroupVersion < 0) {
|
||||
if (dbInfo->vgVersion < 0) {
|
||||
if (pCatalog->dbCache.cache) {
|
||||
taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||
}
|
||||
|
@ -654,6 +490,12 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
|
|||
ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
|
||||
return TSDB_CODE_CTG_MEM_ERROR;
|
||||
}
|
||||
} else {
|
||||
SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||
if (oldInfo && oldInfo->vgInfo) {
|
||||
taosHashCleanup(oldInfo->vgInfo);
|
||||
oldInfo->vgInfo = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) {
|
||||
|
@ -667,7 +509,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
|
|||
|
||||
|
||||
|
||||
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) {
|
||||
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
|
||||
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) {
|
||||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
}
|
||||
|
@ -688,28 +530,16 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
|
|||
|
||||
strncpy(input.db, dbName, sizeof(input.db));
|
||||
input.db[sizeof(input.db) - 1] = 0;
|
||||
input.vgroupVersion = pCatalog->vgroupCache.vgroupVersion;
|
||||
input.dbGroupVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||
|
||||
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
|
||||
|
||||
if (DbOut.vgroupList) {
|
||||
CTG_ERR_JRET(catalogUpdateVgroupCache(pCatalog, DbOut.vgroupList));
|
||||
}
|
||||
|
||||
if (DbOut.dbVgroup) {
|
||||
CTG_ERR_JRET(catalogUpdateDBVgroupCache(pCatalog, dbName, DbOut.dbVgroup));
|
||||
}
|
||||
CTG_ERR_RET(catalogUpdateDBVgroupCache(pCatalog, dbName, &DbOut.dbVgroup));
|
||||
|
||||
if (dbInfo) {
|
||||
*dbInfo = DbOut.dbVgroup;
|
||||
DbOut.dbVgroup = NULL;
|
||||
}
|
||||
|
||||
_return:
|
||||
tfree(DbOut.dbVgroup);
|
||||
tfree(DbOut.vgroupList);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -749,16 +579,20 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe
|
|||
STableMeta *tbMeta = NULL;
|
||||
int32_t code = 0;
|
||||
SVgroupInfo vgroupInfo = {0};
|
||||
SDBVgroupInfo *dbVgroup = NULL;
|
||||
SDBVgroupInfo dbVgroup = {0};
|
||||
|
||||
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta));
|
||||
|
||||
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
|
||||
CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
|
||||
CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
|
||||
|
||||
CTG_ERR_JRET(ctgGetVgroupFromVgIdBatch(pCatalog, pRpc, pMgmtEps, dbVgroup->vgId, pVgroupList));
|
||||
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList));
|
||||
} else {
|
||||
CTG_ERR_JRET(ctgGetVgroupFromVgId(pCatalog, pRpc, pMgmtEps, tbMeta->vgId, &vgroupInfo));
|
||||
int32_t vgId = tbMeta->vgId;
|
||||
if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
|
||||
ctgError("vgId[%d] not found in vgroup list", vgId);
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
if (NULL == taosArrayPush(pVgroupList, &vgroupInfo)) {
|
||||
ctgError("push vgroupInfo to array failed");
|
||||
|
@ -768,12 +602,6 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe
|
|||
|
||||
_return:
|
||||
tfree(tbMeta);
|
||||
if (dbVgroup && dbVgroup->vgId) {
|
||||
taosArrayDestroy(dbVgroup->vgId);
|
||||
dbVgroup->vgId = NULL;
|
||||
}
|
||||
|
||||
tfree(dbVgroup);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
|
||||
MESSAGE(STATUS "build catalog unit test")
|
||||
|
||||
# GoogleTest requires at least C++11
|
||||
SET(CMAKE_CXX_STANDARD 11)
|
||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||
|
||||
ADD_EXECUTABLE(catalogTest ${SOURCE_LIST})
|
||||
TARGET_LINK_LIBRARIES(
|
||||
catalogTest
|
||||
PUBLIC os util common catalog transport gtest query
|
||||
)
|
||||
|
||||
TARGET_INCLUDE_DIRECTORIES(
|
||||
catalogTest
|
||||
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/catalog/"
|
||||
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/catalog/inc"
|
||||
)
|
|
@ -0,0 +1,152 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <tglobal.h>
|
||||
#include <iostream>
|
||||
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||
|
||||
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||
#include "os.h"
|
||||
|
||||
#include "taos.h"
|
||||
#include "tdef.h"
|
||||
#include "tvariant.h"
|
||||
#include "catalog.h"
|
||||
|
||||
namespace {
|
||||
|
||||
|
||||
}
|
||||
|
||||
TEST(testCase, normalCase) {
|
||||
char *clusterId = "cluster1";
|
||||
struct SCatalog* pCtg = NULL;
|
||||
|
||||
int32_t code = catalogInit(NULL);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
code = catalogGetHandle(clusterId, &pCtg);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
TEST(testCase, normalCase) {
|
||||
SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)");
|
||||
ASSERT_EQ(info1.valid, true);
|
||||
|
||||
char msg[128] = {0};
|
||||
SMsgBuf buf;
|
||||
buf.len = 128;
|
||||
buf.buf = msg;
|
||||
|
||||
SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.sub.node), 0);
|
||||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
||||
SQueryStmtInfo* pQueryInfo = createQueryInfo();
|
||||
setTableMetaInfo(pQueryInfo, &req);
|
||||
|
||||
SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.sub.node, 0);
|
||||
ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
|
||||
ASSERT_EQ(ret, 0);
|
||||
|
||||
SArray* pExprList = pQueryInfo->exprList[0];
|
||||
|
||||
int32_t num = tsCompatibleModel? 2:1;
|
||||
ASSERT_EQ(taosArrayGetSize(pExprList), num);
|
||||
|
||||
SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1);
|
||||
ASSERT_EQ(p1->base.pColumns->uid, 110);
|
||||
ASSERT_EQ(p1->base.numOfParams, 1);
|
||||
ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
|
||||
ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)");
|
||||
ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_TMP);
|
||||
ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)");
|
||||
ASSERT_EQ(p1->base.interBytes, 16);
|
||||
|
||||
ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE);
|
||||
ASSERT_STREQ(p1->pExpr->_function.functionName, "top");
|
||||
|
||||
tExprNode* pParam = p1->pExpr->_function.pChild[0];
|
||||
|
||||
ASSERT_EQ(pParam->nodeType, TEXPR_COL_NODE);
|
||||
ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
|
||||
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
|
||||
|
||||
struct SQueryPlanNode* n = nullptr;
|
||||
code = createQueryPlan(pQueryInfo, &n);
|
||||
|
||||
char* str = NULL;
|
||||
queryPlanToString(n, &str);
|
||||
printf("%s\n", str);
|
||||
|
||||
destroyQueryInfo(pQueryInfo);
|
||||
qParserClearupMetaRequestInfo(&req);
|
||||
destroySqlInfo(&info1);
|
||||
}
|
||||
|
||||
TEST(testCase, displayPlan) {
|
||||
generateLogicplan("select count(*) from `t.1abc`");
|
||||
generateLogicplan("select count(*)+ 22 from `t.1abc`");
|
||||
generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h, 20s) sliding(10m) limit 20,30");
|
||||
generateLogicplan("select count(*) from `t.1abc` group by a");
|
||||
generateLogicplan("select count(A+B) from `t.1abc` group by a");
|
||||
generateLogicplan("select count(length(a)+b) from `t.1abc` group by a");
|
||||
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)");
|
||||
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s) order by 1 desc ");
|
||||
generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`");
|
||||
generateLogicplan("select count(*), min(a) + 99 from `t.1abc`");
|
||||
generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`");
|
||||
generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc` limit 20");
|
||||
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)");
|
||||
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)");
|
||||
|
||||
// order by + group by column + limit offset
|
||||
generateLogicplan("select top(a, 20) k from `t.1abc` order by k asc limit 3 offset 1");
|
||||
|
||||
// fill
|
||||
generateLogicplan("select min(a) from `t.1abc` where ts>now and ts<now+2h interval(1s) fill(linear)");
|
||||
|
||||
// union + union all
|
||||
|
||||
|
||||
|
||||
// join
|
||||
|
||||
// Aggregate(count(*) [count(*) #5056], sum(a) [sum(a) #5057], avg(b) [avg(b) #5058], min(a+b) [min(a+b) #5060])
|
||||
// Projection(cols: [a+b #5059]) filters:(nil)
|
||||
// Projection(cols: [ts #0], [a #1], [b #2]) filters:(nil)
|
||||
// TableScan(t.1abc #110) time_range: -9223372036854775808 - 9223372036854775807
|
||||
|
||||
}
|
||||
*/
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -21,17 +21,6 @@ int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msg
|
|||
|
||||
int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0};
|
||||
|
||||
|
||||
int32_t queryBuildVgroupListReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
||||
if (NULL == msg || NULL == msgLen) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
||||
*msgLen = 0;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
||||
if (NULL == input || NULL == msg || NULL == msgLen) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
|
@ -81,8 +70,7 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
|
|||
strncpy(bMsg->db, bInput->db, sizeof(bMsg->db));
|
||||
bMsg->db[sizeof(bMsg->db) - 1] = 0;
|
||||
|
||||
bMsg->vgroupVersion = bInput->vgroupVersion;
|
||||
bMsg->dbGroupVersion = bInput->dbGroupVersion;
|
||||
bMsg->vgVersion = bInput->vgVersion;
|
||||
|
||||
*msgLen = (int32_t)sizeof(*bMsg);
|
||||
|
||||
|
@ -90,58 +78,12 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
|
|||
}
|
||||
|
||||
|
||||
|
||||
int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) {
|
||||
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SVgroupListRspMsg *pRsp = (SVgroupListRspMsg *)msg;
|
||||
|
||||
pRsp->vgroupNum = htonl(pRsp->vgroupNum);
|
||||
pRsp->vgroupVersion = htonl(pRsp->vgroupVersion);
|
||||
|
||||
if (pRsp->vgroupNum < 0) {
|
||||
qError("vgroup number[%d] in rsp is invalid", pRsp->vgroupNum);
|
||||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
if (pRsp->vgroupVersion < 0) {
|
||||
qError("vgroup vgroupVersion[%d] in rsp is invalid", pRsp->vgroupVersion);
|
||||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
if (msgSize != (pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp))) {
|
||||
qError("vgroup list msg size mis-match, msgSize:%d, vgroup number:%d", msgSize, pRsp->vgroupNum);
|
||||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
// keep SVgroupListInfo/SVgroupListRspMsg the same
|
||||
*(SVgroupListInfo **)output = (SVgroupListInfo *)msg;
|
||||
|
||||
if (pRsp->vgroupNum == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pRsp->vgroupNum; ++i) {
|
||||
pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId);
|
||||
for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) {
|
||||
pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
|
||||
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SUseDbRspMsg *pRsp = (SUseDbRspMsg *)msg;
|
||||
SUseDbRsp *pRsp = (SUseDbRsp *)msg;
|
||||
SUseDbOutput *pOut = (SUseDbOutput *)output;
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -150,104 +92,52 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
|
|||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
pRsp->vgroupVersion = htonl(pRsp->vgroupVersion);
|
||||
pRsp->dbVgroupVersion = htonl(pRsp->dbVgroupVersion);
|
||||
pRsp->vgVersion = htonl(pRsp->vgVersion);
|
||||
pRsp->vgNum = htonl(pRsp->vgNum);
|
||||
|
||||
pRsp->vgroupNum = htonl(pRsp->vgroupNum);
|
||||
pRsp->dbVgroupNum = htonl(pRsp->dbVgroupNum);
|
||||
|
||||
if (pRsp->vgroupNum < 0) {
|
||||
qError("invalid vgroup number[%d]", pRsp->vgroupNum);
|
||||
if (pRsp->vgNum < 0) {
|
||||
qError("invalid db[%s] vgroup number[%d]", pRsp->db, pRsp->vgNum);
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
||||
if (pRsp->dbVgroupNum < 0) {
|
||||
qError("invalid db vgroup number[%d]", pRsp->dbVgroupNum);
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
||||
int32_t expectSize = pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + pRsp->dbVgroupNum * sizeof(int32_t) + sizeof(*pRsp);
|
||||
int32_t expectSize = pRsp->vgNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp);
|
||||
if (msgSize != expectSize) {
|
||||
qError("vgroup list msg size mis-match, msgSize:%d, expected:%d, vgroup number:%d, db vgroup number:%d", msgSize, expectSize, pRsp->vgroupNum, pRsp->dbVgroupNum);
|
||||
qError("use db rsp size mis-match, msgSize:%d, expected:%d, vgnumber:%d", msgSize, expectSize, pRsp->vgNum);
|
||||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
if (pRsp->vgroupVersion < 0) {
|
||||
qInfo("no new vgroup list info");
|
||||
if (pRsp->vgroupNum != 0) {
|
||||
qError("invalid vgroup number[%d] for no new vgroup list case", pRsp->vgroupNum);
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
} else {
|
||||
int32_t s = sizeof(*pOut->vgroupList) + sizeof(pOut->vgroupList->vgroupInfo[0]) * pRsp->vgroupNum;
|
||||
pOut->vgroupList = calloc(1, s);
|
||||
if (NULL == pOut->vgroupList) {
|
||||
qError("calloc size[%d] failed", s);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pOut->vgroupList->vgroupNum = pRsp->vgroupNum;
|
||||
pOut->vgroupList->vgroupVersion = pRsp->vgroupVersion;
|
||||
|
||||
for (int32_t i = 0; i < pRsp->vgroupNum; ++i) {
|
||||
pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId);
|
||||
for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) {
|
||||
pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port);
|
||||
}
|
||||
|
||||
memcpy(&pOut->vgroupList->vgroupInfo[i], &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]));
|
||||
}
|
||||
pOut->dbVgroup.vgVersion = pRsp->vgVersion;
|
||||
pOut->dbVgroup.hashMethod = pRsp->hashMethod;
|
||||
pOut->dbVgroup.vgInfo = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == pOut->dbVgroup.vgInfo) {
|
||||
qError("hash init[%d] failed", pRsp->vgNum);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t *vgIdList = (int32_t *)((char *)pRsp->vgroupInfo + sizeof(pRsp->vgroupInfo[0]) * pRsp->vgroupNum);
|
||||
for (int32_t i = 0; i < pRsp->vgNum; ++i) {
|
||||
pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId);
|
||||
pRsp->vgroupInfo[i].hashBegin = htonl(pRsp->vgroupInfo[i].hashBegin);
|
||||
pRsp->vgroupInfo[i].hashEnd = htonl(pRsp->vgroupInfo[i].hashEnd);
|
||||
|
||||
for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) {
|
||||
pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port);
|
||||
}
|
||||
|
||||
if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) {
|
||||
qError("hash push failed");
|
||||
goto _return;
|
||||
}
|
||||
}
|
||||
|
||||
memcpy(pOut->db, pRsp->db, sizeof(pOut->db));
|
||||
|
||||
if (pRsp->dbVgroupVersion < 0) {
|
||||
qInfo("no new vgroup info for db[%s]", pRsp->db);
|
||||
} else {
|
||||
pOut->dbVgroup = calloc(1, sizeof(*pOut->dbVgroup));
|
||||
if (NULL == pOut->dbVgroup) {
|
||||
qError("calloc size[%d] failed", (int32_t)sizeof(*pOut->dbVgroup));
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pOut->dbVgroup->vgId = taosArrayInit(pRsp->dbVgroupNum, sizeof(int32_t));
|
||||
if (NULL == pOut->dbVgroup->vgId) {
|
||||
qError("taosArrayInit size[%d] failed", pRsp->dbVgroupNum);
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pOut->dbVgroup->vgroupVersion = pRsp->dbVgroupVersion;
|
||||
pOut->dbVgroup->hashRange = htonl(pRsp->dbHashRange);
|
||||
pOut->dbVgroup->hashType = htonl(pRsp->dbHashType);
|
||||
|
||||
if (pOut->dbVgroup->hashRange < 0) {
|
||||
qError("invalid hashRange[%d] for db[%s]", pOut->dbVgroup->hashRange, pRsp->db);
|
||||
code = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pRsp->dbVgroupNum; ++i) {
|
||||
*(vgIdList + i) = htonl(*(vgIdList + i));
|
||||
|
||||
taosArrayPush(pOut->dbVgroup->vgId, vgIdList + i) ;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_exit:
|
||||
if (pOut->dbVgroup && pOut->dbVgroup->vgId) {
|
||||
taosArrayDestroy(pOut->dbVgroup->vgId);
|
||||
pOut->dbVgroup->vgId = NULL;
|
||||
_return:
|
||||
if (pOut) {
|
||||
tfree(pOut->dbVgroup.vgInfo);
|
||||
}
|
||||
|
||||
tfree(pOut->dbVgroup);
|
||||
tfree(pOut->vgroupList);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -375,11 +265,9 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
|
|||
|
||||
void msgInit() {
|
||||
queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg;
|
||||
queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = queryBuildVgroupListReqMsg;
|
||||
queryBuildMsg[TSDB_MSG_TYPE_USE_DB] = queryBuildUseDbMsg;
|
||||
|
||||
queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = queryProcessTableMetaRsp;
|
||||
queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = queryProcessVgroupListRsp;
|
||||
queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB] = queryProcessUseDBRsp;
|
||||
|
||||
/*
|
||||
|
|
Loading…
Reference in New Issue