diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 2c684f8f76..8b9c1aae31 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -210,7 +210,7 @@ int32_t catalogGetSTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg); -int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg); +int32_t catalogAsyncUpdateTableMeta(SCatalog* pCtg, STableMetaRsp* pMsg); int32_t catalogGetCachedTableMeta(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 203aad8068..7bfb466227 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -217,7 +217,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo return TSDB_CODE_TSC_INVALID_VALUE; } - catalogUpdateTableMeta(pCatalog, rsp); + catalogAsyncUpdateTableMeta(pCatalog, rsp); } } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f8eade1d7c..6f129d7418 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -815,7 +815,7 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) { } int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) { - return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res); + return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res); } int32_t handleQueryExecRsp(SRequestObj* pRequest) { diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 9f892f23ff..7728749945 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -964,7 +964,7 @@ uint64_t ctgGetDbVgroupCacheSize(SDBVgInfo *pVg); uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth); uint64_t ctgGetClusterCacheSize(SCatalog *pCtg); void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone); -void ctgClearAllHandleMeta(bool *roundDone); +void ctgClearAllHandleMeta(int64_t *clearedSize, int64_t *clearedNum, bool *roundDone); void ctgProcessTimerEvent(void *param, void *tmrId); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 2dbe3c31ba..169dad4a89 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -647,7 +647,8 @@ void ctgProcessTimerEvent(void *param, void *tmrId) { goto _return; } } - + + qTrace("reset catalog timer"); taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); _return: @@ -1105,6 +1106,22 @@ _return: CTG_API_LEAVE(code); } +int32_t catalogAsyncUpdateTableMeta(SCatalog* pCtg, STableMetaRsp* pMsg) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pMsg) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + int32_t code = 0; + CTG_ERR_JRET(ctgUpdateTbMeta(pCtg, pMsg, false)); + +_return: + + CTG_API_LEAVE(code); +} + + int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* pTables) { CTG_API_ENTER(); @@ -1571,6 +1588,13 @@ void catalogDestroy(void) { return; } + if (gCtgMgmt.cacheTimer) { + taosTmrStop(gCtgMgmt.cacheTimer); + gCtgMgmt.cacheTimer = NULL; + taosTmrCleanUp(gCtgMgmt.timer); + gCtgMgmt.timer = NULL; + } + atomic_store_8((int8_t*)&gCtgMgmt.exit, true); if (!taosCheckCurrentInDll()) { diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index c697c81325..50e3d9ba57 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -791,7 +791,7 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) { CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); - ctgDebug("action [%s] added into queue", opName); + ctgDebug("%sync action [%s] added into queue", syncOp ? "S": "As", opName); CTG_QUEUE_INC(); CTG_STAT_RT_INC(numOfOpEnqueue, 1); @@ -2286,21 +2286,22 @@ void ctgClearMetaCache(SCtgCacheOperation *operation) { if (pCtg) { ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, &roundDone); } else { - ctgClearAllHandleMeta(&roundDone); + ctgClearAllHandleMeta(&clearedSize, &clearedNum, &roundDone); } - qDebug("catalog finish one round meta clear, done:%d", roundDone); + qDebug("catalog finish one round meta clear, clearedSize:%" PRId64 ", clearedNum:%" PRId64 ", done:%d", clearedSize, clearedNum, roundDone); ctgGetGlobalCacheSize(&remainSize); int32_t cacheMaxSize = atomic_load_32(&tsMetaCacheMaxSize); - if (!CTG_CACHE_LOW(remainSize, cacheMaxSize)) { + if (CTG_CACHE_LOW(remainSize, cacheMaxSize)) { qDebug("catalog finish meta clear, remainSize:%" PRId64 ", cacheMaxSize:%dMB", remainSize, cacheMaxSize); taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); return; } if (!roundDone) { + qDebug("catalog all meta cleared, remainSize:%" PRId64 ", cacheMaxSize:%dMB, to clear handle", remainSize, cacheMaxSize); ctgClearFreeCache(operation); taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); return; diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index dfa8a3b89a..3748615532 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -338,6 +338,7 @@ void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardN void* key = taosHashGetKey(pCache, &len); if (pCache->pMeta && TSDB_SUPER_TABLE == pCache->pMeta->tableType) { + pCache = taosHashIterate(dbCache->tbCache, pCache); continue; } @@ -346,10 +347,15 @@ void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardN atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize); *pClearedSize += cacheSize; (*pCleardNum)++; + + if (pCache->pMeta) { + CTG_META_NUM_DEC(pCache->pMeta->tableType); + } ctgFreeTbCacheImpl(pCache, true); - + if (*pCleardNum >= CTG_CLEAR_CACHE_ROUND_TB_NUM) { + taosHashCancelIterate(dbCache->tbCache, pCache); goto _return; } @@ -366,9 +372,7 @@ _return: } } -void ctgClearAllHandleMeta(bool *roundDone) { - int64_t clearedSize = 0; - int64_t clearedNum = 0; +void ctgClearAllHandleMeta(int64_t *clearedSize, int64_t *clearedNum, bool *roundDone) { SCatalog *pCtg = NULL; void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); @@ -376,7 +380,7 @@ void ctgClearAllHandleMeta(bool *roundDone) { pCtg = *(SCatalog **)pIter; if (pCtg) { - ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, roundDone); + ctgClearHandleMeta(pCtg, clearedSize, clearedNum, roundDone); if (*roundDone) { taosHashCancelIterate(gCtgMgmt.pCluster, pIter); break; diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 2cba433e84..a4786a77dc 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -1746,6 +1746,8 @@ TEST(tableMeta, updateStbMeta) { code = catalogUpdateTableMeta(pCtg, &rsp); ASSERT_EQ(code, 0); + code = catalogAsyncUpdateTableMeta(pCtg, &rsp); + ASSERT_EQ(code, 0); taosMemoryFreeClear(rsp.pSchemas); while (true) {