From 1c14de11d935ee8792fb94d498ee65d434136fd2 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 25 Apr 2023 10:04:33 +0800 Subject: [PATCH 01/10] enh: get meta cache size --- source/libs/catalog/inc/catalogInt.h | 6 ++++-- source/libs/catalog/src/ctgCache.c | 16 ++++++++-------- source/libs/catalog/src/ctgUtil.c | 7 +++++++ 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index c4e1fd3078..5d76ab6f25 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -128,6 +128,7 @@ typedef struct SCtgDebug { typedef struct SCtgCacheStat { uint64_t cacheNum[CTG_CI_MAX_VALUE]; + uint64_t cacheSize[CTG_CI_MAX_VALUE]; uint64_t cacheHit[CTG_CI_MAX_VALUE]; uint64_t cacheNHit[CTG_CI_MAX_VALUE]; } SCtgCacheStat; @@ -505,8 +506,9 @@ typedef struct SCtgOperation { } SCtgOperation; typedef struct SCtgCacheItemInfo { - char *name; - int32_t flag; + char *name; + int32_t flag; + uint32_t persistSize; } SCtgCacheItemInfo; #define CTG_AUTH_READ(_t) ((_t) == AUTH_TYPE_READ || (_t) == AUTH_TYPE_READ_OR_WRITE) diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index b032158865..a13a58bec8 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -32,14 +32,14 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}}; SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = { - {"Cluster ", CTG_CI_FLAG_LEVEL_GLOBAL}, //CTG_CI_CLUSTER - {"Dnode ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_DNODE, - {"Qnode ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_QNODE, - {"DB ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_DB, - {"DbVgroup ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_VGROUP, - {"DbCfg ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_CFG, - {"DbInfo ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_INFO, - {"StbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_STABLE_META, + {"Cluster ", CTG_CI_FLAG_LEVEL_GLOBAL, sizeof(SCatalog)}, //CTG_CI_CLUSTER + {"Dnode ", CTG_CI_FLAG_LEVEL_CLUSTER, 0}, //CTG_CI_DNODE, + {"Qnode ", CTG_CI_FLAG_LEVEL_CLUSTER, 0}, //CTG_CI_QNODE, + {"DB ", CTG_CI_FLAG_LEVEL_CLUSTER, sizeof(SCtgDBCache)}, //CTG_CI_DB, + {"DbVgroup ", CTG_CI_FLAG_LEVEL_DB, sizeof(SDBVgInfo)}, //CTG_CI_DB_VGROUP, + {"DbCfg ", CTG_CI_FLAG_LEVEL_DB, 0}, //CTG_CI_DB_CFG, + {"DbInfo ", CTG_CI_FLAG_LEVEL_DB, 0}, //CTG_CI_DB_INFO, + {"StbMeta ", CTG_CI_FLAG_LEVEL_DB, }, //CTG_CI_STABLE_META, {"NtbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_NTABLE_META, {"CtbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_CTABLE_META, {"SysTblMeta", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_SYSTABLE_META, diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index a2f2a3d556..57645e8b87 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -1483,3 +1483,10 @@ void ctgGetGlobalCacheStat(SCtgCacheStat *pStat) { memcpy(pStat, &gCtgMgmt.statInfo.cache, sizeof(gCtgMgmt.statInfo.cache)); } +void ctgGetGlobalCacheUsedSize(uint64_t *pSize) { + SCtgCacheStat stat; + ctgGetGlobalCacheStat(&stat); + do; + +} + From 79d440337798c0edef889ff94d030deeca34d5ea Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 10 May 2023 17:44:53 +0800 Subject: [PATCH 02/10] feat: add client meta limitation --- include/common/tglobal.h | 3 +- include/common/tmsg.h | 3 +- include/libs/qcom/query.h | 24 ++- source/client/src/clientEnv.c | 2 +- source/common/src/tglobal.c | 11 +- source/common/src/tmsg.c | 6 +- source/dnode/mnode/impl/src/mndSma.c | 1 + source/libs/catalog/inc/catalogInt.h | 64 ++++++-- source/libs/catalog/src/catalog.c | 52 +++++- source/libs/catalog/src/ctgCache.c | 156 ++++++++++++++---- source/libs/catalog/src/ctgDbg.c | 6 +- source/libs/catalog/src/ctgUtil.c | 236 +++++++++++++++++++++++++-- source/libs/command/src/command.c | 2 +- 13 files changed, 480 insertions(+), 86 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 6fde7b48a2..e701a93150 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -118,6 +118,7 @@ extern int32_t tsRedirectFactor; extern int32_t tsRedirectMaxPeriod; extern int32_t tsMaxRetryWaitTime; extern bool tsUseAdapter; +extern int32_t tsMetaCacheMaxSize; // client extern int32_t tsMinSlidingTime; @@ -183,7 +184,7 @@ struct SConfig *taosGetCfg(); void taosSetAllDebugFlag(int32_t flag, bool rewrite); void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal, bool rewrite); -int32_t taosSetCfg(SConfig *pCfg, char *name); +int32_t taosApplyLocalCfg(SConfig *pCfg, char *name); void taosLocalCfgForbiddenToChange(char *name, bool *forbidden); #ifdef __cplusplus diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f377ad0d63..e1206c029d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3123,7 +3123,8 @@ typedef struct { char dbFName[TSDB_DB_FNAME_LEN]; uint64_t suid; int32_t version; - SArray* pIndex; + int32_t indexSize; + SArray* pIndex; // STableIndexInfo } STableIndexRsp; int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index cfc6ef2025..7ce81d1190 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -90,28 +90,23 @@ typedef struct STbVerInfo { int32_t tversion; } STbVerInfo; -/* - * ASSERT(sizeof(SCTableMeta) == 24) - * ASSERT(tableType == TSDB_CHILD_TABLE) - * The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info. - */ +#pragma pack(push, 1) typedef struct SCTableMeta { - int32_t vgId : 24; - int8_t tableType; uint64_t uid; uint64_t suid; + int32_t vgId; + int8_t tableType; } SCTableMeta; +#pragma pack(pop) -/* - * Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a - * SCTableMeta. - */ + +#pragma pack(push, 1) typedef struct STableMeta { // BEGIN: KEEP THIS PART SAME WITH SCTableMeta - int32_t vgId : 24; - int8_t tableType; uint64_t uid; uint64_t suid; + int32_t vgId; + int8_t tableType; // END: KEEP THIS PART SAME WITH SCTableMeta // if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta @@ -121,6 +116,7 @@ typedef struct STableMeta { STableComInfo tableInfo; SSchema schema[]; } STableMeta; +#pragma pack(pop) typedef struct SDBVgInfo { int32_t vgVersion; @@ -130,7 +126,7 @@ typedef struct SDBVgInfo { int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT int64_t stateTs; SHashObj* vgHash; // key:vgId, value:SVgroupInfo - SArray* vgArray; + SArray* vgArray; // SVgroupInfo } SDBVgInfo; typedef struct SUseDbOutput { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 99569fdb57..60b8572e16 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -645,7 +645,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { } else { tscInfo("set cfg:%s to %s", pItem->name, str); if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) { - code = taosSetCfg(pCfg, pItem->name); + code = taosApplyLocalCfg(pCfg, pItem->name); } } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index da4a912238..2ddc714329 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -117,6 +117,7 @@ int32_t tsRedirectFactor = 2; int32_t tsRedirectMaxPeriod = 1000; int32_t tsMaxRetryWaitTime = 10000; bool tsUseAdapter = false; +int32_t tsMetaCacheMaxSize = -1; // MB /* @@ -345,6 +346,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1; if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, 1) != 0) return -1; tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); @@ -742,6 +744,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval; tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64; + tsMetaCacheMaxSize = cfgGetItem(pCfg, "metaCacheMaxSize")->i32; tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; @@ -864,7 +867,7 @@ void taosLocalCfgForbiddenToChange(char *name, bool *forbidden) { *forbidden = false; } -int32_t taosSetCfg(SConfig *pCfg, char *name) { +int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { int32_t len = strlen(name); char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0}; strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len)); @@ -999,6 +1002,12 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { } break; } + case 'e': { + if (strcasecmp("metaCacheMaxSize", name) == 0) { + atomic_store_32(&tsMetaCacheMaxSize, cfgGetItem(pCfg, "metaCacheMaxSize")->i32); + } + break; + } case 'i': { if (strcasecmp("minimalTmpDirGB", name) == 0) { tsTempSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalTmpDirGB")->fval) * 1024 * 1024 * 1024); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4139d6c7d4..166f9f53a0 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3302,6 +3302,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1; if (tEncodeU64(&encoder, pRsp->suid) < 0) return -1; if (tEncodeI32(&encoder, pRsp->version) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->indexSize) < 0) return -1; int32_t num = taosArrayGetSize(pRsp->pIndex); if (tEncodeI32(&encoder, num) < 0) return -1; if (num > 0) { @@ -3347,6 +3348,7 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1; if (tDecodeU64(&decoder, &pRsp->suid) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->indexSize) < 0) return -1; int32_t num = 0; if (tDecodeI32(&decoder, &num) < 0) return -1; if (num > 0) { @@ -3621,6 +3623,7 @@ int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) { if (tEncodeCStr(&encoder, pIndexRsp->dbFName) < 0) return -1; if (tEncodeU64(&encoder, pIndexRsp->suid) < 0) return -1; if (tEncodeI32(&encoder, pIndexRsp->version) < 0) return -1; + if (tEncodeI32(&encoder, pIndexRsp->indexSize) < 0) return -1; int32_t num = taosArrayGetSize(pIndexRsp->pIndex); if (tEncodeI32(&encoder, num) < 0) return -1; for (int32_t i = 0; i < num; ++i) { @@ -3683,6 +3686,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) { if (tDecodeCStrTo(&decoder, tableIndexRsp.dbFName) < 0) return -1; if (tDecodeU64(&decoder, &tableIndexRsp.suid) < 0) return -1; if (tDecodeI32(&decoder, &tableIndexRsp.version) < 0) return -1; + if (tDecodeI32(&decoder, &tableIndexRsp.indexSize) < 0) return -1; int32_t num = 0; if (tDecodeI32(&decoder, &num) < 0) return -1; if (num > 0) { @@ -7691,4 +7695,4 @@ void tDeleteMqSubTopicEp(SMqSubTopicEp *pSubTopicEp) { taosMemoryFreeClear(pSubTopicEp->schema.pSchema); pSubTopicEp->schema.nCols = 0; taosArrayDestroy(pSubTopicEp->vgs); -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 3d654a23d8..42ad9e24d5 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -1114,6 +1114,7 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool return code; } + rsp->indexSize += sizeof(info) + pSma->exprLen + 1; *exist = true; sdbRelease(pSdb, pSma); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 13a7328887..9f892f23ff 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -23,6 +23,8 @@ extern "C" { #include "catalog.h" #include "query.h" #include "tcommon.h" +#include "ttimer.h" +#include "tglobal.h" #define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6 #define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100 @@ -34,6 +36,8 @@ extern "C" { #define CTG_DEFAULT_BATCH_NUM 64 #define CTG_DEFAULT_FETCH_NUM 8 #define CTG_MAX_COMMAND_LEN 512 +#define CTG_DEFAULT_CACHE_MON_MSEC 5000 +#define CTG_CLEAR_CACHE_ROUND_TB_NUM 3000 #define CTG_RENT_SLOT_SECOND 1.5 @@ -227,8 +231,8 @@ typedef STableIndexRsp STableIndex; typedef struct SCtgTbCache { SRWLatch metaLock; - STableMeta* pMeta; SRWLatch indexLock; + STableMeta* pMeta; STableIndex* pIndex; } SCtgTbCache; @@ -245,6 +249,7 @@ typedef struct SCtgDBCache { SHashObj* tbCache; // key:tbname, value:SCtgTbCache SHashObj* stbCache; // key:suid, value:char* uint64_t dbCacheNum[CTG_CI_MAX_VALUE]; + uint64_t dbCacheSize; } SCtgDBCache; typedef struct SCtgRentSlot { @@ -258,12 +263,15 @@ typedef struct SCtgRentMgmt { uint16_t slotNum; uint16_t slotRIdx; int64_t lastReadMsec; + uint64_t rentCacheSize; + int32_t metaSize; SCtgRentSlot* slots; } SCtgRentMgmt; typedef struct SCtgUserAuth { SRWLatch lock; SGetUserAuthRsp userAuth; + uint64_t userCacheSize; } SCtgUserAuth; typedef struct SCatalog { @@ -392,6 +400,7 @@ typedef struct SCtgRuntimeStat { uint64_t numOfOpAbort; uint64_t numOfOpEnqueue; uint64_t numOfOpDequeue; + uint64_t numOfOpClearMeta; uint64_t numOfOpClearCache; } SCtgRuntimeStat; @@ -461,6 +470,7 @@ typedef struct SCtgDropTbIndexMsg { typedef struct SCtgClearCacheMsg { SCatalog* pCtg; + bool clearMeta; bool freeCtg; } SCtgClearCacheMsg; @@ -499,6 +509,8 @@ typedef struct SCatalogMgmt { int32_t jobPool; SRWLatch lock; SCtgQueue queue; + void *timer; + tmr_h cacheTimer; TdThread updateThread; SHashObj* pCluster; // key: clusterId, value: SCatalog* SCatalogStat statInfo; @@ -515,9 +527,8 @@ typedef struct SCtgOperation { } SCtgOperation; typedef struct SCtgCacheItemInfo { - char *name; - int32_t flag; - uint32_t persistSize; + char* name; + int32_t flag; } SCtgCacheItemInfo; #define CTG_AUTH_READ(_t) ((_t) == AUTH_TYPE_READ || (_t) == AUTH_TYPE_READ_OR_WRITE) @@ -530,11 +541,6 @@ typedef struct SCtgCacheItemInfo { #define CTG_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n) #define CTG_STAT_GET(_item) atomic_load_64(&(_item)) -#define CTG_DB_NUM_INC(_item) dbCache->dbCacheNum[_item] += 1 -#define CTG_DB_NUM_DEC(_item) dbCache->dbCacheNum[_item] -= 1 -#define CTG_DB_NUM_SET(_item) dbCache->dbCacheNum[_item] = 1 -#define CTG_DB_NUM_RESET(_item) dbCache->dbCacheNum[_item] = 0 - #define CTG_STAT_API_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.api.item, n)) #define CTG_STAT_RT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.runtime.item, n)) #define CTG_STAT_NUM_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.cache.cacheNum[item], n)) @@ -549,6 +555,11 @@ typedef struct SCtgCacheItemInfo { #define CTG_CACHE_HIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheHit[item], n)) #define CTG_CACHE_NHIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheNHit[item], n)) +#define CTG_DB_NUM_INC(_item) dbCache->dbCacheNum[_item] += 1 +#define CTG_DB_NUM_DEC(_item) dbCache->dbCacheNum[_item] -= 1 +#define CTG_DB_NUM_SET(_item) dbCache->dbCacheNum[_item] = 1 +#define CTG_DB_NUM_RESET(_item) dbCache->dbCacheNum[_item] = 0 + #define CTG_META_NUM_INC(type) \ do { \ switch (type) { \ @@ -658,6 +669,10 @@ typedef struct SCtgCacheItemInfo { #define CTG_DB_NOT_EXIST(code) \ (code == TSDB_CODE_MND_DB_NOT_EXIST || code == TSDB_CODE_MND_DB_IN_CREATING || code == TSDB_CODE_MND_DB_IN_DROPPING) +#define CTG_CACHE_OVERFLOW(_csize, _maxsize) ((_maxsize >= 0) ? ((_csize) >= (_maxsize) * 1048576L * 0.9) : false) +#define CTG_CACHE_LOW(_csize, _maxsize) ((_maxsize >= 0) ? ((_csize) <= (_maxsize) * 1048576L * 0.75) : true) + + #define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__) #define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__) #define ctgWarn(param, ...) qWarn("CTG:%p " param, pCtg, __VA_ARGS__) @@ -760,6 +775,12 @@ typedef struct SCtgCacheItemInfo { CTG_RET(__code); \ } while (0) +#define CTG_API_NLEAVE() \ + do { \ + CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); \ + CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); \ + } while (0) + #define CTG_API_ENTER() \ do { \ CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \ @@ -769,6 +790,15 @@ typedef struct SCtgCacheItemInfo { } \ } while (0) +#define CTG_API_NENTER() \ + do { \ + CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \ + CTG_LOCK(CTG_READ, &gCtgMgmt.lock); \ + if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \ + CTG_API_NLEAVE(); \ + } \ + } while (0) + #define CTG_API_JENTER() \ do { \ CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \ @@ -829,8 +859,8 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput* output, bool sy int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp* pAuth, bool syncReq); int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char* dbFName, int32_t vgId, SEpSet* pEpSet); int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex** pIndex, bool syncOp); -int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool syncOp); -int32_t ctgMetaRentInit(SCtgRentMgmt* mgmt, uint32_t rentSec, int8_t type); +int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool clearMeta, bool freeCtg, bool stopQueue, bool syncOp); +int32_t ctgMetaRentInit(SCtgRentMgmt* mgmt, uint32_t rentSec, int8_t type, int32_t size); int32_t ctgMetaRentAdd(SCtgRentMgmt* mgmt, void* meta, int64_t id, int32_t size); int32_t ctgMetaRentGet(SCtgRentMgmt* mgmt, void** res, uint32_t* num, int32_t size); int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq); @@ -911,7 +941,7 @@ void ctgFreeSTableIndex(void* info); void ctgClearSubTaskRes(SCtgSubRes* pRes); void ctgFreeQNode(SCtgQNode* node); void ctgClearHandle(SCatalog* pCtg); -void ctgFreeTbCacheImpl(SCtgTbCache* pCache); +void ctgFreeTbCacheImpl(SCtgTbCache* pCache, bool lock); int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName); int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists); @@ -927,6 +957,16 @@ void ctgReleaseVgMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCach void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCache* pCache); void ctgGetGlobalCacheStat(SCtgCacheStat* pStat); int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res); +void ctgGetGlobalCacheSize(uint64_t *pSize); +uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex); +uint64_t ctgGetTbMetaCacheSize(STableMeta *pMeta); +uint64_t ctgGetDbVgroupCacheSize(SDBVgInfo *pVg); +uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth); +uint64_t ctgGetClusterCacheSize(SCatalog *pCtg); +void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone); +void ctgClearAllHandleMeta(bool *roundDone); +void ctgProcessTimerEvent(void *param, void *tmrId); + extern SCatalogMgmt gCtgMgmt; extern SCtgDebug gCTGDebug; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index bddc6c01a7..2dbe3c31ba 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -626,6 +626,36 @@ _return: CTG_RET(code); } +void ctgProcessTimerEvent(void *param, void *tmrId) { + CTG_API_NENTER(); + + int32_t cacheMaxSize = atomic_load_32(&tsMetaCacheMaxSize); + if (cacheMaxSize >= 0) { + uint64_t cacheSize = 0; + ctgGetGlobalCacheSize(&cacheSize); + bool overflow = CTG_CACHE_OVERFLOW(cacheSize, cacheMaxSize); + + qDebug("catalog cache size: %" PRIu64"B, maxCaseSize:%dMB, %s", cacheSize, cacheMaxSize, overflow ? "overflow" : "NO overflow"); + + if (overflow) { + int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false); + if (code) { + qError("clear cache enqueue failed, error:%s", tstrerror(code)); + taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + } + + goto _return; + } + } + + taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + +_return: + + CTG_API_NLEAVE(); +} + + int32_t catalogInit(SCatalogCfg* cfg) { if (gCtgMgmt.pCluster) { qError("catalog already initialized"); @@ -684,6 +714,18 @@ int32_t catalogInit(SCatalogCfg* cfg) { CTG_ERR_RET(terrno); } + gCtgMgmt.timer = taosTmrInit(0, 0, 0, "catalog"); + if (NULL == gCtgMgmt.timer) { + qError("init timer failed, error:%s", tstrerror(terrno)); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + gCtgMgmt.cacheTimer = taosTmrStart(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer); + if (NULL == gCtgMgmt.cacheTimer) { + qError("start cache timer failed"); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + CTG_ERR_RET(ctgStartUpdateThread()); qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, @@ -727,8 +769,8 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { clusterCtg->clusterId = clusterId; - CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB)); - CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE)); + CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbVgVersion))); + CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion))); clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -1511,11 +1553,11 @@ int32_t catalogClearCache(void) { qInfo("start to clear catalog cache"); - if (NULL == gCtgMgmt.pCluster || atomic_load_8((int8_t*)&gCtgMgmt.exit)) { + if (NULL == gCtgMgmt.pCluster) { CTG_API_LEAVE_NOLOCK(TSDB_CODE_SUCCESS); } - int32_t code = ctgClearCacheEnqueue(NULL, false, false, true); + int32_t code = ctgClearCacheEnqueue(NULL, false, false, false, true); qInfo("clear catalog cache end, code: %s", tstrerror(code)); @@ -1532,7 +1574,7 @@ void catalogDestroy(void) { atomic_store_8((int8_t*)&gCtgMgmt.exit, true); if (!taosCheckCurrentInDll()) { - ctgClearCacheEnqueue(NULL, true, true, true); + ctgClearCacheEnqueue(NULL, false, true, true, true); taosThreadJoin(gCtgMgmt.updateThread, NULL); } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 92a5956fc4..c697c81325 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -32,14 +32,14 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}}; SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = { - {"Cluster ", CTG_CI_FLAG_LEVEL_GLOBAL, sizeof(SCatalog)}, //CTG_CI_CLUSTER - {"Dnode ", CTG_CI_FLAG_LEVEL_CLUSTER, 0}, //CTG_CI_DNODE, - {"Qnode ", CTG_CI_FLAG_LEVEL_CLUSTER, 0}, //CTG_CI_QNODE, - {"DB ", CTG_CI_FLAG_LEVEL_CLUSTER, sizeof(SCtgDBCache)}, //CTG_CI_DB, - {"DbVgroup ", CTG_CI_FLAG_LEVEL_DB, sizeof(SDBVgInfo)}, //CTG_CI_DB_VGROUP, - {"DbCfg ", CTG_CI_FLAG_LEVEL_DB, 0}, //CTG_CI_DB_CFG, - {"DbInfo ", CTG_CI_FLAG_LEVEL_DB, 0}, //CTG_CI_DB_INFO, - {"StbMeta ", CTG_CI_FLAG_LEVEL_DB, }, //CTG_CI_STABLE_META, + {"Cluster ", CTG_CI_FLAG_LEVEL_GLOBAL}, //CTG_CI_CLUSTER + {"Dnode ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_DNODE, + {"Qnode ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_QNODE, + {"DB ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_DB, + {"DbVgroup ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_VGROUP, + {"DbCfg ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_CFG, + {"DbInfo ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_INFO, + {"StbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_STABLE_META, {"NtbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_NTABLE_META, {"CtbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_CTABLE_META, {"SysTblMeta", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_SYSTABLE_META, @@ -1135,7 +1135,7 @@ _return: CTG_RET(code); } -int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) { +int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool clearMeta, bool freeCtg, bool stopQueue, bool syncOp) { int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_CLEAR_CACHE; @@ -1151,6 +1151,7 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool } msg->pCtg = pCtg; + msg->clearMeta = clearMeta; msg->freeCtg = freeCtg; op->data = msg; @@ -1163,10 +1164,11 @@ _return: CTG_RET(code); } -int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { +int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type, int32_t size) { mgmt->slotRIdx = 0; mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND; mgmt->type = type; + mgmt->metaSize = size; size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum; @@ -1176,6 +1178,8 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + mgmt->rentCacheSize = msgSize; + qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum); return TSDB_CODE_SUCCESS; @@ -1202,6 +1206,7 @@ int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } + mgmt->rentCacheSize += size; slot->needSort = true; qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); @@ -1282,6 +1287,7 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp } taosArrayRemove(slot->meta, idx); + mgmt->rentCacheSize -= mgmt->metaSize; qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); @@ -1549,10 +1555,15 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam } if (origType == TSDB_SUPER_TABLE) { - if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) { - ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid); - } else { - ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid); + char *stbName = taosHashGet(dbCache->stbCache, &orig->suid, sizeof(orig->suid)); + if (stbName) { + uint64_t metaSize = strlen(stbName) + 1 + sizeof(orig->suid); + if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) { + ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid); + } else { + ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid); + atomic_sub_fetch_64(&dbCache->dbCacheSize, metaSize); + } } } } @@ -1566,14 +1577,20 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(tbName) + sizeof(SCtgTbCache) + ctgGetTbMetaCacheSize(meta)); + pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName)); } else { CTG_LOCK(CTG_WRITE, &pCache->metaLock); if (orig) { CTG_META_NUM_DEC(origType); } + + atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbMetaCacheSize(meta) - ctgGetTbMetaCacheSize(pCache->pMeta)); + taosMemoryFree(pCache->pMeta); pCache->pMeta = meta; + CTG_UNLOCK(CTG_WRITE, &pCache->metaLock); } @@ -1591,6 +1608,8 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + atomic_add_fetch_64(&dbCache->dbCacheSize, sizeof(meta->suid) + strlen(tbName) + 1); + ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName, meta->tableType); @@ -1623,6 +1642,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(tbName) + sizeof(SCtgTbCache) + ctgGetTbIndexCacheSize(pIndex)); + CTG_DB_NUM_INC(CTG_CI_TBL_SMA); *index = NULL; @@ -1639,6 +1660,7 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa CTG_LOCK(CTG_WRITE, &pCache->indexLock); if (pCache->pIndex) { + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pCache->pIndex)); if (0 == suid) { suid = pCache->pIndex->suid; } @@ -1649,6 +1671,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa pCache->pIndex = pIndex; CTG_UNLOCK(CTG_WRITE, &pCache->indexLock); + atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pIndex)); + *index = NULL; ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version, @@ -1678,7 +1702,7 @@ _return: CTG_RET(code); } -void ctgClearAllInstance(void) { +void ctgClearAllHandles(void) { SCatalog *pCtg = NULL; void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); @@ -1693,7 +1717,7 @@ void ctgClearAllInstance(void) { } } -void ctgFreeAllInstance(void) { +void ctgFreeAllHandles(void) { SCatalog *pCtg = NULL; void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); @@ -1752,6 +1776,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { } SCtgVgCache *vgCache = &dbCache->vgCache; + CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache)); if (vgCache->vgInfo) { @@ -1774,6 +1799,8 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { goto _return; } + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetDbVgroupCacheSize(vgCache->vgInfo)); + freeVgInfo(vgInfo); CTG_DB_NUM_RESET(CTG_CI_DB_VGROUP); } @@ -1787,6 +1814,8 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { ctgWUnlockVgInfo(dbCache); + atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetDbVgroupCacheSize(vgCache->vgInfo)); + dbCache = NULL; // if (!IS_SYS_DBNAME(dbFName)) { @@ -1850,6 +1879,8 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) { CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache)); + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetDbVgroupCacheSize(dbCache->vgCache.vgInfo)); + freeVgInfo(dbCache->vgCache.vgInfo); dbCache->vgCache.vgInfo = NULL; @@ -1941,26 +1972,32 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { goto _return; } - if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) { - ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, - msg->stbName, msg->suid); + char *stbName = taosHashGet(dbCache->stbCache, &msg->suid, sizeof(msg->suid)); + if (stbName) { + uint64_t metaSize = strlen(stbName) + 1 + sizeof(msg->suid); + if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) { + ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, + msg->stbName, msg->suid); + } else { + atomic_sub_fetch_64(&dbCache->dbCacheSize, metaSize); + } } - + SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName)); if (NULL == pTbCache) { ctgDebug("stb %s already not in cache", msg->stbName); goto _return; } - CTG_LOCK(CTG_WRITE, &pTbCache->metaLock); tblType = pTbCache->pMeta->tableType; - ctgFreeTbCacheImpl(pTbCache); - CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock); + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbMetaCacheSize(pTbCache->pMeta) + ctgGetTbIndexCacheSize(pTbCache->pIndex)); + ctgFreeTbCacheImpl(pTbCache, true); if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) { ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid); } else { CTG_META_NUM_DEC(tblType); + atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(*pTbCache) + strlen(msg->stbName)); } ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid); @@ -2004,15 +2041,15 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { goto _return; } - CTG_LOCK(CTG_WRITE, &pTbCache->metaLock); tblType = pTbCache->pMeta->tableType; - ctgFreeTbCacheImpl(pTbCache); - CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock); + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbMetaCacheSize(pTbCache->pMeta) + ctgGetTbIndexCacheSize(pTbCache->pIndex)); + ctgFreeTbCacheImpl(pTbCache, true); if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) { ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } else { + atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(*pTbCache) + strlen(msg->tbName)); CTG_META_NUM_DEC(tblType); } @@ -2039,7 +2076,8 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) { SCtgUserAuth userAuth = {0}; memcpy(&userAuth.userAuth, &msg->userAuth, sizeof(msg->userAuth)); - + userAuth.userCacheSize = ctgGetUserCacheSize(&userAuth.userAuth); + if (taosHashPut(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user), &userAuth, sizeof(userAuth))) { ctgError("taosHashPut user %s to cache failed", msg->userAuth.user); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); @@ -2079,6 +2117,8 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) { msg->userAuth.useDbs = NULL; CTG_UNLOCK(CTG_WRITE, &pUser->lock); + + atomic_store_64(&pUser->userCacheSize, ctgGetUserCacheSize(&pUser->userAuth)); _return: @@ -2214,33 +2254,77 @@ _return: CTG_RET(code); } -int32_t ctgOpClearCache(SCtgCacheOperation *operation) { - int32_t code = 0; +void ctgClearFreeCache(SCtgCacheOperation *operation) { SCtgClearCacheMsg *msg = operation->data; SCatalog *pCtg = msg->pCtg; CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock); - + if (pCtg) { if (msg->freeCtg) { ctgFreeHandle(pCtg); } else { ctgClearHandle(pCtg); } + } else if (msg->freeCtg) { + ctgFreeAllHandles(); + } else { + ctgClearAllHandles(); + } + + CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock); +} - goto _return; +void ctgClearMetaCache(SCtgCacheOperation *operation) { + SCtgClearCacheMsg *msg = operation->data; + SCatalog *pCtg = msg->pCtg; + int64_t clearedSize = 0; + int64_t clearedNum = 0; + int64_t remainSize = 0; + bool roundDone = false; + + if (pCtg) { + ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, &roundDone); + } else { + ctgClearAllHandleMeta(&roundDone); } - if (msg->freeCtg) { - ctgFreeAllInstance(); + qDebug("catalog finish one round meta clear, done:%d", roundDone); + + ctgGetGlobalCacheSize(&remainSize); + int32_t cacheMaxSize = atomic_load_32(&tsMetaCacheMaxSize); + + if (!CTG_CACHE_LOW(remainSize, cacheMaxSize)) { + qDebug("catalog finish meta clear, remainSize:%" PRId64 ", cacheMaxSize:%dMB", remainSize, cacheMaxSize); + taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + return; + } + + if (!roundDone) { + ctgClearFreeCache(operation); + taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + return; + } + + int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false); + if (code) { + qError("clear cache enqueue failed, error:%s", tstrerror(code)); + taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + } +} + +int32_t ctgOpClearCache(SCtgCacheOperation *operation) { + int32_t code = 0; + SCtgClearCacheMsg *msg = operation->data; + + if (msg->clearMeta) { + ctgClearMetaCache(operation); } else { - ctgClearAllInstance(); + ctgClearFreeCache(operation); } _return: - CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock); - taosMemoryFreeClear(msg); CTG_RET(code); diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index 12ff8a7b38..f4aeca5876 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -19,7 +19,7 @@ #include "trpc.h" extern SCatalogMgmt gCtgMgmt; -SCtgDebug gCTGDebug = {0}; +SCtgDebug gCTGDebug = {.statEnable = true}; #if 0 @@ -528,7 +528,10 @@ int32_t ctgdShowStatInfo(void) { CTG_API_ENTER(); SCtgCacheStat cache; + uint64_t cacheSize = 0; + ctgGetGlobalCacheStat(&cache); + ctgGetGlobalCacheSize(&cacheSize); qDebug("## Global Stat Info %s ##", "begin"); qDebug("## \t%s \t%s \t%s ##", "Num", "Hit", "Nhit"); @@ -536,6 +539,7 @@ int32_t ctgdShowStatInfo(void) { qDebug("# %s \t%" PRIu64 " \t%" PRIu64 " \t%" PRIu64 " #", gCtgStatItem[i].name, cache.cacheNum[i], cache.cacheHit[i], cache.cacheNHit[i]); } qDebug("## Global Stat Info %s ##", "end"); + qDebug("## Global Cache Size: %" PRIu64, cacheSize); CTG_API_LEAVE(TSDB_CODE_SUCCESS); } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index bb45b9bd95..dfa8a3b89a 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -196,6 +196,7 @@ void ctgFreeMetaRent(SCtgRentMgmt* mgmt) { } taosMemoryFreeClear(mgmt->slots); + mgmt->rentCacheSize = 0; } void ctgFreeStbMetaCache(SCtgDBCache* dbCache) { @@ -208,12 +209,26 @@ void ctgFreeStbMetaCache(SCtgDBCache* dbCache) { dbCache->stbCache = NULL; } -void ctgFreeTbCacheImpl(SCtgTbCache* pCache) { - qDebug("tbMeta freed, p:%p", pCache->pMeta); - taosMemoryFreeClear(pCache->pMeta); +void ctgFreeTbCacheImpl(SCtgTbCache* pCache, bool lock) { + if (pCache->pMeta) { + if (lock) { + CTG_LOCK(CTG_WRITE, &pCache->metaLock); + } + taosMemoryFreeClear(pCache->pMeta); + if (lock) { + CTG_UNLOCK(CTG_WRITE, &pCache->metaLock); + } + } + if (pCache->pIndex) { + if (lock) { + CTG_LOCK(CTG_WRITE, &pCache->indexLock); + } taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo); taosMemoryFreeClear(pCache->pIndex); + if (lock) { + CTG_UNLOCK(CTG_WRITE, &pCache->indexLock); + } } } @@ -225,7 +240,7 @@ void ctgFreeTbCache(SCtgDBCache* dbCache) { int32_t tblNum = taosHashGetSize(dbCache->tbCache); SCtgTbCache* pCache = taosHashIterate(dbCache->tbCache, NULL); while (NULL != pCache) { - ctgFreeTbCacheImpl(pCache); + ctgFreeTbCacheImpl(pCache, false); pCache = taosHashIterate(dbCache->tbCache, pCache); } taosHashCleanup(dbCache->tbCache); @@ -311,21 +326,82 @@ void ctgFreeHandle(SCatalog* pCtg) { ctgInfo("handle freed, clusterId:0x%" PRIx64, clusterId); } +void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone) { + int64_t cacheSize = 0; + void* pIter = taosHashIterate(pCtg->dbCache, NULL); + while (pIter) { + SCtgDBCache* dbCache = pIter; + + SCtgTbCache* pCache = taosHashIterate(dbCache->tbCache, NULL); + while (NULL != pCache) { + size_t len = 0; + void* key = taosHashGetKey(pCache, &len); + + if (pCache->pMeta && TSDB_SUPER_TABLE == pCache->pMeta->tableType) { + continue; + } + + taosHashRemove(dbCache->tbCache, key, len); + cacheSize = len + sizeof(SCtgTbCache) + ctgGetTbMetaCacheSize(pCache->pMeta) + ctgGetTbIndexCacheSize(pCache->pIndex); + atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize); + *pClearedSize += cacheSize; + (*pCleardNum)++; + + ctgFreeTbCacheImpl(pCache, true); + + if (*pCleardNum >= CTG_CLEAR_CACHE_ROUND_TB_NUM) { + goto _return; + } + + pCache = taosHashIterate(dbCache->tbCache, pCache); + } + + pIter = taosHashIterate(pCtg->dbCache, pIter); + } + +_return: + + if (*pCleardNum >= CTG_CLEAR_CACHE_ROUND_TB_NUM) { + *roundDone = true; + } +} + +void ctgClearAllHandleMeta(bool *roundDone) { + int64_t clearedSize = 0; + int64_t clearedNum = 0; + SCatalog *pCtg = NULL; + + void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); + while (pIter) { + pCtg = *(SCatalog **)pIter; + + if (pCtg) { + ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, roundDone); + if (*roundDone) { + taosHashCancelIterate(gCtgMgmt.pCluster, pIter); + break; + } + } + + pIter = taosHashIterate(gCtgMgmt.pCluster, pIter); + } +} + void ctgClearHandle(SCatalog* pCtg) { if (NULL == pCtg) { return; } uint64_t clusterId = pCtg->clusterId; - + ctgFreeMetaRent(&pCtg->dbRent); ctgFreeMetaRent(&pCtg->stbRent); ctgFreeInstDbCache(pCtg->dbCache); ctgFreeInstUserCache(pCtg->userCache); - ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB); - ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE); + ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbVgVersion)); + ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion)); pCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -1562,6 +1638,130 @@ void catalogFreeMetaData(SMetaData* pData) { } #endif +uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex) { + if (NULL == pIndex) { + return 0; + } + + return sizeof(*pIndex) + pIndex->indexSize; +} + +FORCE_INLINE uint64_t ctgGetTbMetaCacheSize(STableMeta *pMeta) { + if (NULL == pMeta) { + return 0; + } + + switch (pMeta->tableType) { + case TSDB_SUPER_TABLE: + return sizeof(*pMeta) + (pMeta->tableInfo.numOfColumns + pMeta->tableInfo.numOfTags) * sizeof(SSchema); + case TSDB_CHILD_TABLE: + return sizeof(SCTableMeta); + default: + return sizeof(*pMeta) + pMeta->tableInfo.numOfColumns * sizeof(SSchema); + } + + return 0; +} + +uint64_t ctgGetDbVgroupCacheSize(SDBVgInfo *pVg) { + if (NULL == pVg) { + return 0; + } + + return sizeof(*pVg) + taosHashGetSize(pVg->vgHash) * (sizeof(SVgroupInfo) + sizeof(int32_t)) + + taosArrayGetSize(pVg->vgArray) * sizeof(SVgroupInfo); +} + +uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth) { + if (NULL == pAuth) { + return 0; + } + + uint64_t cacheSize = 0; + char* p = taosHashIterate(pAuth->createdDbs, NULL); + while (p != NULL) { + size_t len = 0; + void* key = taosHashGetKey(p, &len); + cacheSize += len + strlen(p); + + p = taosHashIterate(pAuth->createdDbs, p); + } + + p = taosHashIterate(pAuth->readDbs, NULL); + while (p != NULL) { + size_t len = 0; + void* key = taosHashGetKey(p, &len); + cacheSize += len + strlen(p); + + p = taosHashIterate(pAuth->readDbs, p); + } + + p = taosHashIterate(pAuth->writeDbs, NULL); + while (p != NULL) { + size_t len = 0; + void* key = taosHashGetKey(p, &len); + cacheSize += len + strlen(p); + + p = taosHashIterate(pAuth->writeDbs, p); + } + + p = taosHashIterate(pAuth->readTbs, NULL); + while (p != NULL) { + size_t len = 0; + void* key = taosHashGetKey(p, &len); + cacheSize += len + strlen(p); + + p = taosHashIterate(pAuth->readTbs, p); + } + + p = taosHashIterate(pAuth->writeTbs, NULL); + while (p != NULL) { + size_t len = 0; + void* key = taosHashGetKey(p, &len); + cacheSize += len + strlen(p); + + p = taosHashIterate(pAuth->writeTbs, p); + } + + int32_t *ref = taosHashIterate(pAuth->useDbs, NULL); + while (ref != NULL) { + size_t len = 0; + void* key = taosHashGetKey(ref, &len); + cacheSize += len + sizeof(*ref); + + ref = taosHashIterate(pAuth->useDbs, ref); + } + + return cacheSize; +} + +uint64_t ctgGetClusterCacheSize(SCatalog *pCtg) { + uint64_t cacheSize = sizeof(SCatalog); + + SCtgUserAuth* pAuth = taosHashIterate(pCtg->userCache, NULL); + while (pAuth != NULL) { + size_t len = 0; + void* key = taosHashGetKey(pAuth, &len); + cacheSize += len + sizeof(SCtgUserAuth) + atomic_load_64(&pAuth->userCacheSize); + + pAuth = taosHashIterate(pCtg->userCache, pAuth); + } + + SCtgDBCache* pDb = taosHashIterate(pCtg->dbCache, NULL); + while (pDb != NULL) { + size_t len = 0; + void* key = taosHashGetKey(pDb, &len); + cacheSize += len + sizeof(SCtgDBCache) + atomic_load_64(&pDb->dbCacheSize); + + pDb = taosHashIterate(pCtg->dbCache, pDb); + } + + cacheSize += pCtg->dbRent.rentCacheSize; + cacheSize += pCtg->stbRent.rentCacheSize; + + return cacheSize; +} + void ctgGetClusterCacheStat(SCatalog* pCtg) { for (int32_t i = 0; i < CTG_CI_MAX_VALUE; ++i) { if (0 == (gCtgStatItem[i].flag & CTG_CI_FLAG_LEVEL_DB)) { @@ -1627,10 +1827,22 @@ void ctgGetGlobalCacheStat(SCtgCacheStat* pStat) { memcpy(pStat, &gCtgMgmt.statInfo.cache, sizeof(gCtgMgmt.statInfo.cache)); } -void ctgGetGlobalCacheUsedSize(uint64_t *pSize) { - SCtgCacheStat stat; - ctgGetGlobalCacheStat(&stat); - do; - +void ctgGetGlobalCacheSize(uint64_t *pSize) { + *pSize = 0; + + SCatalog* pCtg = NULL; + void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); + while (pIter) { + size_t len = 0; + void* key = taosHashGetKey(pIter, &len); + *pSize += len + POINTER_BYTES; + + pCtg = *(SCatalog**)pIter; + if (pCtg) { + *pSize += ctgGetClusterCacheSize(pCtg); + } + + pIter = taosHashIterate(gCtgMgmt.pCluster, pIter); + } } diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 58c43829cf..185d1482bb 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -743,7 +743,7 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { return terrno; } - if (taosSetCfg(tsCfg, pStmt->config)) { + if (taosApplyLocalCfg(tsCfg, pStmt->config)) { return terrno; } From 9fcf56a4ee363fa076b11034e1fe09973cbd7f7d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 16 May 2023 10:51:58 +0800 Subject: [PATCH 03/10] feat: support meta cleanup --- include/libs/catalog/catalog.h | 2 +- source/client/src/clientHb.c | 2 +- source/client/src/clientImpl.c | 2 +- source/libs/catalog/inc/catalogInt.h | 2 +- source/libs/catalog/src/catalog.c | 26 ++++++++++++++++++++++- source/libs/catalog/src/ctgCache.c | 9 ++++---- source/libs/catalog/src/ctgUtil.c | 14 +++++++----- source/libs/catalog/test/catalogTests.cpp | 2 ++ 8 files changed, 45 insertions(+), 14 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 2c684f8f76..8b9c1aae31 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -210,7 +210,7 @@ int32_t catalogGetSTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg); -int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg); +int32_t catalogAsyncUpdateTableMeta(SCatalog* pCtg, STableMetaRsp* pMsg); int32_t catalogGetCachedTableMeta(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 203aad8068..7bfb466227 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -217,7 +217,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo return TSDB_CODE_TSC_INVALID_VALUE; } - catalogUpdateTableMeta(pCatalog, rsp); + catalogAsyncUpdateTableMeta(pCatalog, rsp); } } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f8eade1d7c..6f129d7418 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -815,7 +815,7 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) { } int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) { - return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res); + return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res); } int32_t handleQueryExecRsp(SRequestObj* pRequest) { diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 9f892f23ff..7728749945 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -964,7 +964,7 @@ uint64_t ctgGetDbVgroupCacheSize(SDBVgInfo *pVg); uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth); uint64_t ctgGetClusterCacheSize(SCatalog *pCtg); void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone); -void ctgClearAllHandleMeta(bool *roundDone); +void ctgClearAllHandleMeta(int64_t *clearedSize, int64_t *clearedNum, bool *roundDone); void ctgProcessTimerEvent(void *param, void *tmrId); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 2dbe3c31ba..169dad4a89 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -647,7 +647,8 @@ void ctgProcessTimerEvent(void *param, void *tmrId) { goto _return; } } - + + qTrace("reset catalog timer"); taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); _return: @@ -1105,6 +1106,22 @@ _return: CTG_API_LEAVE(code); } +int32_t catalogAsyncUpdateTableMeta(SCatalog* pCtg, STableMetaRsp* pMsg) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pMsg) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + int32_t code = 0; + CTG_ERR_JRET(ctgUpdateTbMeta(pCtg, pMsg, false)); + +_return: + + CTG_API_LEAVE(code); +} + + int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* pTables) { CTG_API_ENTER(); @@ -1571,6 +1588,13 @@ void catalogDestroy(void) { return; } + if (gCtgMgmt.cacheTimer) { + taosTmrStop(gCtgMgmt.cacheTimer); + gCtgMgmt.cacheTimer = NULL; + taosTmrCleanUp(gCtgMgmt.timer); + gCtgMgmt.timer = NULL; + } + atomic_store_8((int8_t*)&gCtgMgmt.exit, true); if (!taosCheckCurrentInDll()) { diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index c697c81325..50e3d9ba57 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -791,7 +791,7 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) { CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); - ctgDebug("action [%s] added into queue", opName); + ctgDebug("%sync action [%s] added into queue", syncOp ? "S": "As", opName); CTG_QUEUE_INC(); CTG_STAT_RT_INC(numOfOpEnqueue, 1); @@ -2286,21 +2286,22 @@ void ctgClearMetaCache(SCtgCacheOperation *operation) { if (pCtg) { ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, &roundDone); } else { - ctgClearAllHandleMeta(&roundDone); + ctgClearAllHandleMeta(&clearedSize, &clearedNum, &roundDone); } - qDebug("catalog finish one round meta clear, done:%d", roundDone); + qDebug("catalog finish one round meta clear, clearedSize:%" PRId64 ", clearedNum:%" PRId64 ", done:%d", clearedSize, clearedNum, roundDone); ctgGetGlobalCacheSize(&remainSize); int32_t cacheMaxSize = atomic_load_32(&tsMetaCacheMaxSize); - if (!CTG_CACHE_LOW(remainSize, cacheMaxSize)) { + if (CTG_CACHE_LOW(remainSize, cacheMaxSize)) { qDebug("catalog finish meta clear, remainSize:%" PRId64 ", cacheMaxSize:%dMB", remainSize, cacheMaxSize); taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); return; } if (!roundDone) { + qDebug("catalog all meta cleared, remainSize:%" PRId64 ", cacheMaxSize:%dMB, to clear handle", remainSize, cacheMaxSize); ctgClearFreeCache(operation); taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); return; diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index dfa8a3b89a..3748615532 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -338,6 +338,7 @@ void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardN void* key = taosHashGetKey(pCache, &len); if (pCache->pMeta && TSDB_SUPER_TABLE == pCache->pMeta->tableType) { + pCache = taosHashIterate(dbCache->tbCache, pCache); continue; } @@ -346,10 +347,15 @@ void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardN atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize); *pClearedSize += cacheSize; (*pCleardNum)++; + + if (pCache->pMeta) { + CTG_META_NUM_DEC(pCache->pMeta->tableType); + } ctgFreeTbCacheImpl(pCache, true); - + if (*pCleardNum >= CTG_CLEAR_CACHE_ROUND_TB_NUM) { + taosHashCancelIterate(dbCache->tbCache, pCache); goto _return; } @@ -366,9 +372,7 @@ _return: } } -void ctgClearAllHandleMeta(bool *roundDone) { - int64_t clearedSize = 0; - int64_t clearedNum = 0; +void ctgClearAllHandleMeta(int64_t *clearedSize, int64_t *clearedNum, bool *roundDone) { SCatalog *pCtg = NULL; void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); @@ -376,7 +380,7 @@ void ctgClearAllHandleMeta(bool *roundDone) { pCtg = *(SCatalog **)pIter; if (pCtg) { - ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, roundDone); + ctgClearHandleMeta(pCtg, clearedSize, clearedNum, roundDone); if (*roundDone) { taosHashCancelIterate(gCtgMgmt.pCluster, pIter); break; diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 2cba433e84..a4786a77dc 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -1746,6 +1746,8 @@ TEST(tableMeta, updateStbMeta) { code = catalogUpdateTableMeta(pCtg, &rsp); ASSERT_EQ(code, 0); + code = catalogAsyncUpdateTableMeta(pCtg, &rsp); + ASSERT_EQ(code, 0); taosMemoryFreeClear(rsp.pSchemas); while (true) { From a89a3eb83c597d889b5a9ff587f7cf115ce88146 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 17 May 2023 10:18:46 +0800 Subject: [PATCH 04/10] fix: sys db clear vgroups issue --- source/libs/catalog/src/ctgCache.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 922822ecc5..fd83e2f88f 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -46,6 +46,7 @@ SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = { {"OthTblMeta", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_OTHERTABLE_META, {"TblSMA ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_TBL_SMA, {"TblCfg ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_TBL_CFG, + {"TblTag ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_TBL_TAG, {"IndexInfo ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_INDEX_INFO, {"User ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_USER, {"UDF ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_UDF, @@ -1783,7 +1784,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { goto _return; } - if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { + if (dbInfo->vgVersion < 0 || (taosHashGetSize(dbInfo->vgHash) <= 0 && !IS_SYS_DBNAME(dbFName))) { ctgDebug("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash)); CTG_ERR_JRET(TSDB_CODE_APP_ERROR); @@ -1824,7 +1825,10 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { goto _return; } - atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetDbVgroupCacheSize(vgCache->vgInfo)); + uint64_t groupCacheSize = ctgGetDbVgroupCacheSize(vgCache->vgInfo); + ctgDebug("sub dbGroupCacheSize %" PRIu64 " from db, dbFName:%s", groupCacheSize, dbFName); + + atomic_sub_fetch_64(&dbCache->dbCacheSize, groupCacheSize); freeVgInfo(vgInfo); CTG_DB_NUM_RESET(CTG_CI_DB_VGROUP); @@ -1839,7 +1843,9 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { ctgWUnlockVgInfo(dbCache); - atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetDbVgroupCacheSize(vgCache->vgInfo)); + uint64_t groupCacheSize = ctgGetDbVgroupCacheSize(vgCache->vgInfo); + atomic_add_fetch_64(&dbCache->dbCacheSize, groupCacheSize); + ctgDebug("add dbGroupCacheSize %" PRIu64 " from db, dbFName:%s", groupCacheSize, dbFName); dbCache = NULL; From 77cfab5093c69723b568c8e6580d79aa455345c2 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Tue, 16 May 2023 18:12:44 +0800 Subject: [PATCH 05/10] fix: invalid memory access of buffer after vsnprintf --- source/util/src/tlog.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index c8ac15786f..16dac70103 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -556,6 +556,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 - len, format, argpointer); va_end(argpointer); + len = len > LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 ? LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 : len; buffer[len++] = '\n'; buffer[len] = 0; From 25ae00632e07c9469b002ae24e83a597a7f76b5b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 18 May 2023 14:50:03 +0800 Subject: [PATCH 06/10] fix: code merge issue --- source/libs/catalog/src/catalog.c | 2 +- source/libs/catalog/src/ctgUtil.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 93f37f6876..201cd713a9 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -828,7 +828,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { clusterCtg->clusterId = clusterId; - CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbVgVersion))); + CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbCacheInfo))); CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion))); clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 6df70cd724..cf864e8643 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -409,7 +409,7 @@ void ctgClearHandle(SCatalog* pCtg) { ctgFreeInstDbCache(pCtg->dbCache); ctgFreeInstUserCache(pCtg->userCache); - ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbVgVersion)); + ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbCacheInfo)); ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion)); pCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, From a61f0533cf19d6c23b617e2e6967a114e39a9b1e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 18 May 2023 18:00:15 +0800 Subject: [PATCH 07/10] fix: invalid read issue --- source/common/src/tmsg.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 91d53884b8..a975f92569 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1566,21 +1566,21 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs char db[TSDB_DB_FNAME_LEN] = {0}; if (tDecodeCStrTo(pDecoder, db) < 0) return -1; int32_t len = strlen(db); - taosHashPut(pRsp->createdDbs, db, len, db, len); + taosHashPut(pRsp->createdDbs, db, len, db, len + 1); } for (int32_t i = 0; i < numOfReadDbs; ++i) { char db[TSDB_DB_FNAME_LEN] = {0}; if (tDecodeCStrTo(pDecoder, db) < 0) return -1; int32_t len = strlen(db); - taosHashPut(pRsp->readDbs, db, len, db, len); + taosHashPut(pRsp->readDbs, db, len, db, len + 1); } for (int32_t i = 0; i < numOfWriteDbs; ++i) { char db[TSDB_DB_FNAME_LEN] = {0}; if (tDecodeCStrTo(pDecoder, db) < 0) return -1; int32_t len = strlen(db); - taosHashPut(pRsp->writeDbs, db, len, db, len); + taosHashPut(pRsp->writeDbs, db, len, db, len + 1); } if (!tDecodeIsEnd(pDecoder)) { From bd9c726e6d398ee813e6d5cb75dc335b0805ac98 Mon Sep 17 00:00:00 2001 From: gccgdb1234 Date: Fri, 19 May 2023 11:59:36 +0800 Subject: [PATCH 08/10] doc: refine kafka connector --- docs/en/20-third-party/11-kafka.md | 8 +++----- docs/zh/20-third-party/11-kafka.md | 8 +++----- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/docs/en/20-third-party/11-kafka.md b/docs/en/20-third-party/11-kafka.md index 71d8c41173..f09ebb274c 100644 --- a/docs/en/20-third-party/11-kafka.md +++ b/docs/en/20-third-party/11-kafka.md @@ -314,7 +314,6 @@ connection.backoff.ms=5000 topic.prefix=tdengine-source- poll.interval.ms=1000 fetch.max.rows=100 -out.format=line key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter ``` @@ -353,7 +352,7 @@ confluent local services connect connector load TDengineSourceConnector --config ### View topic data -Use the kafka-console-consumer command-line tool to monitor data in the topic tdengine-source-test. In the beginning, all historical data will be output. After inserting two new data into TDengine, kafka-console-consumer immediately outputs the two new data. +Use the kafka-console-consumer command-line tool to monitor data in the topic tdengine-source-test. In the beginning, all historical data will be output. After inserting two new data into TDengine, kafka-console-consumer immediately outputs the two new data. The output is in InfluxDB line protocol format. ```` kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic tdengine-source-test @@ -428,9 +427,8 @@ The following configuration items apply to TDengine Sink Connector and TDengine 3. `timestamp.initial`: Data synchronization start time. The format is 'yyyy-MM-dd HH:mm:ss'. If it is not set, the data importing to Kafka will be started from the first/oldest row in the database. 4. `poll.interval.ms`: The time interval for checking newly created tables or removed tables, default value is 1000. 5. `fetch.max.rows`: The maximum number of rows retrieved when retrieving the database, default is 100. -6. `out.format`: The data format. The value could be `line`, which represents the InfluxDB Line protocol format. -7. 7. `query.interval.ms`: The time range of reading data from TDengine each time, its unit is millisecond. It should be adjusted according to the data flow in rate, the default value is 1000. -8. `topic.per.stable`: If it's set to true, it means one super table in TDengine corresponds to a topic in Kafka, the topic naming rule is `--`; if it's set to false, it means the whole DB corresponds to a topic in Kafka, the topic naming rule is `-`. +6. `query.interval.ms`: The time range of reading data from TDengine each time, its unit is millisecond. It should be adjusted according to the data flow in rate, the default value is 1000. +7. `topic.per.stable`: If it's set to true, it means one super table in TDengine corresponds to a topic in Kafka, the topic naming rule is `--`; if it's set to false, it means the whole DB corresponds to a topic in Kafka, the topic naming rule is `-`. diff --git a/docs/zh/20-third-party/11-kafka.md b/docs/zh/20-third-party/11-kafka.md index 44ee528d9b..97e78c2fde 100644 --- a/docs/zh/20-third-party/11-kafka.md +++ b/docs/zh/20-third-party/11-kafka.md @@ -318,7 +318,6 @@ connection.backoff.ms=5000 topic.prefix=tdengine-source- poll.interval.ms=1000 fetch.max.rows=100 -out.format=line key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter ``` @@ -357,7 +356,7 @@ confluent local services connect connector load TDengineSourceConnector --config ### 查看 topic 数据 -使用 kafka-console-consumer 命令行工具监控主题 tdengine-source-test 中的数据。一开始会输出所有历史数据, 往 TDengine 插入两条新的数据之后,kafka-console-consumer 也立即输出了新增的两条数据。 +使用 kafka-console-consumer 命令行工具监控主题 tdengine-source-test 中的数据。一开始会输出所有历史数据, 往 TDengine 插入两条新的数据之后,kafka-console-consumer 也立即输出了新增的两条数据。 输出数据 InfluxDB line protocol 的格式。 ``` kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic tdengine-source-test @@ -438,9 +437,8 @@ confluent local services connect connector unload TDengineSourceConnector 3. `timestamp.initial`: 数据同步起始时间。格式为'yyyy-MM-dd HH:mm:ss',若未指定则从指定 DB 中最早的一条记录开始。 4. `poll.interval.ms`: 检查是否有新建或删除的表的时间间隔,单位为 ms。默认为 1000。 5. `fetch.max.rows` : 检索数据库时最大检索条数。 默认为 100。 -6. `out.format`: 数据格式。取值为 `line`, 表示 InfluxDB Line 协议格式 -7. `query.interval.ms`: 从 TDengine 一次读取数据的时间跨度,需要根据表中的数据特征合理配置,避免一次查询的数据量过大或过小;在具体的环境中建议通过测试设置一个较优值,默认值为 1000. -8. `topic.per.stable`: 如果设置为true,表示一个超级表对应一个 Kafka topic,topic的命名规则 `--`;如果设置为 false,则指定的 DB 中的所有数据进入一个 Kafka topic,topic 的命名规则为 `-` +6. `query.interval.ms`: 从 TDengine 一次读取数据的时间跨度,需要根据表中的数据特征合理配置,避免一次查询的数据量过大或过小;在具体的环境中建议通过测试设置一个较优值,默认值为 1000. +7. `topic.per.stable`: 如果设置为true,表示一个超级表对应一个 Kafka topic,topic的命名规则 `--`;如果设置为 false,则指定的 DB 中的所有数据进入一个 Kafka topic,topic 的命名规则为 `-` ## 其他说明 From e1291bd8b41b735be6df21972387e634eda81f22 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 19 May 2023 13:28:58 +0800 Subject: [PATCH 09/10] opt stream ci test --- source/libs/stream/src/streamState.c | 2 +- tests/script/tsim/stream/sliding.sim | 47 +++++++++++++++++++++++----- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 98b685d8b9..7e2c62f73a 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -25,7 +25,7 @@ #include "tref.h" #include "ttimer.h" -#define MAX_TABLE_NAME_NUM 2000000 +#define MAX_TABLE_NAME_NUM 200000 int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { if (pWin1->groupId > pWin2->groupId) { diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index 633b806f71..3312ccbec4 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -442,9 +442,8 @@ sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); -sql create stream streams21 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s); -sql create stream streams22 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s); -sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s); +sql create stream streams21 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt21 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s); +sql create stream streams22 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt22 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s); sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791223001,2,2,2,1.1); @@ -468,7 +467,7 @@ if $loop_count == 10 then return -1 endi -sql select * from streamt; +sql select * from streamt21; # row 0 if $data01 != 2 then @@ -526,7 +525,7 @@ if $loop_count == 10 then return -1 endi -sql select * from streamt2; +sql select * from streamt22; # row 0 if $data01 != 4 then @@ -585,7 +584,39 @@ if $loop_count == 10 then return -1 endi -sql select * from streamt3; +sql create database test3 vgroups 6; +sql use test3; +sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s); + +sql insert into t1 values(1648791213000,1,1,1,1.0); +sql insert into t1 values(1648791223001,2,2,2,1.1); +sql insert into t1 values(1648791233002,3,3,3,2.1); +sql insert into t1 values(1648791243003,4,4,4,3.1); +sql insert into t1 values(1648791213004,4,5,5,4.1); + +sql insert into t2 values(1648791213000,1,6,6,1.0); +sql insert into t2 values(1648791223001,2,7,7,1.1); +sql insert into t2 values(1648791233002,3,8,8,2.1); +sql insert into t2 values(1648791243003,4,9,9,3.1); +sql insert into t2 values(1648791213004,4,10,10,4.1); + +$loop_count = 0 + +print step 7 + +loop4: +sleep 100 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt23; # row 0 if $rows != 5 then @@ -629,7 +660,7 @@ if $loop_count == 10 then return -1 endi -sql select * from streamt3; +sql select * from streamt23; # row 0 if $rows != 7 then @@ -688,6 +719,8 @@ sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 in sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791243000,2,1,1,1.0); +sleep 1000 + sql insert into t2 values(1648791273000,3,1,1,1.0); sql insert into t2 values(1648791313000,4,1,1,1.0); From efbc3fc7c6d9db9e4c846c683300760a555aa4ab Mon Sep 17 00:00:00 2001 From: dapan <89396746@qq.com> Date: Fri, 19 May 2023 14:19:29 +0800 Subject: [PATCH 10/10] fix: tmr in windows can't re-init issue --- source/libs/catalog/src/catalog.c | 1 + source/libs/catalog/test/catalogTests.cpp | 2 + source/util/src/ttimer.c | 47 ++++++++++++++++++----- 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 201cd713a9..03df240929 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -716,6 +716,7 @@ int32_t ctgGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName int32_t catalogInit(SCatalogCfg* cfg) { + qDebug("catalogInit start"); if (gCtgMgmt.pCluster) { qError("catalog already initialized"); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index dd77bb14e6..e0e456402b 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -37,6 +37,7 @@ #include "tglobal.h" #include "trpc.h" #include "tvariant.h" +#include "ttimer.h" namespace { @@ -150,6 +151,7 @@ void ctgTestInitLogFile() { tsAsyncLog = 0; qDebugFlag = 159; + tmrDebugFlag = 159; strcpy(tsLogDir, TD_LOG_DIR_PATH); ctgdEnableDebug("api", true); diff --git a/source/util/src/ttimer.c b/source/util/src/ttimer.c index 7e99d6a35c..cb01fb2d13 100644 --- a/source/util/src/ttimer.c +++ b/source/util/src/ttimer.c @@ -113,7 +113,7 @@ typedef struct time_wheel_t { static int32_t tsMaxTmrCtrl = TSDB_MAX_VNODES_PER_DB + 100; -static TdThreadOnce tmrModuleInit = PTHREAD_ONCE_INIT; +static int32_t tmrModuleInit = 0; static TdThreadMutex tmrCtrlMutex; static tmr_ctrl_t* tmrCtrls; static tmr_ctrl_t* unusedTmrCtrl = NULL; @@ -512,11 +512,11 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* han return stopped; } -static void taosTmrModuleInit(void) { +static int32_t taosTmrModuleInit(void) { tmrCtrls = taosMemoryMalloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl); if (tmrCtrls == NULL) { tmrError("failed to allocate memory for timer controllers."); - return; + return -1; } memset(&timerMap, 0, sizeof(timerMap)); @@ -535,14 +535,14 @@ static void taosTmrModuleInit(void) { time_wheel_t* wheel = wheels + i; if (taosThreadMutexInit(&wheel->mutex, NULL) != 0) { tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno)); - return; + return -1; } wheel->nextScanAt = now + wheel->resolution; wheel->index = 0; wheel->slots = (tmr_obj_t**)taosMemoryCalloc(wheel->size, sizeof(tmr_obj_t*)); if (wheel->slots == NULL) { tmrError("failed to allocate wheel slots"); - return; + return -1; } timerMap.size += wheel->size; } @@ -551,20 +551,48 @@ static void taosTmrModuleInit(void) { timerMap.slots = (timer_list_t*)taosMemoryCalloc(timerMap.size, sizeof(timer_list_t)); if (timerMap.slots == NULL) { tmrError("failed to allocate hash map"); - return; + return -1; } tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL); taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK); tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads); + + return 2; +} + +static int32_t taosTmrInitModule(void) { + if (atomic_load_32(&tmrModuleInit) == 2) { + return 0; + } + + if (atomic_load_32(&tmrModuleInit) < 0) { + return -1; + } + + while (true) { + if (0 == atomic_val_compare_exchange_32(&tmrModuleInit, 0, 1)) { + atomic_store_32(&tmrModuleInit, taosTmrModuleInit()); + } else if (atomic_load_32(&tmrModuleInit) < 0) { + return -1; + } else if (atomic_load_32(&tmrModuleInit) == 2) { + return 0; + } else { + taosMsleep(1); + } + } + + return -1; } void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, const char* label) { const char* ret = taosMonotonicInit(); tmrDebug("ttimer monotonic clock source:%s", ret); - taosThreadOnce(&tmrModuleInit, taosTmrModuleInit); + if (taosTmrInitModule() < 0) { + return NULL; + } taosThreadMutexLock(&tmrCtrlMutex); tmr_ctrl_t* ctrl = unusedTmrCtrl; @@ -581,6 +609,7 @@ void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, con } tstrncpy(ctrl->label, label, sizeof(ctrl->label)); + tmrDebug("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl); return ctrl; } @@ -629,8 +658,6 @@ void taosTmrCleanUp(void* handle) { tmrCtrls = NULL; unusedTmrCtrl = NULL; -#if defined(LINUX) - tmrModuleInit = PTHREAD_ONCE_INIT; // to support restart -#endif + atomic_store_32(&tmrModuleInit, 0); } }