feature/qnode
This commit is contained in:
parent
32b1e1bf3a
commit
675cb71e10
|
@ -103,15 +103,28 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
|||
*/
|
||||
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
|
||||
|
||||
/**
|
||||
* Get a super table's meta data.
|
||||
* @param pCatalog (input, got with catalogGetHandle)
|
||||
* @param pTransporter (input, rpc object)
|
||||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pTableName (input, table name, NOT including db name)
|
||||
* @param pTableMeta(output, table meta data, NEED to free it by calller)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
|
||||
|
||||
|
||||
/**
|
||||
* Force renew a table's local cached meta data.
|
||||
* @param pCatalog (input, got with catalogGetHandle)
|
||||
* @param pTransporter (input, rpc object)
|
||||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pTableName (input, table name, NOT including db name)
|
||||
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName);
|
||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable);
|
||||
|
||||
/**
|
||||
* Force renew a table's local cached meta data and get the new one.
|
||||
|
@ -120,9 +133,11 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, co
|
|||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pTableName (input, table name, NOT including db name)
|
||||
* @param pTableMeta(output, table meta data, NEED to free it by calller)
|
||||
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -119,6 +119,8 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
|
|||
}
|
||||
|
||||
*exist = 1;
|
||||
|
||||
tbMeta = *pTableMeta;
|
||||
|
||||
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
|
||||
ctgDebug("Got tablemeta from cache, tbName:%s", tbFullName);
|
||||
|
@ -199,14 +201,7 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
|
||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == output) {
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pTableName, tbFullName);
|
||||
|
||||
int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) {
|
||||
SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName};
|
||||
char *msg = NULL;
|
||||
SEpSet *pVnodeEpSet = NULL;
|
||||
|
@ -252,6 +247,12 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
|
||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pTableName, tbFullName);
|
||||
|
||||
return ctgGetTableMetaFromMnodeImpl(pCatalog, pRpc, pMgmtEps, tbFullName, output);
|
||||
}
|
||||
|
||||
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
|
||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
|
||||
|
@ -456,14 +457,12 @@ int32_t ctgMetaRentAdd(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size
|
|||
if (NULL == slot->meta) {
|
||||
slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
|
||||
if (NULL == slot->meta) {
|
||||
CTG_UNLOCK(CTG_WRITE, &slot->lock);
|
||||
qError("taosArrayInit %d failed, id:%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == taosArrayPush(slot->meta, meta)) {
|
||||
CTG_UNLOCK(CTG_WRITE, &slot->lock);
|
||||
qError("taosArrayPush meta to rent failed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
@ -486,7 +485,6 @@ int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t s
|
|||
|
||||
CTG_LOCK(CTG_WRITE, &slot->lock);
|
||||
if (NULL == slot->meta) {
|
||||
CTG_UNLOCK(CTG_WRITE, &slot->lock);
|
||||
qError("meta in slot is empty, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
@ -498,8 +496,7 @@ int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t s
|
|||
}
|
||||
|
||||
void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ);
|
||||
if (NULL == orig) {
|
||||
CTG_UNLOCK(CTG_WRITE, &slot->lock);
|
||||
if (NULL == orig) {
|
||||
qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
@ -610,6 +607,10 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
|||
ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.cache, NULL, cache)) {
|
||||
taosHashCleanup(cache);
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == pCatalog->tableCache.stableCache) {
|
||||
|
@ -618,6 +619,10 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
|||
ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.stableCache, NULL, cache)) {
|
||||
taosHashCleanup(cache);
|
||||
}
|
||||
}
|
||||
|
||||
if (output->metaNum == 2) {
|
||||
|
@ -1039,6 +1044,10 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
|||
ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache.cache, NULL, cache)) {
|
||||
taosHashCleanup(cache);
|
||||
}
|
||||
} else {
|
||||
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
|
||||
}
|
||||
|
@ -1084,22 +1093,7 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, con
|
|||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SVgroupInfo vgroupInfo = {0};
|
||||
int32_t code = 0;
|
||||
|
||||
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
|
||||
|
||||
STableMetaOutput output = {0};
|
||||
|
||||
CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output));
|
||||
|
||||
//CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
|
||||
|
||||
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
|
||||
|
||||
_return:
|
||||
tfree(output.tbMeta);
|
||||
CTG_RET(code);
|
||||
return ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable);
|
||||
}
|
||||
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
|
||||
|
|
|
@ -601,7 +601,6 @@ void *ctgTestSetCtableMetaThread(void *param) {
|
|||
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
TEST(tableMeta, normalTable) {
|
||||
struct SCatalog* pCtg = NULL;
|
||||
|
@ -841,7 +840,7 @@ TEST(tableMeta, superTableCase) {
|
|||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||
|
||||
tableMeta = NULL;
|
||||
code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||
code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(tableMeta->vgId, 9);
|
||||
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
|
||||
|
|
Loading…
Reference in New Issue