diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 70cf9c8b58..1d926ba42c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1547,6 +1547,8 @@ typedef struct { SDbCfgRsp* cfgRsp; STableTSMAInfoRsp* pTsmaRsp; int32_t dbTsmaVersion; + char db[TSDB_DB_FNAME_LEN]; + int64_t dbId; } SDbHbRsp; typedef struct { diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 3f1cf74cfa..11ed7c7da6 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -415,6 +415,8 @@ int32_t catalogGetTableTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, const SNam int32_t catalogGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaName, STableTSMAInfo** pTsma); +int32_t catalogAsyncUpdateDbTsmaVersion(SCatalog* pCtg, int32_t tsmaVersion, const char* dbFName, int64_t dbId); + /** * Destroy catalog and relase all resources */ diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 415c2d6685..70a519d8ae 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -269,6 +269,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog TSC_ERR_JRET(catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion)); } taosArrayClear(rsp->pTsmaRsp->pTsmas); + } else { + TSC_ERR_JRET(catalogAsyncUpdateDbTsmaVersion(pCatalog, rsp->dbTsmaVersion, rsp->db, rsp->dbId)); } } } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 740e517e35..4dc59bf6fe 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3824,7 +3824,8 @@ int32_t tSerializeSDbHbRspImp(SEncoder *pEncoder, const SDbHbRsp *pRsp) { if (tEncodeI8(pEncoder, 0) < 0) return -1; } if (tEncodeI32(pEncoder, pRsp->dbTsmaVersion) < 0) return -1; - + if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->dbId) < 0) return -1; return 0; } @@ -3915,6 +3916,10 @@ int32_t tDeserializeSDbHbRspImp(SDecoder *decoder, SDbHbRsp *pRsp) { if (!tDecodeIsEnd(decoder)) { if (tDecodeI32(decoder, &pRsp->dbTsmaVersion) < 0) return -1; } + if (!tDecodeIsEnd(decoder)) { + if (tDecodeCStrTo(decoder, pRsp->db) < 0) return -1; + if (tDecodeI64(decoder, &pRsp->dbId) < 0) return -1; + } return 0; } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index dd3f89c9d0..fe5c12419c 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1843,6 +1843,8 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs, pDbCacheInfo->tsmaVersion = htonl(pDbCacheInfo->tsmaVersion); SDbHbRsp rsp = {0}; + (void)memcpy(rsp.db, pDbCacheInfo->dbFName, TSDB_DB_FNAME_LEN); + rsp.dbId = pDbCacheInfo->dbId; if ((0 == strcasecmp(pDbCacheInfo->dbFName, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcasecmp(pDbCacheInfo->dbFName, TSDB_PERFORMANCE_SCHEMA_DB)))) { diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index f70cfff71d..f3b1852ce1 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -108,6 +108,7 @@ enum { CTG_OP_UPDATE_TB_TSMA, CTG_OP_DROP_TB_TSMA, CTG_OP_CLEAR_CACHE, + CTG_OP_UPDATE_DB_TSMA_VERSION, CTG_OP_MAX }; @@ -603,6 +604,7 @@ typedef struct SCtgUpdateTbTSMAMsg { STableTSMAInfo* pTsma; int32_t dbTsmaVersion; uint64_t dbId; + char dbFName[TSDB_DB_FNAME_LEN]; } SCtgUpdateTbTSMAMsg; typedef struct SCtgDropTbTSMAMsg { @@ -1167,6 +1169,8 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, void* bInput); int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, int32_t flag, CTG_TSMA_FETCH_TYPE fetchType, const SName* sourceTbName); +int32_t ctgOpUpdateDbTsmaVersion(SCtgCacheOperation* pOper); +int32_t ctgUpdateDbTsmaVersionEnqueue(SCatalog* pCtg, int32_t tsmaVersion, const char* dbFName, int64_t dbId, bool syncOper); void ctgFreeTask(SCtgTask* pTask, bool freeRes); extern SCatalogMgmt gCtgMgmt; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 27a7ce1022..d4c79a6c8d 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1933,6 +1933,18 @@ _return: CTG_API_LEAVE(code); } +int32_t catalogAsyncUpdateDbTsmaVersion(SCatalog* pCtg, int32_t tsmaVersion, const char* dbFName, int64_t dbId) { + CTG_API_ENTER(); + if (!pCtg || !dbFName) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + int32_t code = 0; + CTG_ERR_JRET(ctgUpdateDbTsmaVersionEnqueue(pCtg, tsmaVersion, dbFName, dbId, false)); + +_return: + CTG_API_LEAVE(code); +} + int32_t catalogClearCache(void) { CTG_API_ENTER_NOLOCK(); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 8e5aba26af..689bf900e2 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -34,7 +34,8 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v {CTG_OP_DROP_VIEW_META, "drop viewMeta", ctgOpDropViewMeta}, {CTG_OP_UPDATE_TB_TSMA, "update tbTSMA", ctgOpUpdateTbTSMA}, {CTG_OP_DROP_TB_TSMA, "drop tbTSMA", ctgOpDropTbTSMA}, - {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}}; + {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}, + {CTG_OP_UPDATE_DB_TSMA_VERSION, "update dbTsmaVersion", ctgOpUpdateDbTsmaVersion}}; SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = { {"Cluster ", CTG_CI_FLAG_LEVEL_GLOBAL}, //CTG_CI_CLUSTER @@ -1628,6 +1629,41 @@ _return: CTG_RET(code); } +int32_t ctgUpdateDbTsmaVersionEnqueue(SCatalog* pCtg, int32_t tsmaVersion, const char* dbFName, int64_t dbId, bool syncOp) { + int32_t code = 0; + SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + if (NULL == op) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation)); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + op->opId = CTG_OP_UPDATE_DB_TSMA_VERSION; + op->syncOp = syncOp; + + SCtgUpdateTbTSMAMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbTSMAMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbTSMAMsg)); + taosMemoryFree(op); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + msg->pCtg = pCtg; + msg->pTsma = NULL; + msg->dbTsmaVersion = tsmaVersion; + msg->dbId = dbId; + memcpy(msg->dbFName, dbFName, TSDB_DB_FNAME_LEN); + + op->data = msg; + + CTG_ERR_JRET(ctgEnqueue(pCtg, op)); + + return TSDB_CODE_SUCCESS; + +_return: + + CTG_RET(code); +} + int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { int32_t code = 0; @@ -3010,6 +3046,32 @@ _return: CTG_RET(code); } +static int32_t ctgOpUpdateDbRentForTsmaVersion(SCtgDBCache* pDbCache, SCtgUpdateTbTSMAMsg* pMsg) { + int32_t code = TSDB_CODE_SUCCESS; + if (pDbCache && pMsg->dbTsmaVersion > 0) { + pDbCache->tsmaVersion = pMsg->dbTsmaVersion; + SDbCacheInfo cacheInfo = {0}; + cacheInfo.dbId = pDbCache->dbId; + + if (pDbCache->cfgCache.cfgInfo) { + cacheInfo.cfgVersion = pDbCache->cfgCache.cfgInfo->cfgVersion; + tstrncpy(cacheInfo.dbFName, pDbCache->cfgCache.cfgInfo->db, TSDB_DB_FNAME_LEN); + } + + if (pDbCache->vgCache.vgInfo) { + cacheInfo.vgVersion = pDbCache->vgCache.vgInfo->vgVersion; + cacheInfo.numOfTable = pDbCache->vgCache.vgInfo->numOfTable; + cacheInfo.stateTs = pDbCache->vgCache.vgInfo->stateTs; + } + + cacheInfo.tsmaVersion = pDbCache->tsmaVersion; + CTG_ERR_JRET(ctgMetaRentUpdate(&pMsg->pCtg->dbRent, &cacheInfo, cacheInfo.dbId, sizeof(SDbCacheInfo), + ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare)); + } +_return: + CTG_RET(code); +} + int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation *operation) { int32_t code = 0; SCtgUpdateTbTSMAMsg *msg = operation->data; @@ -3023,27 +3085,7 @@ int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation *operation) { CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pTsmaInfo->dbFName, pTsmaInfo->dbId, &dbCache)); CTG_ERR_JRET(ctgWriteTbTSMAToCache(pCtg, dbCache, pTsmaInfo->dbFName, pTsmaInfo->tb, &pTsmaInfo)); - - if (dbCache && msg->dbTsmaVersion > 0) { - dbCache->tsmaVersion = msg->dbTsmaVersion; - SDbCacheInfo cacheInfo = {0}; - cacheInfo.dbId = dbCache->dbId; - - if (dbCache->cfgCache.cfgInfo) { - cacheInfo.cfgVersion = dbCache->cfgCache.cfgInfo->cfgVersion; - tstrncpy(cacheInfo.dbFName, dbCache->cfgCache.cfgInfo->db, TSDB_DB_FNAME_LEN); - } - - if (dbCache->vgCache.vgInfo) { - cacheInfo.vgVersion = dbCache->vgCache.vgInfo->vgVersion; - cacheInfo.numOfTable = dbCache->vgCache.vgInfo->numOfTable; - cacheInfo.stateTs = dbCache->vgCache.vgInfo->stateTs; - } - - cacheInfo.tsmaVersion = dbCache->tsmaVersion; - CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &cacheInfo, cacheInfo.dbId, sizeof(SDbCacheInfo), - ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare)); - } + CTG_ERR_JRET(ctgOpUpdateDbRentForTsmaVersion(dbCache, msg)); _return: @@ -3057,6 +3099,22 @@ _return: CTG_RET(code); } +int32_t ctgOpUpdateDbTsmaVersion(SCtgCacheOperation *pOper) { + int32_t code = 0; + SCtgUpdateTbTSMAMsg *pMsg = pOper->data; + SCatalog *pCtg = pMsg->pCtg; + SCtgDBCache *pDbCache = NULL; + + if (pCtg->stopUpdate) goto _return; + + CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pMsg->dbFName, pMsg->dbId, &pDbCache)); + CTG_ERR_JRET(ctgOpUpdateDbRentForTsmaVersion(pDbCache, pMsg)); + +_return: + taosMemoryFreeClear(pMsg); + CTG_RET(code); +} + void ctgFreeCacheOperationData(SCtgCacheOperation *op) { if (NULL == op || NULL == op->data) {