fix db tsma version not updated by heart beat

This commit is contained in:
wangjiaming0909 2024-08-14 16:23:08 +08:00
parent e97d667ffc
commit daf8ed7e06
8 changed files with 110 additions and 23 deletions

View File

@ -1547,6 +1547,8 @@ typedef struct {
SDbCfgRsp* cfgRsp;
STableTSMAInfoRsp* pTsmaRsp;
int32_t dbTsmaVersion;
char db[TSDB_DB_FNAME_LEN];
int64_t dbId;
} SDbHbRsp;
typedef struct {

View File

@ -415,6 +415,8 @@ int32_t catalogGetTableTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, const SNam
int32_t catalogGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaName, STableTSMAInfo** pTsma);
int32_t catalogAsyncUpdateDbTsmaVersion(SCatalog* pCtg, int32_t tsmaVersion, const char* dbFName, int64_t dbId);
/**
* Destroy catalog and relase all resources
*/

View File

@ -269,6 +269,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
TSC_ERR_JRET(catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion));
}
taosArrayClear(rsp->pTsmaRsp->pTsmas);
} else {
TSC_ERR_JRET(catalogAsyncUpdateDbTsmaVersion(pCatalog, rsp->dbTsmaVersion, rsp->db, rsp->dbId));
}
}
}

View File

@ -3824,7 +3824,8 @@ int32_t tSerializeSDbHbRspImp(SEncoder *pEncoder, const SDbHbRsp *pRsp) {
if (tEncodeI8(pEncoder, 0) < 0) return -1;
}
if (tEncodeI32(pEncoder, pRsp->dbTsmaVersion) < 0) return -1;
if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->dbId) < 0) return -1;
return 0;
}
@ -3915,6 +3916,10 @@ int32_t tDeserializeSDbHbRspImp(SDecoder *decoder, SDbHbRsp *pRsp) {
if (!tDecodeIsEnd(decoder)) {
if (tDecodeI32(decoder, &pRsp->dbTsmaVersion) < 0) return -1;
}
if (!tDecodeIsEnd(decoder)) {
if (tDecodeCStrTo(decoder, pRsp->db) < 0) return -1;
if (tDecodeI64(decoder, &pRsp->dbId) < 0) return -1;
}
return 0;
}

View File

@ -1843,6 +1843,8 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs,
pDbCacheInfo->tsmaVersion = htonl(pDbCacheInfo->tsmaVersion);
SDbHbRsp rsp = {0};
(void)memcpy(rsp.db, pDbCacheInfo->dbFName, TSDB_DB_FNAME_LEN);
rsp.dbId = pDbCacheInfo->dbId;
if ((0 == strcasecmp(pDbCacheInfo->dbFName, TSDB_INFORMATION_SCHEMA_DB) ||
(0 == strcasecmp(pDbCacheInfo->dbFName, TSDB_PERFORMANCE_SCHEMA_DB)))) {

View File

@ -108,6 +108,7 @@ enum {
CTG_OP_UPDATE_TB_TSMA,
CTG_OP_DROP_TB_TSMA,
CTG_OP_CLEAR_CACHE,
CTG_OP_UPDATE_DB_TSMA_VERSION,
CTG_OP_MAX
};
@ -603,6 +604,7 @@ typedef struct SCtgUpdateTbTSMAMsg {
STableTSMAInfo* pTsma;
int32_t dbTsmaVersion;
uint64_t dbId;
char dbFName[TSDB_DB_FNAME_LEN];
} SCtgUpdateTbTSMAMsg;
typedef struct SCtgDropTbTSMAMsg {
@ -1167,6 +1169,8 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn,
void* bInput);
int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, int32_t flag,
CTG_TSMA_FETCH_TYPE fetchType, const SName* sourceTbName);
int32_t ctgOpUpdateDbTsmaVersion(SCtgCacheOperation* pOper);
int32_t ctgUpdateDbTsmaVersionEnqueue(SCatalog* pCtg, int32_t tsmaVersion, const char* dbFName, int64_t dbId, bool syncOper);
void ctgFreeTask(SCtgTask* pTask, bool freeRes);
extern SCatalogMgmt gCtgMgmt;

View File

@ -1933,6 +1933,18 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogAsyncUpdateDbTsmaVersion(SCatalog* pCtg, int32_t tsmaVersion, const char* dbFName, int64_t dbId) {
CTG_API_ENTER();
if (!pCtg || !dbFName) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0;
CTG_ERR_JRET(ctgUpdateDbTsmaVersionEnqueue(pCtg, tsmaVersion, dbFName, dbId, false));
_return:
CTG_API_LEAVE(code);
}
int32_t catalogClearCache(void) {
CTG_API_ENTER_NOLOCK();

View File

@ -34,7 +34,8 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v
{CTG_OP_DROP_VIEW_META, "drop viewMeta", ctgOpDropViewMeta},
{CTG_OP_UPDATE_TB_TSMA, "update tbTSMA", ctgOpUpdateTbTSMA},
{CTG_OP_DROP_TB_TSMA, "drop tbTSMA", ctgOpDropTbTSMA},
{CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}};
{CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache},
{CTG_OP_UPDATE_DB_TSMA_VERSION, "update dbTsmaVersion", ctgOpUpdateDbTsmaVersion}};
SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = {
{"Cluster ", CTG_CI_FLAG_LEVEL_GLOBAL}, //CTG_CI_CLUSTER
@ -1628,6 +1629,41 @@ _return:
CTG_RET(code);
}
int32_t ctgUpdateDbTsmaVersionEnqueue(SCatalog* pCtg, int32_t tsmaVersion, const char* dbFName, int64_t dbId, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
if (NULL == op) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
op->opId = CTG_OP_UPDATE_DB_TSMA_VERSION;
op->syncOp = syncOp;
SCtgUpdateTbTSMAMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbTSMAMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbTSMAMsg));
taosMemoryFree(op);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
msg->pTsma = NULL;
msg->dbTsmaVersion = tsmaVersion;
msg->dbId = dbId;
memcpy(msg->dbFName, dbFName, TSDB_DB_FNAME_LEN);
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
CTG_RET(code);
}
int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
int32_t code = 0;
@ -3010,6 +3046,32 @@ _return:
CTG_RET(code);
}
static int32_t ctgOpUpdateDbRentForTsmaVersion(SCtgDBCache* pDbCache, SCtgUpdateTbTSMAMsg* pMsg) {
int32_t code = TSDB_CODE_SUCCESS;
if (pDbCache && pMsg->dbTsmaVersion > 0) {
pDbCache->tsmaVersion = pMsg->dbTsmaVersion;
SDbCacheInfo cacheInfo = {0};
cacheInfo.dbId = pDbCache->dbId;
if (pDbCache->cfgCache.cfgInfo) {
cacheInfo.cfgVersion = pDbCache->cfgCache.cfgInfo->cfgVersion;
tstrncpy(cacheInfo.dbFName, pDbCache->cfgCache.cfgInfo->db, TSDB_DB_FNAME_LEN);
}
if (pDbCache->vgCache.vgInfo) {
cacheInfo.vgVersion = pDbCache->vgCache.vgInfo->vgVersion;
cacheInfo.numOfTable = pDbCache->vgCache.vgInfo->numOfTable;
cacheInfo.stateTs = pDbCache->vgCache.vgInfo->stateTs;
}
cacheInfo.tsmaVersion = pDbCache->tsmaVersion;
CTG_ERR_JRET(ctgMetaRentUpdate(&pMsg->pCtg->dbRent, &cacheInfo, cacheInfo.dbId, sizeof(SDbCacheInfo),
ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare));
}
_return:
CTG_RET(code);
}
int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateTbTSMAMsg *msg = operation->data;
@ -3023,27 +3085,7 @@ int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation *operation) {
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pTsmaInfo->dbFName, pTsmaInfo->dbId, &dbCache));
CTG_ERR_JRET(ctgWriteTbTSMAToCache(pCtg, dbCache, pTsmaInfo->dbFName, pTsmaInfo->tb, &pTsmaInfo));
if (dbCache && msg->dbTsmaVersion > 0) {
dbCache->tsmaVersion = msg->dbTsmaVersion;
SDbCacheInfo cacheInfo = {0};
cacheInfo.dbId = dbCache->dbId;
if (dbCache->cfgCache.cfgInfo) {
cacheInfo.cfgVersion = dbCache->cfgCache.cfgInfo->cfgVersion;
tstrncpy(cacheInfo.dbFName, dbCache->cfgCache.cfgInfo->db, TSDB_DB_FNAME_LEN);
}
if (dbCache->vgCache.vgInfo) {
cacheInfo.vgVersion = dbCache->vgCache.vgInfo->vgVersion;
cacheInfo.numOfTable = dbCache->vgCache.vgInfo->numOfTable;
cacheInfo.stateTs = dbCache->vgCache.vgInfo->stateTs;
}
cacheInfo.tsmaVersion = dbCache->tsmaVersion;
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &cacheInfo, cacheInfo.dbId, sizeof(SDbCacheInfo),
ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare));
}
CTG_ERR_JRET(ctgOpUpdateDbRentForTsmaVersion(dbCache, msg));
_return:
@ -3057,6 +3099,22 @@ _return:
CTG_RET(code);
}
int32_t ctgOpUpdateDbTsmaVersion(SCtgCacheOperation *pOper) {
int32_t code = 0;
SCtgUpdateTbTSMAMsg *pMsg = pOper->data;
SCatalog *pCtg = pMsg->pCtg;
SCtgDBCache *pDbCache = NULL;
if (pCtg->stopUpdate) goto _return;
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pMsg->dbFName, pMsg->dbId, &pDbCache));
CTG_ERR_JRET(ctgOpUpdateDbRentForTsmaVersion(pDbCache, pMsg));
_return:
taosMemoryFreeClear(pMsg);
CTG_RET(code);
}
void ctgFreeCacheOperationData(SCtgCacheOperation *op) {
if (NULL == op || NULL == op->data) {