feature/qnode

This commit is contained in:
dapan 2022-02-12 09:22:04 +08:00
parent 5f2dc1401f
commit b44baf3c20
4 changed files with 52 additions and 45 deletions

View File

@ -1839,5 +1839,4 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
} }
#endif #endif
#endif /*_TD_COMMON_TAOS_MSG_H_*/ #endif /*_TD_COMMON_TAOS_MSG_H_*/

View File

@ -1468,5 +1468,4 @@ static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) { static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }

View File

@ -18,6 +18,12 @@
#include "tname.h" #include "tname.h"
#include "catalogInt.h" #include "catalogInt.h"
int32_t ctgActUpdateVg(SCtgMetaAction *action);
int32_t ctgActUpdateTbl(SCtgMetaAction *action);
int32_t ctgActRemoveDB(SCtgMetaAction *action);
int32_t ctgActRemoveStb(SCtgMetaAction *action);
int32_t ctgActRemoveTbl(SCtgMetaAction *action);
SCatalogMgmt ctgMgmt = {0}; SCatalogMgmt ctgMgmt = {0};
SCtgDebug gCTGDebug = {0}; SCtgDebug gCTGDebug = {0};
@ -242,6 +248,44 @@ void ctgWReleaseVgInfo(SCtgDBCache *dbCache) {
} }
int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
SCtgDBCache *dbCache = NULL;
if (acquire) {
dbCache = (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName));
} else {
dbCache = (SCtgDBCache *)taosHashGet(pCtg->dbCache, dbFName, strlen(dbFName));
}
if (NULL == dbCache) {
*pCache = NULL;
ctgDebug("db not in cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
if (dbCache->deleted) {
if (acquire) {
ctgReleaseDBCache(pCtg, dbCache);
}
*pCache = NULL;
ctgDebug("db is removing from cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
int32_t ctgAcquireDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true));
}
int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
}
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool *inCache) { int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool *inCache) {
if (NULL == pCtg->dbCache) { if (NULL == pCtg->dbCache) {
*pCache = NULL; *pCache = NULL;
@ -978,43 +1022,6 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
SCtgDBCache *dbCache = NULL;
if (acquire) {
dbCache = (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName));
} else {
dbCache = (SCtgDBCache *)taosHashGet(pCtg->dbCache, dbFName, strlen(dbFName));
}
if (NULL == dbCache) {
*pCache = NULL;
ctgDebug("db not in cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
if (dbCache->deleted) {
if (acquire) {
ctgReleaseDBCache(pCtg, dbCache);
}
*pCache = NULL;
ctgDebug("db is removing from cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
int32_t ctgAcquireDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true));
}
int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
}
int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) { int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
int32_t code = 0; int32_t code = 0;
@ -1235,7 +1242,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
int32_t metaSize = CTG_META_SIZE(output->tbMeta); int32_t metaSize = CTG_META_SIZE(output->tbMeta);
(*pOutput)->tbMeta = malloc(metaSize); (*pOutput)->tbMeta = malloc(metaSize);
if (NULL == (*pOutput)->tbMeta) { if (NULL == (*pOutput)->tbMeta) {
qError("malloc %d failed", sizeof(STableMetaOutput)); qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
tfree(*pOutput); tfree(*pOutput);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }

View File

@ -33,12 +33,14 @@
#include "tep.h" #include "tep.h"
#include "trpc.h" #include "trpc.h"
#include "tvariant.h" #include "tvariant.h"
#include "catalogInt.h"
namespace { namespace {
extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta, extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta,
int32_t *exist); int32_t *exist);
extern "C" int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type); extern "C" int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type);
extern "C" int32_t ctgActUpdateTbl(SCtgMetaAction *action);
void ctgTestSetPrepareTableMeta(); void ctgTestSetPrepareTableMeta();
void ctgTestSetPrepareCTableMeta(); void ctgTestSetPrepareCTableMeta();
@ -725,9 +727,9 @@ void *ctgTestSetCtableMetaThread(void *param) {
action.act = CTG_ACT_UPDATE_TBL; action.act = CTG_ACT_UPDATE_TBL;
while (!ctgTestStop) { while (!ctgTestStop) {
SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg)); SCtgUpdateTblMsg *msg = (SCtgUpdateTblMsg *)malloc(sizeof(SCtgUpdateTblMsg));
msg->pCtg = pCtg; msg->pCtg = pCtg;
msg->output = output; msg->output = &output;
action.data = msg; action.data = msg;
code = ctgActUpdateTbl(&action); code = ctgActUpdateTbl(&action);
@ -1077,7 +1079,7 @@ TEST(tableMeta, rmStbMeta) {
ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
code = catalogRemoveSTableMeta(pCtg, "1.db1", ctgTestSTablename, ctgTestSuid); code = catalogRemoveStbMeta(pCtg, "1.db1", ctgTestDbId, ctgTestSTablename, ctgTestSuid);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1); ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1);