From 79d440337798c0edef889ff94d030deeca34d5ea Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 10 May 2023 17:44:53 +0800 Subject: [PATCH] feat: add client meta limitation --- include/common/tglobal.h | 3 +- include/common/tmsg.h | 3 +- include/libs/qcom/query.h | 24 ++- source/client/src/clientEnv.c | 2 +- source/common/src/tglobal.c | 11 +- source/common/src/tmsg.c | 6 +- source/dnode/mnode/impl/src/mndSma.c | 1 + source/libs/catalog/inc/catalogInt.h | 64 ++++++-- source/libs/catalog/src/catalog.c | 52 +++++- source/libs/catalog/src/ctgCache.c | 156 ++++++++++++++---- source/libs/catalog/src/ctgDbg.c | 6 +- source/libs/catalog/src/ctgUtil.c | 236 +++++++++++++++++++++++++-- source/libs/command/src/command.c | 2 +- 13 files changed, 480 insertions(+), 86 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 6fde7b48a2..e701a93150 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -118,6 +118,7 @@ extern int32_t tsRedirectFactor; extern int32_t tsRedirectMaxPeriod; extern int32_t tsMaxRetryWaitTime; extern bool tsUseAdapter; +extern int32_t tsMetaCacheMaxSize; // client extern int32_t tsMinSlidingTime; @@ -183,7 +184,7 @@ struct SConfig *taosGetCfg(); void taosSetAllDebugFlag(int32_t flag, bool rewrite); void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal, bool rewrite); -int32_t taosSetCfg(SConfig *pCfg, char *name); +int32_t taosApplyLocalCfg(SConfig *pCfg, char *name); void taosLocalCfgForbiddenToChange(char *name, bool *forbidden); #ifdef __cplusplus diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f377ad0d63..e1206c029d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3123,7 +3123,8 @@ typedef struct { char dbFName[TSDB_DB_FNAME_LEN]; uint64_t suid; int32_t version; - SArray* pIndex; + int32_t indexSize; + SArray* pIndex; // STableIndexInfo } STableIndexRsp; int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index cfc6ef2025..7ce81d1190 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -90,28 +90,23 @@ typedef struct STbVerInfo { int32_t tversion; } STbVerInfo; -/* - * ASSERT(sizeof(SCTableMeta) == 24) - * ASSERT(tableType == TSDB_CHILD_TABLE) - * The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info. - */ +#pragma pack(push, 1) typedef struct SCTableMeta { - int32_t vgId : 24; - int8_t tableType; uint64_t uid; uint64_t suid; + int32_t vgId; + int8_t tableType; } SCTableMeta; +#pragma pack(pop) -/* - * Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a - * SCTableMeta. - */ + +#pragma pack(push, 1) typedef struct STableMeta { // BEGIN: KEEP THIS PART SAME WITH SCTableMeta - int32_t vgId : 24; - int8_t tableType; uint64_t uid; uint64_t suid; + int32_t vgId; + int8_t tableType; // END: KEEP THIS PART SAME WITH SCTableMeta // if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta @@ -121,6 +116,7 @@ typedef struct STableMeta { STableComInfo tableInfo; SSchema schema[]; } STableMeta; +#pragma pack(pop) typedef struct SDBVgInfo { int32_t vgVersion; @@ -130,7 +126,7 @@ typedef struct SDBVgInfo { int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT int64_t stateTs; SHashObj* vgHash; // key:vgId, value:SVgroupInfo - SArray* vgArray; + SArray* vgArray; // SVgroupInfo } SDBVgInfo; typedef struct SUseDbOutput { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 99569fdb57..60b8572e16 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -645,7 +645,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { } else { tscInfo("set cfg:%s to %s", pItem->name, str); if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) { - code = taosSetCfg(pCfg, pItem->name); + code = taosApplyLocalCfg(pCfg, pItem->name); } } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index da4a912238..2ddc714329 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -117,6 +117,7 @@ int32_t tsRedirectFactor = 2; int32_t tsRedirectMaxPeriod = 1000; int32_t tsMaxRetryWaitTime = 10000; bool tsUseAdapter = false; +int32_t tsMetaCacheMaxSize = -1; // MB /* @@ -345,6 +346,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1; if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, 1) != 0) return -1; tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); @@ -742,6 +744,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval; tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64; + tsMetaCacheMaxSize = cfgGetItem(pCfg, "metaCacheMaxSize")->i32; tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; @@ -864,7 +867,7 @@ void taosLocalCfgForbiddenToChange(char *name, bool *forbidden) { *forbidden = false; } -int32_t taosSetCfg(SConfig *pCfg, char *name) { +int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { int32_t len = strlen(name); char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0}; strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len)); @@ -999,6 +1002,12 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { } break; } + case 'e': { + if (strcasecmp("metaCacheMaxSize", name) == 0) { + atomic_store_32(&tsMetaCacheMaxSize, cfgGetItem(pCfg, "metaCacheMaxSize")->i32); + } + break; + } case 'i': { if (strcasecmp("minimalTmpDirGB", name) == 0) { tsTempSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalTmpDirGB")->fval) * 1024 * 1024 * 1024); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4139d6c7d4..166f9f53a0 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3302,6 +3302,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1; if (tEncodeU64(&encoder, pRsp->suid) < 0) return -1; if (tEncodeI32(&encoder, pRsp->version) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->indexSize) < 0) return -1; int32_t num = taosArrayGetSize(pRsp->pIndex); if (tEncodeI32(&encoder, num) < 0) return -1; if (num > 0) { @@ -3347,6 +3348,7 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1; if (tDecodeU64(&decoder, &pRsp->suid) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->indexSize) < 0) return -1; int32_t num = 0; if (tDecodeI32(&decoder, &num) < 0) return -1; if (num > 0) { @@ -3621,6 +3623,7 @@ int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) { if (tEncodeCStr(&encoder, pIndexRsp->dbFName) < 0) return -1; if (tEncodeU64(&encoder, pIndexRsp->suid) < 0) return -1; if (tEncodeI32(&encoder, pIndexRsp->version) < 0) return -1; + if (tEncodeI32(&encoder, pIndexRsp->indexSize) < 0) return -1; int32_t num = taosArrayGetSize(pIndexRsp->pIndex); if (tEncodeI32(&encoder, num) < 0) return -1; for (int32_t i = 0; i < num; ++i) { @@ -3683,6 +3686,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) { if (tDecodeCStrTo(&decoder, tableIndexRsp.dbFName) < 0) return -1; if (tDecodeU64(&decoder, &tableIndexRsp.suid) < 0) return -1; if (tDecodeI32(&decoder, &tableIndexRsp.version) < 0) return -1; + if (tDecodeI32(&decoder, &tableIndexRsp.indexSize) < 0) return -1; int32_t num = 0; if (tDecodeI32(&decoder, &num) < 0) return -1; if (num > 0) { @@ -7691,4 +7695,4 @@ void tDeleteMqSubTopicEp(SMqSubTopicEp *pSubTopicEp) { taosMemoryFreeClear(pSubTopicEp->schema.pSchema); pSubTopicEp->schema.nCols = 0; taosArrayDestroy(pSubTopicEp->vgs); -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 3d654a23d8..42ad9e24d5 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -1114,6 +1114,7 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool return code; } + rsp->indexSize += sizeof(info) + pSma->exprLen + 1; *exist = true; sdbRelease(pSdb, pSma); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 13a7328887..9f892f23ff 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -23,6 +23,8 @@ extern "C" { #include "catalog.h" #include "query.h" #include "tcommon.h" +#include "ttimer.h" +#include "tglobal.h" #define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6 #define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100 @@ -34,6 +36,8 @@ extern "C" { #define CTG_DEFAULT_BATCH_NUM 64 #define CTG_DEFAULT_FETCH_NUM 8 #define CTG_MAX_COMMAND_LEN 512 +#define CTG_DEFAULT_CACHE_MON_MSEC 5000 +#define CTG_CLEAR_CACHE_ROUND_TB_NUM 3000 #define CTG_RENT_SLOT_SECOND 1.5 @@ -227,8 +231,8 @@ typedef STableIndexRsp STableIndex; typedef struct SCtgTbCache { SRWLatch metaLock; - STableMeta* pMeta; SRWLatch indexLock; + STableMeta* pMeta; STableIndex* pIndex; } SCtgTbCache; @@ -245,6 +249,7 @@ typedef struct SCtgDBCache { SHashObj* tbCache; // key:tbname, value:SCtgTbCache SHashObj* stbCache; // key:suid, value:char* uint64_t dbCacheNum[CTG_CI_MAX_VALUE]; + uint64_t dbCacheSize; } SCtgDBCache; typedef struct SCtgRentSlot { @@ -258,12 +263,15 @@ typedef struct SCtgRentMgmt { uint16_t slotNum; uint16_t slotRIdx; int64_t lastReadMsec; + uint64_t rentCacheSize; + int32_t metaSize; SCtgRentSlot* slots; } SCtgRentMgmt; typedef struct SCtgUserAuth { SRWLatch lock; SGetUserAuthRsp userAuth; + uint64_t userCacheSize; } SCtgUserAuth; typedef struct SCatalog { @@ -392,6 +400,7 @@ typedef struct SCtgRuntimeStat { uint64_t numOfOpAbort; uint64_t numOfOpEnqueue; uint64_t numOfOpDequeue; + uint64_t numOfOpClearMeta; uint64_t numOfOpClearCache; } SCtgRuntimeStat; @@ -461,6 +470,7 @@ typedef struct SCtgDropTbIndexMsg { typedef struct SCtgClearCacheMsg { SCatalog* pCtg; + bool clearMeta; bool freeCtg; } SCtgClearCacheMsg; @@ -499,6 +509,8 @@ typedef struct SCatalogMgmt { int32_t jobPool; SRWLatch lock; SCtgQueue queue; + void *timer; + tmr_h cacheTimer; TdThread updateThread; SHashObj* pCluster; // key: clusterId, value: SCatalog* SCatalogStat statInfo; @@ -515,9 +527,8 @@ typedef struct SCtgOperation { } SCtgOperation; typedef struct SCtgCacheItemInfo { - char *name; - int32_t flag; - uint32_t persistSize; + char* name; + int32_t flag; } SCtgCacheItemInfo; #define CTG_AUTH_READ(_t) ((_t) == AUTH_TYPE_READ || (_t) == AUTH_TYPE_READ_OR_WRITE) @@ -530,11 +541,6 @@ typedef struct SCtgCacheItemInfo { #define CTG_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n) #define CTG_STAT_GET(_item) atomic_load_64(&(_item)) -#define CTG_DB_NUM_INC(_item) dbCache->dbCacheNum[_item] += 1 -#define CTG_DB_NUM_DEC(_item) dbCache->dbCacheNum[_item] -= 1 -#define CTG_DB_NUM_SET(_item) dbCache->dbCacheNum[_item] = 1 -#define CTG_DB_NUM_RESET(_item) dbCache->dbCacheNum[_item] = 0 - #define CTG_STAT_API_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.api.item, n)) #define CTG_STAT_RT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.runtime.item, n)) #define CTG_STAT_NUM_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.cache.cacheNum[item], n)) @@ -549,6 +555,11 @@ typedef struct SCtgCacheItemInfo { #define CTG_CACHE_HIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheHit[item], n)) #define CTG_CACHE_NHIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheNHit[item], n)) +#define CTG_DB_NUM_INC(_item) dbCache->dbCacheNum[_item] += 1 +#define CTG_DB_NUM_DEC(_item) dbCache->dbCacheNum[_item] -= 1 +#define CTG_DB_NUM_SET(_item) dbCache->dbCacheNum[_item] = 1 +#define CTG_DB_NUM_RESET(_item) dbCache->dbCacheNum[_item] = 0 + #define CTG_META_NUM_INC(type) \ do { \ switch (type) { \ @@ -658,6 +669,10 @@ typedef struct SCtgCacheItemInfo { #define CTG_DB_NOT_EXIST(code) \ (code == TSDB_CODE_MND_DB_NOT_EXIST || code == TSDB_CODE_MND_DB_IN_CREATING || code == TSDB_CODE_MND_DB_IN_DROPPING) +#define CTG_CACHE_OVERFLOW(_csize, _maxsize) ((_maxsize >= 0) ? ((_csize) >= (_maxsize) * 1048576L * 0.9) : false) +#define CTG_CACHE_LOW(_csize, _maxsize) ((_maxsize >= 0) ? ((_csize) <= (_maxsize) * 1048576L * 0.75) : true) + + #define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__) #define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__) #define ctgWarn(param, ...) qWarn("CTG:%p " param, pCtg, __VA_ARGS__) @@ -760,6 +775,12 @@ typedef struct SCtgCacheItemInfo { CTG_RET(__code); \ } while (0) +#define CTG_API_NLEAVE() \ + do { \ + CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); \ + CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); \ + } while (0) + #define CTG_API_ENTER() \ do { \ CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \ @@ -769,6 +790,15 @@ typedef struct SCtgCacheItemInfo { } \ } while (0) +#define CTG_API_NENTER() \ + do { \ + CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \ + CTG_LOCK(CTG_READ, &gCtgMgmt.lock); \ + if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \ + CTG_API_NLEAVE(); \ + } \ + } while (0) + #define CTG_API_JENTER() \ do { \ CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \ @@ -829,8 +859,8 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput* output, bool sy int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp* pAuth, bool syncReq); int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char* dbFName, int32_t vgId, SEpSet* pEpSet); int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex** pIndex, bool syncOp); -int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool syncOp); -int32_t ctgMetaRentInit(SCtgRentMgmt* mgmt, uint32_t rentSec, int8_t type); +int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool clearMeta, bool freeCtg, bool stopQueue, bool syncOp); +int32_t ctgMetaRentInit(SCtgRentMgmt* mgmt, uint32_t rentSec, int8_t type, int32_t size); int32_t ctgMetaRentAdd(SCtgRentMgmt* mgmt, void* meta, int64_t id, int32_t size); int32_t ctgMetaRentGet(SCtgRentMgmt* mgmt, void** res, uint32_t* num, int32_t size); int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq); @@ -911,7 +941,7 @@ void ctgFreeSTableIndex(void* info); void ctgClearSubTaskRes(SCtgSubRes* pRes); void ctgFreeQNode(SCtgQNode* node); void ctgClearHandle(SCatalog* pCtg); -void ctgFreeTbCacheImpl(SCtgTbCache* pCache); +void ctgFreeTbCacheImpl(SCtgTbCache* pCache, bool lock); int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName); int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists); @@ -927,6 +957,16 @@ void ctgReleaseVgMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCach void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCache* pCache); void ctgGetGlobalCacheStat(SCtgCacheStat* pStat); int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res); +void ctgGetGlobalCacheSize(uint64_t *pSize); +uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex); +uint64_t ctgGetTbMetaCacheSize(STableMeta *pMeta); +uint64_t ctgGetDbVgroupCacheSize(SDBVgInfo *pVg); +uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth); +uint64_t ctgGetClusterCacheSize(SCatalog *pCtg); +void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone); +void ctgClearAllHandleMeta(bool *roundDone); +void ctgProcessTimerEvent(void *param, void *tmrId); + extern SCatalogMgmt gCtgMgmt; extern SCtgDebug gCTGDebug; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index bddc6c01a7..2dbe3c31ba 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -626,6 +626,36 @@ _return: CTG_RET(code); } +void ctgProcessTimerEvent(void *param, void *tmrId) { + CTG_API_NENTER(); + + int32_t cacheMaxSize = atomic_load_32(&tsMetaCacheMaxSize); + if (cacheMaxSize >= 0) { + uint64_t cacheSize = 0; + ctgGetGlobalCacheSize(&cacheSize); + bool overflow = CTG_CACHE_OVERFLOW(cacheSize, cacheMaxSize); + + qDebug("catalog cache size: %" PRIu64"B, maxCaseSize:%dMB, %s", cacheSize, cacheMaxSize, overflow ? "overflow" : "NO overflow"); + + if (overflow) { + int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false); + if (code) { + qError("clear cache enqueue failed, error:%s", tstrerror(code)); + taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + } + + goto _return; + } + } + + taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + +_return: + + CTG_API_NLEAVE(); +} + + int32_t catalogInit(SCatalogCfg* cfg) { if (gCtgMgmt.pCluster) { qError("catalog already initialized"); @@ -684,6 +714,18 @@ int32_t catalogInit(SCatalogCfg* cfg) { CTG_ERR_RET(terrno); } + gCtgMgmt.timer = taosTmrInit(0, 0, 0, "catalog"); + if (NULL == gCtgMgmt.timer) { + qError("init timer failed, error:%s", tstrerror(terrno)); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + gCtgMgmt.cacheTimer = taosTmrStart(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer); + if (NULL == gCtgMgmt.cacheTimer) { + qError("start cache timer failed"); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + CTG_ERR_RET(ctgStartUpdateThread()); qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, @@ -727,8 +769,8 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { clusterCtg->clusterId = clusterId; - CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB)); - CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE)); + CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbVgVersion))); + CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion))); clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -1511,11 +1553,11 @@ int32_t catalogClearCache(void) { qInfo("start to clear catalog cache"); - if (NULL == gCtgMgmt.pCluster || atomic_load_8((int8_t*)&gCtgMgmt.exit)) { + if (NULL == gCtgMgmt.pCluster) { CTG_API_LEAVE_NOLOCK(TSDB_CODE_SUCCESS); } - int32_t code = ctgClearCacheEnqueue(NULL, false, false, true); + int32_t code = ctgClearCacheEnqueue(NULL, false, false, false, true); qInfo("clear catalog cache end, code: %s", tstrerror(code)); @@ -1532,7 +1574,7 @@ void catalogDestroy(void) { atomic_store_8((int8_t*)&gCtgMgmt.exit, true); if (!taosCheckCurrentInDll()) { - ctgClearCacheEnqueue(NULL, true, true, true); + ctgClearCacheEnqueue(NULL, false, true, true, true); taosThreadJoin(gCtgMgmt.updateThread, NULL); } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 92a5956fc4..c697c81325 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -32,14 +32,14 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}}; SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = { - {"Cluster ", CTG_CI_FLAG_LEVEL_GLOBAL, sizeof(SCatalog)}, //CTG_CI_CLUSTER - {"Dnode ", CTG_CI_FLAG_LEVEL_CLUSTER, 0}, //CTG_CI_DNODE, - {"Qnode ", CTG_CI_FLAG_LEVEL_CLUSTER, 0}, //CTG_CI_QNODE, - {"DB ", CTG_CI_FLAG_LEVEL_CLUSTER, sizeof(SCtgDBCache)}, //CTG_CI_DB, - {"DbVgroup ", CTG_CI_FLAG_LEVEL_DB, sizeof(SDBVgInfo)}, //CTG_CI_DB_VGROUP, - {"DbCfg ", CTG_CI_FLAG_LEVEL_DB, 0}, //CTG_CI_DB_CFG, - {"DbInfo ", CTG_CI_FLAG_LEVEL_DB, 0}, //CTG_CI_DB_INFO, - {"StbMeta ", CTG_CI_FLAG_LEVEL_DB, }, //CTG_CI_STABLE_META, + {"Cluster ", CTG_CI_FLAG_LEVEL_GLOBAL}, //CTG_CI_CLUSTER + {"Dnode ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_DNODE, + {"Qnode ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_QNODE, + {"DB ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_DB, + {"DbVgroup ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_VGROUP, + {"DbCfg ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_CFG, + {"DbInfo ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_INFO, + {"StbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_STABLE_META, {"NtbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_NTABLE_META, {"CtbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_CTABLE_META, {"SysTblMeta", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_SYSTABLE_META, @@ -1135,7 +1135,7 @@ _return: CTG_RET(code); } -int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) { +int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool clearMeta, bool freeCtg, bool stopQueue, bool syncOp) { int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_CLEAR_CACHE; @@ -1151,6 +1151,7 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool } msg->pCtg = pCtg; + msg->clearMeta = clearMeta; msg->freeCtg = freeCtg; op->data = msg; @@ -1163,10 +1164,11 @@ _return: CTG_RET(code); } -int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { +int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type, int32_t size) { mgmt->slotRIdx = 0; mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND; mgmt->type = type; + mgmt->metaSize = size; size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum; @@ -1176,6 +1178,8 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + mgmt->rentCacheSize = msgSize; + qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum); return TSDB_CODE_SUCCESS; @@ -1202,6 +1206,7 @@ int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } + mgmt->rentCacheSize += size; slot->needSort = true; qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); @@ -1282,6 +1287,7 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp } taosArrayRemove(slot->meta, idx); + mgmt->rentCacheSize -= mgmt->metaSize; qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); @@ -1549,10 +1555,15 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam } if (origType == TSDB_SUPER_TABLE) { - if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) { - ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid); - } else { - ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid); + char *stbName = taosHashGet(dbCache->stbCache, &orig->suid, sizeof(orig->suid)); + if (stbName) { + uint64_t metaSize = strlen(stbName) + 1 + sizeof(orig->suid); + if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) { + ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid); + } else { + ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid); + atomic_sub_fetch_64(&dbCache->dbCacheSize, metaSize); + } } } } @@ -1566,14 +1577,20 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(tbName) + sizeof(SCtgTbCache) + ctgGetTbMetaCacheSize(meta)); + pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName)); } else { CTG_LOCK(CTG_WRITE, &pCache->metaLock); if (orig) { CTG_META_NUM_DEC(origType); } + + atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbMetaCacheSize(meta) - ctgGetTbMetaCacheSize(pCache->pMeta)); + taosMemoryFree(pCache->pMeta); pCache->pMeta = meta; + CTG_UNLOCK(CTG_WRITE, &pCache->metaLock); } @@ -1591,6 +1608,8 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + atomic_add_fetch_64(&dbCache->dbCacheSize, sizeof(meta->suid) + strlen(tbName) + 1); + ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName, meta->tableType); @@ -1623,6 +1642,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(tbName) + sizeof(SCtgTbCache) + ctgGetTbIndexCacheSize(pIndex)); + CTG_DB_NUM_INC(CTG_CI_TBL_SMA); *index = NULL; @@ -1639,6 +1660,7 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa CTG_LOCK(CTG_WRITE, &pCache->indexLock); if (pCache->pIndex) { + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pCache->pIndex)); if (0 == suid) { suid = pCache->pIndex->suid; } @@ -1649,6 +1671,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa pCache->pIndex = pIndex; CTG_UNLOCK(CTG_WRITE, &pCache->indexLock); + atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pIndex)); + *index = NULL; ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version, @@ -1678,7 +1702,7 @@ _return: CTG_RET(code); } -void ctgClearAllInstance(void) { +void ctgClearAllHandles(void) { SCatalog *pCtg = NULL; void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); @@ -1693,7 +1717,7 @@ void ctgClearAllInstance(void) { } } -void ctgFreeAllInstance(void) { +void ctgFreeAllHandles(void) { SCatalog *pCtg = NULL; void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); @@ -1752,6 +1776,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { } SCtgVgCache *vgCache = &dbCache->vgCache; + CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache)); if (vgCache->vgInfo) { @@ -1774,6 +1799,8 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { goto _return; } + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetDbVgroupCacheSize(vgCache->vgInfo)); + freeVgInfo(vgInfo); CTG_DB_NUM_RESET(CTG_CI_DB_VGROUP); } @@ -1787,6 +1814,8 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { ctgWUnlockVgInfo(dbCache); + atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetDbVgroupCacheSize(vgCache->vgInfo)); + dbCache = NULL; // if (!IS_SYS_DBNAME(dbFName)) { @@ -1850,6 +1879,8 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) { CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache)); + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetDbVgroupCacheSize(dbCache->vgCache.vgInfo)); + freeVgInfo(dbCache->vgCache.vgInfo); dbCache->vgCache.vgInfo = NULL; @@ -1941,26 +1972,32 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { goto _return; } - if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) { - ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, - msg->stbName, msg->suid); + char *stbName = taosHashGet(dbCache->stbCache, &msg->suid, sizeof(msg->suid)); + if (stbName) { + uint64_t metaSize = strlen(stbName) + 1 + sizeof(msg->suid); + if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) { + ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, + msg->stbName, msg->suid); + } else { + atomic_sub_fetch_64(&dbCache->dbCacheSize, metaSize); + } } - + SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName)); if (NULL == pTbCache) { ctgDebug("stb %s already not in cache", msg->stbName); goto _return; } - CTG_LOCK(CTG_WRITE, &pTbCache->metaLock); tblType = pTbCache->pMeta->tableType; - ctgFreeTbCacheImpl(pTbCache); - CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock); + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbMetaCacheSize(pTbCache->pMeta) + ctgGetTbIndexCacheSize(pTbCache->pIndex)); + ctgFreeTbCacheImpl(pTbCache, true); if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) { ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid); } else { CTG_META_NUM_DEC(tblType); + atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(*pTbCache) + strlen(msg->stbName)); } ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid); @@ -2004,15 +2041,15 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { goto _return; } - CTG_LOCK(CTG_WRITE, &pTbCache->metaLock); tblType = pTbCache->pMeta->tableType; - ctgFreeTbCacheImpl(pTbCache); - CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock); + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbMetaCacheSize(pTbCache->pMeta) + ctgGetTbIndexCacheSize(pTbCache->pIndex)); + ctgFreeTbCacheImpl(pTbCache, true); if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) { ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } else { + atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(*pTbCache) + strlen(msg->tbName)); CTG_META_NUM_DEC(tblType); } @@ -2039,7 +2076,8 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) { SCtgUserAuth userAuth = {0}; memcpy(&userAuth.userAuth, &msg->userAuth, sizeof(msg->userAuth)); - + userAuth.userCacheSize = ctgGetUserCacheSize(&userAuth.userAuth); + if (taosHashPut(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user), &userAuth, sizeof(userAuth))) { ctgError("taosHashPut user %s to cache failed", msg->userAuth.user); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); @@ -2079,6 +2117,8 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) { msg->userAuth.useDbs = NULL; CTG_UNLOCK(CTG_WRITE, &pUser->lock); + + atomic_store_64(&pUser->userCacheSize, ctgGetUserCacheSize(&pUser->userAuth)); _return: @@ -2214,33 +2254,77 @@ _return: CTG_RET(code); } -int32_t ctgOpClearCache(SCtgCacheOperation *operation) { - int32_t code = 0; +void ctgClearFreeCache(SCtgCacheOperation *operation) { SCtgClearCacheMsg *msg = operation->data; SCatalog *pCtg = msg->pCtg; CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock); - + if (pCtg) { if (msg->freeCtg) { ctgFreeHandle(pCtg); } else { ctgClearHandle(pCtg); } + } else if (msg->freeCtg) { + ctgFreeAllHandles(); + } else { + ctgClearAllHandles(); + } + + CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock); +} - goto _return; +void ctgClearMetaCache(SCtgCacheOperation *operation) { + SCtgClearCacheMsg *msg = operation->data; + SCatalog *pCtg = msg->pCtg; + int64_t clearedSize = 0; + int64_t clearedNum = 0; + int64_t remainSize = 0; + bool roundDone = false; + + if (pCtg) { + ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, &roundDone); + } else { + ctgClearAllHandleMeta(&roundDone); } - if (msg->freeCtg) { - ctgFreeAllInstance(); + qDebug("catalog finish one round meta clear, done:%d", roundDone); + + ctgGetGlobalCacheSize(&remainSize); + int32_t cacheMaxSize = atomic_load_32(&tsMetaCacheMaxSize); + + if (!CTG_CACHE_LOW(remainSize, cacheMaxSize)) { + qDebug("catalog finish meta clear, remainSize:%" PRId64 ", cacheMaxSize:%dMB", remainSize, cacheMaxSize); + taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + return; + } + + if (!roundDone) { + ctgClearFreeCache(operation); + taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + return; + } + + int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false); + if (code) { + qError("clear cache enqueue failed, error:%s", tstrerror(code)); + taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + } +} + +int32_t ctgOpClearCache(SCtgCacheOperation *operation) { + int32_t code = 0; + SCtgClearCacheMsg *msg = operation->data; + + if (msg->clearMeta) { + ctgClearMetaCache(operation); } else { - ctgClearAllInstance(); + ctgClearFreeCache(operation); } _return: - CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock); - taosMemoryFreeClear(msg); CTG_RET(code); diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index 12ff8a7b38..f4aeca5876 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -19,7 +19,7 @@ #include "trpc.h" extern SCatalogMgmt gCtgMgmt; -SCtgDebug gCTGDebug = {0}; +SCtgDebug gCTGDebug = {.statEnable = true}; #if 0 @@ -528,7 +528,10 @@ int32_t ctgdShowStatInfo(void) { CTG_API_ENTER(); SCtgCacheStat cache; + uint64_t cacheSize = 0; + ctgGetGlobalCacheStat(&cache); + ctgGetGlobalCacheSize(&cacheSize); qDebug("## Global Stat Info %s ##", "begin"); qDebug("## \t%s \t%s \t%s ##", "Num", "Hit", "Nhit"); @@ -536,6 +539,7 @@ int32_t ctgdShowStatInfo(void) { qDebug("# %s \t%" PRIu64 " \t%" PRIu64 " \t%" PRIu64 " #", gCtgStatItem[i].name, cache.cacheNum[i], cache.cacheHit[i], cache.cacheNHit[i]); } qDebug("## Global Stat Info %s ##", "end"); + qDebug("## Global Cache Size: %" PRIu64, cacheSize); CTG_API_LEAVE(TSDB_CODE_SUCCESS); } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index bb45b9bd95..dfa8a3b89a 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -196,6 +196,7 @@ void ctgFreeMetaRent(SCtgRentMgmt* mgmt) { } taosMemoryFreeClear(mgmt->slots); + mgmt->rentCacheSize = 0; } void ctgFreeStbMetaCache(SCtgDBCache* dbCache) { @@ -208,12 +209,26 @@ void ctgFreeStbMetaCache(SCtgDBCache* dbCache) { dbCache->stbCache = NULL; } -void ctgFreeTbCacheImpl(SCtgTbCache* pCache) { - qDebug("tbMeta freed, p:%p", pCache->pMeta); - taosMemoryFreeClear(pCache->pMeta); +void ctgFreeTbCacheImpl(SCtgTbCache* pCache, bool lock) { + if (pCache->pMeta) { + if (lock) { + CTG_LOCK(CTG_WRITE, &pCache->metaLock); + } + taosMemoryFreeClear(pCache->pMeta); + if (lock) { + CTG_UNLOCK(CTG_WRITE, &pCache->metaLock); + } + } + if (pCache->pIndex) { + if (lock) { + CTG_LOCK(CTG_WRITE, &pCache->indexLock); + } taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo); taosMemoryFreeClear(pCache->pIndex); + if (lock) { + CTG_UNLOCK(CTG_WRITE, &pCache->indexLock); + } } } @@ -225,7 +240,7 @@ void ctgFreeTbCache(SCtgDBCache* dbCache) { int32_t tblNum = taosHashGetSize(dbCache->tbCache); SCtgTbCache* pCache = taosHashIterate(dbCache->tbCache, NULL); while (NULL != pCache) { - ctgFreeTbCacheImpl(pCache); + ctgFreeTbCacheImpl(pCache, false); pCache = taosHashIterate(dbCache->tbCache, pCache); } taosHashCleanup(dbCache->tbCache); @@ -311,21 +326,82 @@ void ctgFreeHandle(SCatalog* pCtg) { ctgInfo("handle freed, clusterId:0x%" PRIx64, clusterId); } +void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone) { + int64_t cacheSize = 0; + void* pIter = taosHashIterate(pCtg->dbCache, NULL); + while (pIter) { + SCtgDBCache* dbCache = pIter; + + SCtgTbCache* pCache = taosHashIterate(dbCache->tbCache, NULL); + while (NULL != pCache) { + size_t len = 0; + void* key = taosHashGetKey(pCache, &len); + + if (pCache->pMeta && TSDB_SUPER_TABLE == pCache->pMeta->tableType) { + continue; + } + + taosHashRemove(dbCache->tbCache, key, len); + cacheSize = len + sizeof(SCtgTbCache) + ctgGetTbMetaCacheSize(pCache->pMeta) + ctgGetTbIndexCacheSize(pCache->pIndex); + atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize); + *pClearedSize += cacheSize; + (*pCleardNum)++; + + ctgFreeTbCacheImpl(pCache, true); + + if (*pCleardNum >= CTG_CLEAR_CACHE_ROUND_TB_NUM) { + goto _return; + } + + pCache = taosHashIterate(dbCache->tbCache, pCache); + } + + pIter = taosHashIterate(pCtg->dbCache, pIter); + } + +_return: + + if (*pCleardNum >= CTG_CLEAR_CACHE_ROUND_TB_NUM) { + *roundDone = true; + } +} + +void ctgClearAllHandleMeta(bool *roundDone) { + int64_t clearedSize = 0; + int64_t clearedNum = 0; + SCatalog *pCtg = NULL; + + void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); + while (pIter) { + pCtg = *(SCatalog **)pIter; + + if (pCtg) { + ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, roundDone); + if (*roundDone) { + taosHashCancelIterate(gCtgMgmt.pCluster, pIter); + break; + } + } + + pIter = taosHashIterate(gCtgMgmt.pCluster, pIter); + } +} + void ctgClearHandle(SCatalog* pCtg) { if (NULL == pCtg) { return; } uint64_t clusterId = pCtg->clusterId; - + ctgFreeMetaRent(&pCtg->dbRent); ctgFreeMetaRent(&pCtg->stbRent); ctgFreeInstDbCache(pCtg->dbCache); ctgFreeInstUserCache(pCtg->userCache); - ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB); - ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE); + ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbVgVersion)); + ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion)); pCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -1562,6 +1638,130 @@ void catalogFreeMetaData(SMetaData* pData) { } #endif +uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex) { + if (NULL == pIndex) { + return 0; + } + + return sizeof(*pIndex) + pIndex->indexSize; +} + +FORCE_INLINE uint64_t ctgGetTbMetaCacheSize(STableMeta *pMeta) { + if (NULL == pMeta) { + return 0; + } + + switch (pMeta->tableType) { + case TSDB_SUPER_TABLE: + return sizeof(*pMeta) + (pMeta->tableInfo.numOfColumns + pMeta->tableInfo.numOfTags) * sizeof(SSchema); + case TSDB_CHILD_TABLE: + return sizeof(SCTableMeta); + default: + return sizeof(*pMeta) + pMeta->tableInfo.numOfColumns * sizeof(SSchema); + } + + return 0; +} + +uint64_t ctgGetDbVgroupCacheSize(SDBVgInfo *pVg) { + if (NULL == pVg) { + return 0; + } + + return sizeof(*pVg) + taosHashGetSize(pVg->vgHash) * (sizeof(SVgroupInfo) + sizeof(int32_t)) + + taosArrayGetSize(pVg->vgArray) * sizeof(SVgroupInfo); +} + +uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth) { + if (NULL == pAuth) { + return 0; + } + + uint64_t cacheSize = 0; + char* p = taosHashIterate(pAuth->createdDbs, NULL); + while (p != NULL) { + size_t len = 0; + void* key = taosHashGetKey(p, &len); + cacheSize += len + strlen(p); + + p = taosHashIterate(pAuth->createdDbs, p); + } + + p = taosHashIterate(pAuth->readDbs, NULL); + while (p != NULL) { + size_t len = 0; + void* key = taosHashGetKey(p, &len); + cacheSize += len + strlen(p); + + p = taosHashIterate(pAuth->readDbs, p); + } + + p = taosHashIterate(pAuth->writeDbs, NULL); + while (p != NULL) { + size_t len = 0; + void* key = taosHashGetKey(p, &len); + cacheSize += len + strlen(p); + + p = taosHashIterate(pAuth->writeDbs, p); + } + + p = taosHashIterate(pAuth->readTbs, NULL); + while (p != NULL) { + size_t len = 0; + void* key = taosHashGetKey(p, &len); + cacheSize += len + strlen(p); + + p = taosHashIterate(pAuth->readTbs, p); + } + + p = taosHashIterate(pAuth->writeTbs, NULL); + while (p != NULL) { + size_t len = 0; + void* key = taosHashGetKey(p, &len); + cacheSize += len + strlen(p); + + p = taosHashIterate(pAuth->writeTbs, p); + } + + int32_t *ref = taosHashIterate(pAuth->useDbs, NULL); + while (ref != NULL) { + size_t len = 0; + void* key = taosHashGetKey(ref, &len); + cacheSize += len + sizeof(*ref); + + ref = taosHashIterate(pAuth->useDbs, ref); + } + + return cacheSize; +} + +uint64_t ctgGetClusterCacheSize(SCatalog *pCtg) { + uint64_t cacheSize = sizeof(SCatalog); + + SCtgUserAuth* pAuth = taosHashIterate(pCtg->userCache, NULL); + while (pAuth != NULL) { + size_t len = 0; + void* key = taosHashGetKey(pAuth, &len); + cacheSize += len + sizeof(SCtgUserAuth) + atomic_load_64(&pAuth->userCacheSize); + + pAuth = taosHashIterate(pCtg->userCache, pAuth); + } + + SCtgDBCache* pDb = taosHashIterate(pCtg->dbCache, NULL); + while (pDb != NULL) { + size_t len = 0; + void* key = taosHashGetKey(pDb, &len); + cacheSize += len + sizeof(SCtgDBCache) + atomic_load_64(&pDb->dbCacheSize); + + pDb = taosHashIterate(pCtg->dbCache, pDb); + } + + cacheSize += pCtg->dbRent.rentCacheSize; + cacheSize += pCtg->stbRent.rentCacheSize; + + return cacheSize; +} + void ctgGetClusterCacheStat(SCatalog* pCtg) { for (int32_t i = 0; i < CTG_CI_MAX_VALUE; ++i) { if (0 == (gCtgStatItem[i].flag & CTG_CI_FLAG_LEVEL_DB)) { @@ -1627,10 +1827,22 @@ void ctgGetGlobalCacheStat(SCtgCacheStat* pStat) { memcpy(pStat, &gCtgMgmt.statInfo.cache, sizeof(gCtgMgmt.statInfo.cache)); } -void ctgGetGlobalCacheUsedSize(uint64_t *pSize) { - SCtgCacheStat stat; - ctgGetGlobalCacheStat(&stat); - do; - +void ctgGetGlobalCacheSize(uint64_t *pSize) { + *pSize = 0; + + SCatalog* pCtg = NULL; + void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); + while (pIter) { + size_t len = 0; + void* key = taosHashGetKey(pIter, &len); + *pSize += len + POINTER_BYTES; + + pCtg = *(SCatalog**)pIter; + if (pCtg) { + *pSize += ctgGetClusterCacheSize(pCtg); + } + + pIter = taosHashIterate(gCtgMgmt.pCluster, pIter); + } } diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 58c43829cf..185d1482bb 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -743,7 +743,7 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { return terrno; } - if (taosSetCfg(tsCfg, pStmt->config)) { + if (taosApplyLocalCfg(tsCfg, pStmt->config)) { return terrno; }