Merge branch '3.0' into feature/vnode
This commit is contained in:
commit
1146555ade
|
@ -32,7 +32,7 @@ extern "C" {
|
|||
struct SCatalog;
|
||||
|
||||
typedef struct SCatalogReq {
|
||||
char clusterId[TSDB_CLUSTER_ID_LEN]; //????
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
SArray *pTableName; // table full name
|
||||
SArray *pUdf; // udf name
|
||||
bool qNodeRequired; // valid qnode
|
||||
|
@ -82,7 +82,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
|
|||
|
||||
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
|
||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName);
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta* pTableMeta);
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
|
||||
|
||||
|
||||
/**
|
||||
|
@ -91,7 +91,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
|
|||
* @pVgroupList - array of SVgroupInfo
|
||||
* @return
|
||||
*/
|
||||
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList);
|
||||
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList);
|
||||
|
||||
|
||||
/**
|
||||
|
@ -103,7 +103,7 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe
|
|||
* @param pMetaData
|
||||
* @return
|
||||
*/
|
||||
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
|
||||
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
|
||||
|
||||
|
||||
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet);
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
extern bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "catalogInt.h"
|
||||
#include "trpc.h"
|
||||
#include "query.h"
|
||||
#include "tname.h"
|
||||
|
||||
SCatalogMgmt ctgMgmt = {0};
|
||||
|
||||
|
@ -47,7 +48,7 @@ int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSe
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SHashObj** pVgroupList, int32_t* exist) {
|
||||
int32_t ctgGetVgroupFromCache(struct SCatalog* pCatalog, SHashObj** pVgroupList, int32_t* exist) {
|
||||
if (NULL == pCatalog->vgroupCache.cache || pCatalog->vgroupCache.vgroupVersion < 0) {
|
||||
*exist = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -62,7 +63,7 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SHashObj** pVgroupList, int32_
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) {
|
||||
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) {
|
||||
if (NULL == pCatalog->dbCache.cache) {
|
||||
*exist = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -130,13 +131,13 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
|
|||
}
|
||||
|
||||
|
||||
int32_t ctgGetTableMetaFromCache(SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) {
|
||||
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) {
|
||||
if (NULL == pCatalog->tableCache.cache) {
|
||||
*exist = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1];
|
||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
|
||||
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbName, pTableName);
|
||||
|
||||
|
@ -200,7 +201,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
|
|||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
}
|
||||
|
||||
char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1];
|
||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
|
||||
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);
|
||||
|
||||
|
@ -251,7 +252,54 @@ int32_t ctgGetHashFunction(int32_t hashType, tableNameHashFp *fp) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
|
||||
int32_t ctgGetVgroupFromVgId(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, int32_t vgId, SVgroupInfo *pVgroup) {
|
||||
SHashObj *vgroupHash = NULL;
|
||||
|
||||
CTG_ERR_RET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash));
|
||||
if (NULL == vgroupHash) {
|
||||
ctgError("get empty vgroup cache");
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
if (NULL == taosHashGetClone(vgroupHash, &vgId, sizeof(vgId), pVgroup)) {
|
||||
ctgError("vgId[%d] not found in vgroup list", vgId);
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetVgroupFromVgIdBatch(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SArray* vgIds, SArray* vgroupList) {
|
||||
SHashObj *vgroupHash = NULL;
|
||||
SVgroupInfo pVgroup = {0};
|
||||
int32_t vgIdNum = taosArrayGetSize(vgIds);
|
||||
|
||||
CTG_ERR_RET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash));
|
||||
if (NULL == vgroupHash) {
|
||||
ctgError("get empty vgroup cache");
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < vgIdNum; ++i) {
|
||||
int32_t *vgId = taosArrayGet(vgIds, i);
|
||||
|
||||
if (NULL == taosHashGetClone(vgroupHash, vgId, sizeof(*vgId), &pVgroup)) {
|
||||
ctgError("vgId[%d] not found in vgroup list", vgId);
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
if (NULL == taosArrayPush(vgroupList, &pVgroup)) {
|
||||
ctgError("push vgroup to array failed, idx:%d", i);
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t ctgGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
|
||||
SDBVgroupInfo *dbInfo = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -269,7 +317,7 @@ int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgm
|
|||
|
||||
int32_t vgNum = taosArrayGetSize(dbInfo->vgId);
|
||||
if (vgNum <= 0) {
|
||||
ctgError("db[%s] vgroup cache invalid, vgroup number:%p", vgNum);
|
||||
ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum);
|
||||
CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED);
|
||||
}
|
||||
|
||||
|
@ -277,7 +325,7 @@ int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgm
|
|||
|
||||
CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashType, &fp));
|
||||
|
||||
char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1];
|
||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
|
||||
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);
|
||||
|
||||
|
@ -285,18 +333,7 @@ int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgm
|
|||
uint32_t hashUnit = dbInfo->hashRange / vgNum;
|
||||
uint32_t vgId = hashValue / hashUnit;
|
||||
|
||||
SHashObj *vgroupHash = NULL;
|
||||
|
||||
CTG_ERR_JRET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash));
|
||||
if (NULL == vgroupHash) {
|
||||
ctgError("get empty vgroup cache");
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
if (NULL == taosHashGetClone(vgroupHash, &vgId, sizeof(vgId), pVgroup)) {
|
||||
ctgError("vgId[%d] not found in vgroup list", vgId);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
CTG_ERR_JRET(ctgGetVgroupFromVgId(pCatalog, pRpc, pMgmtEps, vgId, pVgroup));
|
||||
|
||||
_return:
|
||||
if (dbInfo && dbInfo->vgId) {
|
||||
|
@ -335,7 +372,7 @@ STableMeta* ctgCreateSTableMeta(STableMetaMsg* pChild) {
|
|||
return pTableMeta;
|
||||
}
|
||||
|
||||
int32_t ctgGetTableMetaImpl(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
|
||||
int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
|
||||
if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
|
||||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
}
|
||||
|
@ -363,7 +400,7 @@ int32_t ctgGetTableMetaImpl(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtE
|
|||
}
|
||||
|
||||
|
||||
int32_t ctgUpdateTableMetaCache(SCatalog *pCatalog, STableMetaOutput *output) {
|
||||
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
|
||||
if (output->metaNum != 1 && output->metaNum != 2) {
|
||||
ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
|
@ -433,7 +470,6 @@ error_exit:
|
|||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogInit(SCatalogCfg *cfg) {
|
||||
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == ctgMgmt.pCluster) {
|
||||
|
@ -451,7 +487,7 @@ int32_t catalogInit(SCatalogCfg *cfg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t catalogGetHandle(const char *clusterId, SCatalog** catalogHandle) {
|
||||
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) {
|
||||
if (NULL == clusterId || NULL == catalogHandle) {
|
||||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
}
|
||||
|
@ -545,7 +581,7 @@ error_exit:
|
|||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash) {
|
||||
int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash) {
|
||||
if (NULL == pCatalog || NULL == pMgmtEps || NULL == pRpc) {
|
||||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
}
|
||||
|
@ -677,7 +713,7 @@ _return:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t catalogGetTableMeta(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
||||
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
||||
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, false, pTableMeta);
|
||||
}
|
||||
|
||||
|
@ -701,21 +737,97 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta* pTableMeta) {
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
||||
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta);
|
||||
}
|
||||
|
||||
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
|
||||
if (NULL == pCatalog || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
|
||||
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) {
|
||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) {
|
||||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
}
|
||||
|
||||
return 0;
|
||||
STableMeta *tbMeta = NULL;
|
||||
int32_t code = 0;
|
||||
SVgroupInfo vgroupInfo = {0};
|
||||
SDBVgroupInfo *dbVgroup = NULL;
|
||||
|
||||
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta));
|
||||
|
||||
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
|
||||
CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
|
||||
|
||||
CTG_ERR_JRET(ctgGetVgroupFromVgIdBatch(pCatalog, pRpc, pMgmtEps, dbVgroup->vgId, pVgroupList));
|
||||
} else {
|
||||
CTG_ERR_JRET(ctgGetVgroupFromVgId(pCatalog, pRpc, pMgmtEps, tbMeta->vgId, &vgroupInfo));
|
||||
|
||||
if (NULL == taosArrayPush(pVgroupList, &vgroupInfo)) {
|
||||
ctgError("push vgroupInfo to array failed");
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
_return:
|
||||
tfree(tbMeta);
|
||||
if (dbVgroup && dbVgroup->vgId) {
|
||||
taosArrayDestroy(dbVgroup->vgId);
|
||||
dbVgroup->vgId = NULL;
|
||||
}
|
||||
|
||||
tfree(dbVgroup);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
|
||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
|
||||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
if (pReq->pTableName) {
|
||||
char dbName[TSDB_FULL_DB_NAME_LEN];
|
||||
int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
|
||||
if (tbNum > 0) {
|
||||
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
|
||||
if (NULL == pRsp->pTableMeta) {
|
||||
ctgError("taosArrayInit num[%d] failed", tbNum);
|
||||
return TSDB_CODE_CTG_MEM_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tbNum; ++i) {
|
||||
SName *name = taosArrayGet(pReq->pTableName, i);
|
||||
STableMeta *pTableMeta = NULL;
|
||||
|
||||
snprintf(dbName, sizeof(dbName), "%s.%s", name->acctId, name->dbname);
|
||||
|
||||
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, dbName, name->tname, &pTableMeta));
|
||||
|
||||
if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
|
||||
ctgError("taosArrayPush failed, idx:%d", i);
|
||||
tfree(pTableMeta);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
if (pRsp->pTableMeta) {
|
||||
int32_t aSize = taosArrayGetSize(pRsp->pTableMeta);
|
||||
for (int32_t i = 0; i < aSize; ++i) {
|
||||
STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i);
|
||||
tfree(pMeta);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pRsp->pTableMeta);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void catalogDestroy(void) {
|
||||
|
|
|
@ -4088,7 +4088,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
|
|||
}
|
||||
|
||||
// load the meta data from catalog
|
||||
code = catalogGetAllMeta(pCatalog, NULL, &req, &data);
|
||||
code = catalogGetAllMeta(pCatalog, NULL, NULL, &req, &data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue