feat: add client meta limitation
This commit is contained in:
parent
cde97b6bc1
commit
79d4403377
|
@ -118,6 +118,7 @@ extern int32_t tsRedirectFactor;
|
|||
extern int32_t tsRedirectMaxPeriod;
|
||||
extern int32_t tsMaxRetryWaitTime;
|
||||
extern bool tsUseAdapter;
|
||||
extern int32_t tsMetaCacheMaxSize;
|
||||
|
||||
// client
|
||||
extern int32_t tsMinSlidingTime;
|
||||
|
@ -183,7 +184,7 @@ struct SConfig *taosGetCfg();
|
|||
|
||||
void taosSetAllDebugFlag(int32_t flag, bool rewrite);
|
||||
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal, bool rewrite);
|
||||
int32_t taosSetCfg(SConfig *pCfg, char *name);
|
||||
int32_t taosApplyLocalCfg(SConfig *pCfg, char *name);
|
||||
void taosLocalCfgForbiddenToChange(char *name, bool *forbidden);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -3123,7 +3123,8 @@ typedef struct {
|
|||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
uint64_t suid;
|
||||
int32_t version;
|
||||
SArray* pIndex;
|
||||
int32_t indexSize;
|
||||
SArray* pIndex; // STableIndexInfo
|
||||
} STableIndexRsp;
|
||||
|
||||
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
|
||||
|
|
|
@ -90,28 +90,23 @@ typedef struct STbVerInfo {
|
|||
int32_t tversion;
|
||||
} STbVerInfo;
|
||||
|
||||
/*
|
||||
* ASSERT(sizeof(SCTableMeta) == 24)
|
||||
* ASSERT(tableType == TSDB_CHILD_TABLE)
|
||||
* The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info.
|
||||
*/
|
||||
#pragma pack(push, 1)
|
||||
typedef struct SCTableMeta {
|
||||
int32_t vgId : 24;
|
||||
int8_t tableType;
|
||||
uint64_t uid;
|
||||
uint64_t suid;
|
||||
int32_t vgId;
|
||||
int8_t tableType;
|
||||
} SCTableMeta;
|
||||
#pragma pack(pop)
|
||||
|
||||
/*
|
||||
* Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a
|
||||
* SCTableMeta.
|
||||
*/
|
||||
|
||||
#pragma pack(push, 1)
|
||||
typedef struct STableMeta {
|
||||
// BEGIN: KEEP THIS PART SAME WITH SCTableMeta
|
||||
int32_t vgId : 24;
|
||||
int8_t tableType;
|
||||
uint64_t uid;
|
||||
uint64_t suid;
|
||||
int32_t vgId;
|
||||
int8_t tableType;
|
||||
// END: KEEP THIS PART SAME WITH SCTableMeta
|
||||
|
||||
// if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta
|
||||
|
@ -121,6 +116,7 @@ typedef struct STableMeta {
|
|||
STableComInfo tableInfo;
|
||||
SSchema schema[];
|
||||
} STableMeta;
|
||||
#pragma pack(pop)
|
||||
|
||||
typedef struct SDBVgInfo {
|
||||
int32_t vgVersion;
|
||||
|
@ -130,7 +126,7 @@ typedef struct SDBVgInfo {
|
|||
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
|
||||
int64_t stateTs;
|
||||
SHashObj* vgHash; // key:vgId, value:SVgroupInfo
|
||||
SArray* vgArray;
|
||||
SArray* vgArray; // SVgroupInfo
|
||||
} SDBVgInfo;
|
||||
|
||||
typedef struct SUseDbOutput {
|
||||
|
|
|
@ -645,7 +645,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
|||
} else {
|
||||
tscInfo("set cfg:%s to %s", pItem->name, str);
|
||||
if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
|
||||
code = taosSetCfg(pCfg, pItem->name);
|
||||
code = taosApplyLocalCfg(pCfg, pItem->name);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -117,6 +117,7 @@ int32_t tsRedirectFactor = 2;
|
|||
int32_t tsRedirectMaxPeriod = 1000;
|
||||
int32_t tsMaxRetryWaitTime = 10000;
|
||||
bool tsUseAdapter = false;
|
||||
int32_t tsMetaCacheMaxSize = -1; // MB
|
||||
|
||||
|
||||
/*
|
||||
|
@ -345,6 +346,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
|
||||
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, 1) != 0) return -1;
|
||||
|
||||
tsNumOfRpcThreads = tsNumOfCores / 2;
|
||||
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
|
||||
|
@ -742,6 +744,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval;
|
||||
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
|
||||
tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64;
|
||||
tsMetaCacheMaxSize = cfgGetItem(pCfg, "metaCacheMaxSize")->i32;
|
||||
|
||||
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
|
||||
|
||||
|
@ -864,7 +867,7 @@ void taosLocalCfgForbiddenToChange(char *name, bool *forbidden) {
|
|||
*forbidden = false;
|
||||
}
|
||||
|
||||
int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
||||
int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
|
||||
int32_t len = strlen(name);
|
||||
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
|
||||
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
|
||||
|
@ -999,6 +1002,12 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
|||
}
|
||||
break;
|
||||
}
|
||||
case 'e': {
|
||||
if (strcasecmp("metaCacheMaxSize", name) == 0) {
|
||||
atomic_store_32(&tsMetaCacheMaxSize, cfgGetItem(pCfg, "metaCacheMaxSize")->i32);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'i': {
|
||||
if (strcasecmp("minimalTmpDirGB", name) == 0) {
|
||||
tsTempSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalTmpDirGB")->fval) * 1024 * 1024 * 1024);
|
||||
|
|
|
@ -3302,6 +3302,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
|
|||
if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pRsp->suid) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->version) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->indexSize) < 0) return -1;
|
||||
int32_t num = taosArrayGetSize(pRsp->pIndex);
|
||||
if (tEncodeI32(&encoder, num) < 0) return -1;
|
||||
if (num > 0) {
|
||||
|
@ -3347,6 +3348,7 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
|
|||
if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pRsp->suid) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->indexSize) < 0) return -1;
|
||||
int32_t num = 0;
|
||||
if (tDecodeI32(&decoder, &num) < 0) return -1;
|
||||
if (num > 0) {
|
||||
|
@ -3621,6 +3623,7 @@ int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
|
|||
if (tEncodeCStr(&encoder, pIndexRsp->dbFName) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pIndexRsp->suid) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pIndexRsp->version) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pIndexRsp->indexSize) < 0) return -1;
|
||||
int32_t num = taosArrayGetSize(pIndexRsp->pIndex);
|
||||
if (tEncodeI32(&encoder, num) < 0) return -1;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
|
@ -3683,6 +3686,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
|
|||
if (tDecodeCStrTo(&decoder, tableIndexRsp.dbFName) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &tableIndexRsp.suid) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &tableIndexRsp.version) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &tableIndexRsp.indexSize) < 0) return -1;
|
||||
int32_t num = 0;
|
||||
if (tDecodeI32(&decoder, &num) < 0) return -1;
|
||||
if (num > 0) {
|
||||
|
|
|
@ -1114,6 +1114,7 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
|
|||
return code;
|
||||
}
|
||||
|
||||
rsp->indexSize += sizeof(info) + pSma->exprLen + 1;
|
||||
*exist = true;
|
||||
|
||||
sdbRelease(pSdb, pSma);
|
||||
|
|
|
@ -23,6 +23,8 @@ extern "C" {
|
|||
#include "catalog.h"
|
||||
#include "query.h"
|
||||
#include "tcommon.h"
|
||||
#include "ttimer.h"
|
||||
#include "tglobal.h"
|
||||
|
||||
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
|
||||
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
|
||||
|
@ -34,6 +36,8 @@ extern "C" {
|
|||
#define CTG_DEFAULT_BATCH_NUM 64
|
||||
#define CTG_DEFAULT_FETCH_NUM 8
|
||||
#define CTG_MAX_COMMAND_LEN 512
|
||||
#define CTG_DEFAULT_CACHE_MON_MSEC 5000
|
||||
#define CTG_CLEAR_CACHE_ROUND_TB_NUM 3000
|
||||
|
||||
#define CTG_RENT_SLOT_SECOND 1.5
|
||||
|
||||
|
@ -227,8 +231,8 @@ typedef STableIndexRsp STableIndex;
|
|||
|
||||
typedef struct SCtgTbCache {
|
||||
SRWLatch metaLock;
|
||||
STableMeta* pMeta;
|
||||
SRWLatch indexLock;
|
||||
STableMeta* pMeta;
|
||||
STableIndex* pIndex;
|
||||
} SCtgTbCache;
|
||||
|
||||
|
@ -245,6 +249,7 @@ typedef struct SCtgDBCache {
|
|||
SHashObj* tbCache; // key:tbname, value:SCtgTbCache
|
||||
SHashObj* stbCache; // key:suid, value:char*
|
||||
uint64_t dbCacheNum[CTG_CI_MAX_VALUE];
|
||||
uint64_t dbCacheSize;
|
||||
} SCtgDBCache;
|
||||
|
||||
typedef struct SCtgRentSlot {
|
||||
|
@ -258,12 +263,15 @@ typedef struct SCtgRentMgmt {
|
|||
uint16_t slotNum;
|
||||
uint16_t slotRIdx;
|
||||
int64_t lastReadMsec;
|
||||
uint64_t rentCacheSize;
|
||||
int32_t metaSize;
|
||||
SCtgRentSlot* slots;
|
||||
} SCtgRentMgmt;
|
||||
|
||||
typedef struct SCtgUserAuth {
|
||||
SRWLatch lock;
|
||||
SGetUserAuthRsp userAuth;
|
||||
uint64_t userCacheSize;
|
||||
} SCtgUserAuth;
|
||||
|
||||
typedef struct SCatalog {
|
||||
|
@ -392,6 +400,7 @@ typedef struct SCtgRuntimeStat {
|
|||
uint64_t numOfOpAbort;
|
||||
uint64_t numOfOpEnqueue;
|
||||
uint64_t numOfOpDequeue;
|
||||
uint64_t numOfOpClearMeta;
|
||||
uint64_t numOfOpClearCache;
|
||||
} SCtgRuntimeStat;
|
||||
|
||||
|
@ -461,6 +470,7 @@ typedef struct SCtgDropTbIndexMsg {
|
|||
|
||||
typedef struct SCtgClearCacheMsg {
|
||||
SCatalog* pCtg;
|
||||
bool clearMeta;
|
||||
bool freeCtg;
|
||||
} SCtgClearCacheMsg;
|
||||
|
||||
|
@ -499,6 +509,8 @@ typedef struct SCatalogMgmt {
|
|||
int32_t jobPool;
|
||||
SRWLatch lock;
|
||||
SCtgQueue queue;
|
||||
void *timer;
|
||||
tmr_h cacheTimer;
|
||||
TdThread updateThread;
|
||||
SHashObj* pCluster; // key: clusterId, value: SCatalog*
|
||||
SCatalogStat statInfo;
|
||||
|
@ -515,9 +527,8 @@ typedef struct SCtgOperation {
|
|||
} SCtgOperation;
|
||||
|
||||
typedef struct SCtgCacheItemInfo {
|
||||
char *name;
|
||||
char* name;
|
||||
int32_t flag;
|
||||
uint32_t persistSize;
|
||||
} SCtgCacheItemInfo;
|
||||
|
||||
#define CTG_AUTH_READ(_t) ((_t) == AUTH_TYPE_READ || (_t) == AUTH_TYPE_READ_OR_WRITE)
|
||||
|
@ -530,11 +541,6 @@ typedef struct SCtgCacheItemInfo {
|
|||
#define CTG_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n)
|
||||
#define CTG_STAT_GET(_item) atomic_load_64(&(_item))
|
||||
|
||||
#define CTG_DB_NUM_INC(_item) dbCache->dbCacheNum[_item] += 1
|
||||
#define CTG_DB_NUM_DEC(_item) dbCache->dbCacheNum[_item] -= 1
|
||||
#define CTG_DB_NUM_SET(_item) dbCache->dbCacheNum[_item] = 1
|
||||
#define CTG_DB_NUM_RESET(_item) dbCache->dbCacheNum[_item] = 0
|
||||
|
||||
#define CTG_STAT_API_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.api.item, n))
|
||||
#define CTG_STAT_RT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.runtime.item, n))
|
||||
#define CTG_STAT_NUM_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.cache.cacheNum[item], n))
|
||||
|
@ -549,6 +555,11 @@ typedef struct SCtgCacheItemInfo {
|
|||
#define CTG_CACHE_HIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheHit[item], n))
|
||||
#define CTG_CACHE_NHIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheNHit[item], n))
|
||||
|
||||
#define CTG_DB_NUM_INC(_item) dbCache->dbCacheNum[_item] += 1
|
||||
#define CTG_DB_NUM_DEC(_item) dbCache->dbCacheNum[_item] -= 1
|
||||
#define CTG_DB_NUM_SET(_item) dbCache->dbCacheNum[_item] = 1
|
||||
#define CTG_DB_NUM_RESET(_item) dbCache->dbCacheNum[_item] = 0
|
||||
|
||||
#define CTG_META_NUM_INC(type) \
|
||||
do { \
|
||||
switch (type) { \
|
||||
|
@ -658,6 +669,10 @@ typedef struct SCtgCacheItemInfo {
|
|||
#define CTG_DB_NOT_EXIST(code) \
|
||||
(code == TSDB_CODE_MND_DB_NOT_EXIST || code == TSDB_CODE_MND_DB_IN_CREATING || code == TSDB_CODE_MND_DB_IN_DROPPING)
|
||||
|
||||
#define CTG_CACHE_OVERFLOW(_csize, _maxsize) ((_maxsize >= 0) ? ((_csize) >= (_maxsize) * 1048576L * 0.9) : false)
|
||||
#define CTG_CACHE_LOW(_csize, _maxsize) ((_maxsize >= 0) ? ((_csize) <= (_maxsize) * 1048576L * 0.75) : true)
|
||||
|
||||
|
||||
#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__)
|
||||
#define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__)
|
||||
#define ctgWarn(param, ...) qWarn("CTG:%p " param, pCtg, __VA_ARGS__)
|
||||
|
@ -760,6 +775,12 @@ typedef struct SCtgCacheItemInfo {
|
|||
CTG_RET(__code); \
|
||||
} while (0)
|
||||
|
||||
#define CTG_API_NLEAVE() \
|
||||
do { \
|
||||
CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); \
|
||||
CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); \
|
||||
} while (0)
|
||||
|
||||
#define CTG_API_ENTER() \
|
||||
do { \
|
||||
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
|
||||
|
@ -769,6 +790,15 @@ typedef struct SCtgCacheItemInfo {
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define CTG_API_NENTER() \
|
||||
do { \
|
||||
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
|
||||
CTG_LOCK(CTG_READ, &gCtgMgmt.lock); \
|
||||
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \
|
||||
CTG_API_NLEAVE(); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define CTG_API_JENTER() \
|
||||
do { \
|
||||
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
|
||||
|
@ -829,8 +859,8 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput* output, bool sy
|
|||
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp* pAuth, bool syncReq);
|
||||
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char* dbFName, int32_t vgId, SEpSet* pEpSet);
|
||||
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex** pIndex, bool syncOp);
|
||||
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool syncOp);
|
||||
int32_t ctgMetaRentInit(SCtgRentMgmt* mgmt, uint32_t rentSec, int8_t type);
|
||||
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool clearMeta, bool freeCtg, bool stopQueue, bool syncOp);
|
||||
int32_t ctgMetaRentInit(SCtgRentMgmt* mgmt, uint32_t rentSec, int8_t type, int32_t size);
|
||||
int32_t ctgMetaRentAdd(SCtgRentMgmt* mgmt, void* meta, int64_t id, int32_t size);
|
||||
int32_t ctgMetaRentGet(SCtgRentMgmt* mgmt, void** res, uint32_t* num, int32_t size);
|
||||
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq);
|
||||
|
@ -911,7 +941,7 @@ void ctgFreeSTableIndex(void* info);
|
|||
void ctgClearSubTaskRes(SCtgSubRes* pRes);
|
||||
void ctgFreeQNode(SCtgQNode* node);
|
||||
void ctgClearHandle(SCatalog* pCtg);
|
||||
void ctgFreeTbCacheImpl(SCtgTbCache* pCache);
|
||||
void ctgFreeTbCacheImpl(SCtgTbCache* pCache, bool lock);
|
||||
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
|
||||
int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup,
|
||||
bool* exists);
|
||||
|
@ -927,6 +957,16 @@ void ctgReleaseVgMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCach
|
|||
void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCache* pCache);
|
||||
void ctgGetGlobalCacheStat(SCtgCacheStat* pStat);
|
||||
int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res);
|
||||
void ctgGetGlobalCacheSize(uint64_t *pSize);
|
||||
uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex);
|
||||
uint64_t ctgGetTbMetaCacheSize(STableMeta *pMeta);
|
||||
uint64_t ctgGetDbVgroupCacheSize(SDBVgInfo *pVg);
|
||||
uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth);
|
||||
uint64_t ctgGetClusterCacheSize(SCatalog *pCtg);
|
||||
void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone);
|
||||
void ctgClearAllHandleMeta(bool *roundDone);
|
||||
void ctgProcessTimerEvent(void *param, void *tmrId);
|
||||
|
||||
|
||||
extern SCatalogMgmt gCtgMgmt;
|
||||
extern SCtgDebug gCTGDebug;
|
||||
|
|
|
@ -626,6 +626,36 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
void ctgProcessTimerEvent(void *param, void *tmrId) {
|
||||
CTG_API_NENTER();
|
||||
|
||||
int32_t cacheMaxSize = atomic_load_32(&tsMetaCacheMaxSize);
|
||||
if (cacheMaxSize >= 0) {
|
||||
uint64_t cacheSize = 0;
|
||||
ctgGetGlobalCacheSize(&cacheSize);
|
||||
bool overflow = CTG_CACHE_OVERFLOW(cacheSize, cacheMaxSize);
|
||||
|
||||
qDebug("catalog cache size: %" PRIu64"B, maxCaseSize:%dMB, %s", cacheSize, cacheMaxSize, overflow ? "overflow" : "NO overflow");
|
||||
|
||||
if (overflow) {
|
||||
int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false);
|
||||
if (code) {
|
||||
qError("clear cache enqueue failed, error:%s", tstrerror(code));
|
||||
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
||||
}
|
||||
|
||||
goto _return;
|
||||
}
|
||||
}
|
||||
|
||||
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
||||
|
||||
_return:
|
||||
|
||||
CTG_API_NLEAVE();
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogInit(SCatalogCfg* cfg) {
|
||||
if (gCtgMgmt.pCluster) {
|
||||
qError("catalog already initialized");
|
||||
|
@ -684,6 +714,18 @@ int32_t catalogInit(SCatalogCfg* cfg) {
|
|||
CTG_ERR_RET(terrno);
|
||||
}
|
||||
|
||||
gCtgMgmt.timer = taosTmrInit(0, 0, 0, "catalog");
|
||||
if (NULL == gCtgMgmt.timer) {
|
||||
qError("init timer failed, error:%s", tstrerror(terrno));
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
gCtgMgmt.cacheTimer = taosTmrStart(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer);
|
||||
if (NULL == gCtgMgmt.cacheTimer) {
|
||||
qError("start cache timer failed");
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgStartUpdateThread());
|
||||
|
||||
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum,
|
||||
|
@ -727,8 +769,8 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
|
|||
|
||||
clusterCtg->clusterId = clusterId;
|
||||
|
||||
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB));
|
||||
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
|
||||
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbVgVersion)));
|
||||
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion)));
|
||||
|
||||
clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
|
||||
false, HASH_ENTRY_LOCK);
|
||||
|
@ -1511,11 +1553,11 @@ int32_t catalogClearCache(void) {
|
|||
|
||||
qInfo("start to clear catalog cache");
|
||||
|
||||
if (NULL == gCtgMgmt.pCluster || atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
|
||||
if (NULL == gCtgMgmt.pCluster) {
|
||||
CTG_API_LEAVE_NOLOCK(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int32_t code = ctgClearCacheEnqueue(NULL, false, false, true);
|
||||
int32_t code = ctgClearCacheEnqueue(NULL, false, false, false, true);
|
||||
|
||||
qInfo("clear catalog cache end, code: %s", tstrerror(code));
|
||||
|
||||
|
@ -1532,7 +1574,7 @@ void catalogDestroy(void) {
|
|||
atomic_store_8((int8_t*)&gCtgMgmt.exit, true);
|
||||
|
||||
if (!taosCheckCurrentInDll()) {
|
||||
ctgClearCacheEnqueue(NULL, true, true, true);
|
||||
ctgClearCacheEnqueue(NULL, false, true, true, true);
|
||||
taosThreadJoin(gCtgMgmt.updateThread, NULL);
|
||||
}
|
||||
|
||||
|
|
|
@ -32,14 +32,14 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v
|
|||
{CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}};
|
||||
|
||||
SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = {
|
||||
{"Cluster ", CTG_CI_FLAG_LEVEL_GLOBAL, sizeof(SCatalog)}, //CTG_CI_CLUSTER
|
||||
{"Dnode ", CTG_CI_FLAG_LEVEL_CLUSTER, 0}, //CTG_CI_DNODE,
|
||||
{"Qnode ", CTG_CI_FLAG_LEVEL_CLUSTER, 0}, //CTG_CI_QNODE,
|
||||
{"DB ", CTG_CI_FLAG_LEVEL_CLUSTER, sizeof(SCtgDBCache)}, //CTG_CI_DB,
|
||||
{"DbVgroup ", CTG_CI_FLAG_LEVEL_DB, sizeof(SDBVgInfo)}, //CTG_CI_DB_VGROUP,
|
||||
{"DbCfg ", CTG_CI_FLAG_LEVEL_DB, 0}, //CTG_CI_DB_CFG,
|
||||
{"DbInfo ", CTG_CI_FLAG_LEVEL_DB, 0}, //CTG_CI_DB_INFO,
|
||||
{"StbMeta ", CTG_CI_FLAG_LEVEL_DB, }, //CTG_CI_STABLE_META,
|
||||
{"Cluster ", CTG_CI_FLAG_LEVEL_GLOBAL}, //CTG_CI_CLUSTER
|
||||
{"Dnode ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_DNODE,
|
||||
{"Qnode ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_QNODE,
|
||||
{"DB ", CTG_CI_FLAG_LEVEL_CLUSTER}, //CTG_CI_DB,
|
||||
{"DbVgroup ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_VGROUP,
|
||||
{"DbCfg ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_CFG,
|
||||
{"DbInfo ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_DB_INFO,
|
||||
{"StbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_STABLE_META,
|
||||
{"NtbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_NTABLE_META,
|
||||
{"CtbMeta ", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_CTABLE_META,
|
||||
{"SysTblMeta", CTG_CI_FLAG_LEVEL_DB}, //CTG_CI_SYSTABLE_META,
|
||||
|
@ -1135,7 +1135,7 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
|
||||
int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool clearMeta, bool freeCtg, bool stopQueue, bool syncOp) {
|
||||
int32_t code = 0;
|
||||
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
|
||||
op->opId = CTG_OP_CLEAR_CACHE;
|
||||
|
@ -1151,6 +1151,7 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool
|
|||
}
|
||||
|
||||
msg->pCtg = pCtg;
|
||||
msg->clearMeta = clearMeta;
|
||||
msg->freeCtg = freeCtg;
|
||||
op->data = msg;
|
||||
|
||||
|
@ -1163,10 +1164,11 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
|
||||
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type, int32_t size) {
|
||||
mgmt->slotRIdx = 0;
|
||||
mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
|
||||
mgmt->type = type;
|
||||
mgmt->metaSize = size;
|
||||
|
||||
size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum;
|
||||
|
||||
|
@ -1176,6 +1178,8 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
|
|||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
mgmt->rentCacheSize = msgSize;
|
||||
|
||||
qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1202,6 +1206,7 @@ int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size)
|
|||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
mgmt->rentCacheSize += size;
|
||||
slot->needSort = true;
|
||||
|
||||
qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
|
||||
|
@ -1282,6 +1287,7 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp
|
|||
}
|
||||
|
||||
taosArrayRemove(slot->meta, idx);
|
||||
mgmt->rentCacheSize -= mgmt->metaSize;
|
||||
|
||||
qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
|
||||
|
||||
|
@ -1549,10 +1555,15 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
|
|||
}
|
||||
|
||||
if (origType == TSDB_SUPER_TABLE) {
|
||||
char *stbName = taosHashGet(dbCache->stbCache, &orig->suid, sizeof(orig->suid));
|
||||
if (stbName) {
|
||||
uint64_t metaSize = strlen(stbName) + 1 + sizeof(orig->suid);
|
||||
if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
|
||||
ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
|
||||
} else {
|
||||
ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
|
||||
atomic_sub_fetch_64(&dbCache->dbCacheSize, metaSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1566,14 +1577,20 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
|
|||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(tbName) + sizeof(SCtgTbCache) + ctgGetTbMetaCacheSize(meta));
|
||||
|
||||
pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
|
||||
} else {
|
||||
CTG_LOCK(CTG_WRITE, &pCache->metaLock);
|
||||
if (orig) {
|
||||
CTG_META_NUM_DEC(origType);
|
||||
}
|
||||
|
||||
atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbMetaCacheSize(meta) - ctgGetTbMetaCacheSize(pCache->pMeta));
|
||||
|
||||
taosMemoryFree(pCache->pMeta);
|
||||
pCache->pMeta = meta;
|
||||
|
||||
CTG_UNLOCK(CTG_WRITE, &pCache->metaLock);
|
||||
}
|
||||
|
||||
|
@ -1591,6 +1608,8 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
|
|||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
atomic_add_fetch_64(&dbCache->dbCacheSize, sizeof(meta->suid) + strlen(tbName) + 1);
|
||||
|
||||
ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
|
||||
meta->tableType);
|
||||
|
||||
|
@ -1623,6 +1642,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
|
|||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(tbName) + sizeof(SCtgTbCache) + ctgGetTbIndexCacheSize(pIndex));
|
||||
|
||||
CTG_DB_NUM_INC(CTG_CI_TBL_SMA);
|
||||
|
||||
*index = NULL;
|
||||
|
@ -1639,6 +1660,7 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
|
|||
CTG_LOCK(CTG_WRITE, &pCache->indexLock);
|
||||
|
||||
if (pCache->pIndex) {
|
||||
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pCache->pIndex));
|
||||
if (0 == suid) {
|
||||
suid = pCache->pIndex->suid;
|
||||
}
|
||||
|
@ -1649,6 +1671,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
|
|||
pCache->pIndex = pIndex;
|
||||
CTG_UNLOCK(CTG_WRITE, &pCache->indexLock);
|
||||
|
||||
atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pIndex));
|
||||
|
||||
*index = NULL;
|
||||
|
||||
ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
|
||||
|
@ -1678,7 +1702,7 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
void ctgClearAllInstance(void) {
|
||||
void ctgClearAllHandles(void) {
|
||||
SCatalog *pCtg = NULL;
|
||||
|
||||
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
|
||||
|
@ -1693,7 +1717,7 @@ void ctgClearAllInstance(void) {
|
|||
}
|
||||
}
|
||||
|
||||
void ctgFreeAllInstance(void) {
|
||||
void ctgFreeAllHandles(void) {
|
||||
SCatalog *pCtg = NULL;
|
||||
|
||||
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
|
||||
|
@ -1752,6 +1776,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
|
|||
}
|
||||
|
||||
SCtgVgCache *vgCache = &dbCache->vgCache;
|
||||
|
||||
CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache));
|
||||
|
||||
if (vgCache->vgInfo) {
|
||||
|
@ -1774,6 +1799,8 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
|
|||
goto _return;
|
||||
}
|
||||
|
||||
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetDbVgroupCacheSize(vgCache->vgInfo));
|
||||
|
||||
freeVgInfo(vgInfo);
|
||||
CTG_DB_NUM_RESET(CTG_CI_DB_VGROUP);
|
||||
}
|
||||
|
@ -1787,6 +1814,8 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
|
|||
|
||||
ctgWUnlockVgInfo(dbCache);
|
||||
|
||||
atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetDbVgroupCacheSize(vgCache->vgInfo));
|
||||
|
||||
dbCache = NULL;
|
||||
|
||||
// if (!IS_SYS_DBNAME(dbFName)) {
|
||||
|
@ -1850,6 +1879,8 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
|
|||
|
||||
CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));
|
||||
|
||||
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetDbVgroupCacheSize(dbCache->vgCache.vgInfo));
|
||||
|
||||
freeVgInfo(dbCache->vgCache.vgInfo);
|
||||
dbCache->vgCache.vgInfo = NULL;
|
||||
|
||||
|
@ -1941,9 +1972,15 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
|
|||
goto _return;
|
||||
}
|
||||
|
||||
char *stbName = taosHashGet(dbCache->stbCache, &msg->suid, sizeof(msg->suid));
|
||||
if (stbName) {
|
||||
uint64_t metaSize = strlen(stbName) + 1 + sizeof(msg->suid);
|
||||
if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
|
||||
ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName,
|
||||
msg->stbName, msg->suid);
|
||||
} else {
|
||||
atomic_sub_fetch_64(&dbCache->dbCacheSize, metaSize);
|
||||
}
|
||||
}
|
||||
|
||||
SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName));
|
||||
|
@ -1952,15 +1989,15 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
|
|||
goto _return;
|
||||
}
|
||||
|
||||
CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
|
||||
tblType = pTbCache->pMeta->tableType;
|
||||
ctgFreeTbCacheImpl(pTbCache);
|
||||
CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
|
||||
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbMetaCacheSize(pTbCache->pMeta) + ctgGetTbIndexCacheSize(pTbCache->pIndex));
|
||||
ctgFreeTbCacheImpl(pTbCache, true);
|
||||
|
||||
if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
|
||||
ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
|
||||
} else {
|
||||
CTG_META_NUM_DEC(tblType);
|
||||
atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(*pTbCache) + strlen(msg->stbName));
|
||||
}
|
||||
|
||||
ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
|
||||
|
@ -2004,15 +2041,15 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
|
|||
goto _return;
|
||||
}
|
||||
|
||||
CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
|
||||
tblType = pTbCache->pMeta->tableType;
|
||||
ctgFreeTbCacheImpl(pTbCache);
|
||||
CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
|
||||
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbMetaCacheSize(pTbCache->pMeta) + ctgGetTbIndexCacheSize(pTbCache->pIndex));
|
||||
ctgFreeTbCacheImpl(pTbCache, true);
|
||||
|
||||
if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) {
|
||||
ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
} else {
|
||||
atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(*pTbCache) + strlen(msg->tbName));
|
||||
CTG_META_NUM_DEC(tblType);
|
||||
}
|
||||
|
||||
|
@ -2039,6 +2076,7 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
|
|||
SCtgUserAuth userAuth = {0};
|
||||
|
||||
memcpy(&userAuth.userAuth, &msg->userAuth, sizeof(msg->userAuth));
|
||||
userAuth.userCacheSize = ctgGetUserCacheSize(&userAuth.userAuth);
|
||||
|
||||
if (taosHashPut(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user), &userAuth, sizeof(userAuth))) {
|
||||
ctgError("taosHashPut user %s to cache failed", msg->userAuth.user);
|
||||
|
@ -2080,6 +2118,8 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
|
|||
|
||||
CTG_UNLOCK(CTG_WRITE, &pUser->lock);
|
||||
|
||||
atomic_store_64(&pUser->userCacheSize, ctgGetUserCacheSize(&pUser->userAuth));
|
||||
|
||||
_return:
|
||||
|
||||
taosHashCleanup(msg->userAuth.createdDbs);
|
||||
|
@ -2214,8 +2254,7 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
|
||||
int32_t code = 0;
|
||||
void ctgClearFreeCache(SCtgCacheOperation *operation) {
|
||||
SCtgClearCacheMsg *msg = operation->data;
|
||||
SCatalog *pCtg = msg->pCtg;
|
||||
|
||||
|
@ -2227,20 +2266,65 @@ int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
|
|||
} else {
|
||||
ctgClearHandle(pCtg);
|
||||
}
|
||||
|
||||
goto _return;
|
||||
} else if (msg->freeCtg) {
|
||||
ctgFreeAllHandles();
|
||||
} else {
|
||||
ctgClearAllHandles();
|
||||
}
|
||||
|
||||
if (msg->freeCtg) {
|
||||
ctgFreeAllInstance();
|
||||
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
|
||||
}
|
||||
|
||||
void ctgClearMetaCache(SCtgCacheOperation *operation) {
|
||||
SCtgClearCacheMsg *msg = operation->data;
|
||||
SCatalog *pCtg = msg->pCtg;
|
||||
int64_t clearedSize = 0;
|
||||
int64_t clearedNum = 0;
|
||||
int64_t remainSize = 0;
|
||||
bool roundDone = false;
|
||||
|
||||
if (pCtg) {
|
||||
ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, &roundDone);
|
||||
} else {
|
||||
ctgClearAllInstance();
|
||||
ctgClearAllHandleMeta(&roundDone);
|
||||
}
|
||||
|
||||
qDebug("catalog finish one round meta clear, done:%d", roundDone);
|
||||
|
||||
ctgGetGlobalCacheSize(&remainSize);
|
||||
int32_t cacheMaxSize = atomic_load_32(&tsMetaCacheMaxSize);
|
||||
|
||||
if (!CTG_CACHE_LOW(remainSize, cacheMaxSize)) {
|
||||
qDebug("catalog finish meta clear, remainSize:%" PRId64 ", cacheMaxSize:%dMB", remainSize, cacheMaxSize);
|
||||
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!roundDone) {
|
||||
ctgClearFreeCache(operation);
|
||||
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false);
|
||||
if (code) {
|
||||
qError("clear cache enqueue failed, error:%s", tstrerror(code));
|
||||
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
|
||||
int32_t code = 0;
|
||||
SCtgClearCacheMsg *msg = operation->data;
|
||||
|
||||
if (msg->clearMeta) {
|
||||
ctgClearMetaCache(operation);
|
||||
} else {
|
||||
ctgClearFreeCache(operation);
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#include "trpc.h"
|
||||
|
||||
extern SCatalogMgmt gCtgMgmt;
|
||||
SCtgDebug gCTGDebug = {0};
|
||||
SCtgDebug gCTGDebug = {.statEnable = true};
|
||||
|
||||
#if 0
|
||||
|
||||
|
@ -528,7 +528,10 @@ int32_t ctgdShowStatInfo(void) {
|
|||
CTG_API_ENTER();
|
||||
|
||||
SCtgCacheStat cache;
|
||||
uint64_t cacheSize = 0;
|
||||
|
||||
ctgGetGlobalCacheStat(&cache);
|
||||
ctgGetGlobalCacheSize(&cacheSize);
|
||||
|
||||
qDebug("## Global Stat Info %s ##", "begin");
|
||||
qDebug("## \t%s \t%s \t%s ##", "Num", "Hit", "Nhit");
|
||||
|
@ -536,6 +539,7 @@ int32_t ctgdShowStatInfo(void) {
|
|||
qDebug("# %s \t%" PRIu64 " \t%" PRIu64 " \t%" PRIu64 " #", gCtgStatItem[i].name, cache.cacheNum[i], cache.cacheHit[i], cache.cacheNHit[i]);
|
||||
}
|
||||
qDebug("## Global Stat Info %s ##", "end");
|
||||
qDebug("## Global Cache Size: %" PRIu64, cacheSize);
|
||||
|
||||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
|
|
@ -196,6 +196,7 @@ void ctgFreeMetaRent(SCtgRentMgmt* mgmt) {
|
|||
}
|
||||
|
||||
taosMemoryFreeClear(mgmt->slots);
|
||||
mgmt->rentCacheSize = 0;
|
||||
}
|
||||
|
||||
void ctgFreeStbMetaCache(SCtgDBCache* dbCache) {
|
||||
|
@ -208,12 +209,26 @@ void ctgFreeStbMetaCache(SCtgDBCache* dbCache) {
|
|||
dbCache->stbCache = NULL;
|
||||
}
|
||||
|
||||
void ctgFreeTbCacheImpl(SCtgTbCache* pCache) {
|
||||
qDebug("tbMeta freed, p:%p", pCache->pMeta);
|
||||
void ctgFreeTbCacheImpl(SCtgTbCache* pCache, bool lock) {
|
||||
if (pCache->pMeta) {
|
||||
if (lock) {
|
||||
CTG_LOCK(CTG_WRITE, &pCache->metaLock);
|
||||
}
|
||||
taosMemoryFreeClear(pCache->pMeta);
|
||||
if (lock) {
|
||||
CTG_UNLOCK(CTG_WRITE, &pCache->metaLock);
|
||||
}
|
||||
}
|
||||
|
||||
if (pCache->pIndex) {
|
||||
if (lock) {
|
||||
CTG_LOCK(CTG_WRITE, &pCache->indexLock);
|
||||
}
|
||||
taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
|
||||
taosMemoryFreeClear(pCache->pIndex);
|
||||
if (lock) {
|
||||
CTG_UNLOCK(CTG_WRITE, &pCache->indexLock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -225,7 +240,7 @@ void ctgFreeTbCache(SCtgDBCache* dbCache) {
|
|||
int32_t tblNum = taosHashGetSize(dbCache->tbCache);
|
||||
SCtgTbCache* pCache = taosHashIterate(dbCache->tbCache, NULL);
|
||||
while (NULL != pCache) {
|
||||
ctgFreeTbCacheImpl(pCache);
|
||||
ctgFreeTbCacheImpl(pCache, false);
|
||||
pCache = taosHashIterate(dbCache->tbCache, pCache);
|
||||
}
|
||||
taosHashCleanup(dbCache->tbCache);
|
||||
|
@ -311,6 +326,67 @@ void ctgFreeHandle(SCatalog* pCtg) {
|
|||
ctgInfo("handle freed, clusterId:0x%" PRIx64, clusterId);
|
||||
}
|
||||
|
||||
void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone) {
|
||||
int64_t cacheSize = 0;
|
||||
void* pIter = taosHashIterate(pCtg->dbCache, NULL);
|
||||
while (pIter) {
|
||||
SCtgDBCache* dbCache = pIter;
|
||||
|
||||
SCtgTbCache* pCache = taosHashIterate(dbCache->tbCache, NULL);
|
||||
while (NULL != pCache) {
|
||||
size_t len = 0;
|
||||
void* key = taosHashGetKey(pCache, &len);
|
||||
|
||||
if (pCache->pMeta && TSDB_SUPER_TABLE == pCache->pMeta->tableType) {
|
||||
continue;
|
||||
}
|
||||
|
||||
taosHashRemove(dbCache->tbCache, key, len);
|
||||
cacheSize = len + sizeof(SCtgTbCache) + ctgGetTbMetaCacheSize(pCache->pMeta) + ctgGetTbIndexCacheSize(pCache->pIndex);
|
||||
atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize);
|
||||
*pClearedSize += cacheSize;
|
||||
(*pCleardNum)++;
|
||||
|
||||
ctgFreeTbCacheImpl(pCache, true);
|
||||
|
||||
if (*pCleardNum >= CTG_CLEAR_CACHE_ROUND_TB_NUM) {
|
||||
goto _return;
|
||||
}
|
||||
|
||||
pCache = taosHashIterate(dbCache->tbCache, pCache);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pCtg->dbCache, pIter);
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
if (*pCleardNum >= CTG_CLEAR_CACHE_ROUND_TB_NUM) {
|
||||
*roundDone = true;
|
||||
}
|
||||
}
|
||||
|
||||
void ctgClearAllHandleMeta(bool *roundDone) {
|
||||
int64_t clearedSize = 0;
|
||||
int64_t clearedNum = 0;
|
||||
SCatalog *pCtg = NULL;
|
||||
|
||||
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
|
||||
while (pIter) {
|
||||
pCtg = *(SCatalog **)pIter;
|
||||
|
||||
if (pCtg) {
|
||||
ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, roundDone);
|
||||
if (*roundDone) {
|
||||
taosHashCancelIterate(gCtgMgmt.pCluster, pIter);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
void ctgClearHandle(SCatalog* pCtg) {
|
||||
if (NULL == pCtg) {
|
||||
return;
|
||||
|
@ -324,8 +400,8 @@ void ctgClearHandle(SCatalog* pCtg) {
|
|||
ctgFreeInstDbCache(pCtg->dbCache);
|
||||
ctgFreeInstUserCache(pCtg->userCache);
|
||||
|
||||
ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB);
|
||||
ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE);
|
||||
ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbVgVersion));
|
||||
ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion));
|
||||
|
||||
pCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false,
|
||||
HASH_ENTRY_LOCK);
|
||||
|
@ -1562,6 +1638,130 @@ void catalogFreeMetaData(SMetaData* pData) {
|
|||
}
|
||||
#endif
|
||||
|
||||
uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex) {
|
||||
if (NULL == pIndex) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return sizeof(*pIndex) + pIndex->indexSize;
|
||||
}
|
||||
|
||||
FORCE_INLINE uint64_t ctgGetTbMetaCacheSize(STableMeta *pMeta) {
|
||||
if (NULL == pMeta) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
switch (pMeta->tableType) {
|
||||
case TSDB_SUPER_TABLE:
|
||||
return sizeof(*pMeta) + (pMeta->tableInfo.numOfColumns + pMeta->tableInfo.numOfTags) * sizeof(SSchema);
|
||||
case TSDB_CHILD_TABLE:
|
||||
return sizeof(SCTableMeta);
|
||||
default:
|
||||
return sizeof(*pMeta) + pMeta->tableInfo.numOfColumns * sizeof(SSchema);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint64_t ctgGetDbVgroupCacheSize(SDBVgInfo *pVg) {
|
||||
if (NULL == pVg) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return sizeof(*pVg) + taosHashGetSize(pVg->vgHash) * (sizeof(SVgroupInfo) + sizeof(int32_t))
|
||||
+ taosArrayGetSize(pVg->vgArray) * sizeof(SVgroupInfo);
|
||||
}
|
||||
|
||||
uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth) {
|
||||
if (NULL == pAuth) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint64_t cacheSize = 0;
|
||||
char* p = taosHashIterate(pAuth->createdDbs, NULL);
|
||||
while (p != NULL) {
|
||||
size_t len = 0;
|
||||
void* key = taosHashGetKey(p, &len);
|
||||
cacheSize += len + strlen(p);
|
||||
|
||||
p = taosHashIterate(pAuth->createdDbs, p);
|
||||
}
|
||||
|
||||
p = taosHashIterate(pAuth->readDbs, NULL);
|
||||
while (p != NULL) {
|
||||
size_t len = 0;
|
||||
void* key = taosHashGetKey(p, &len);
|
||||
cacheSize += len + strlen(p);
|
||||
|
||||
p = taosHashIterate(pAuth->readDbs, p);
|
||||
}
|
||||
|
||||
p = taosHashIterate(pAuth->writeDbs, NULL);
|
||||
while (p != NULL) {
|
||||
size_t len = 0;
|
||||
void* key = taosHashGetKey(p, &len);
|
||||
cacheSize += len + strlen(p);
|
||||
|
||||
p = taosHashIterate(pAuth->writeDbs, p);
|
||||
}
|
||||
|
||||
p = taosHashIterate(pAuth->readTbs, NULL);
|
||||
while (p != NULL) {
|
||||
size_t len = 0;
|
||||
void* key = taosHashGetKey(p, &len);
|
||||
cacheSize += len + strlen(p);
|
||||
|
||||
p = taosHashIterate(pAuth->readTbs, p);
|
||||
}
|
||||
|
||||
p = taosHashIterate(pAuth->writeTbs, NULL);
|
||||
while (p != NULL) {
|
||||
size_t len = 0;
|
||||
void* key = taosHashGetKey(p, &len);
|
||||
cacheSize += len + strlen(p);
|
||||
|
||||
p = taosHashIterate(pAuth->writeTbs, p);
|
||||
}
|
||||
|
||||
int32_t *ref = taosHashIterate(pAuth->useDbs, NULL);
|
||||
while (ref != NULL) {
|
||||
size_t len = 0;
|
||||
void* key = taosHashGetKey(ref, &len);
|
||||
cacheSize += len + sizeof(*ref);
|
||||
|
||||
ref = taosHashIterate(pAuth->useDbs, ref);
|
||||
}
|
||||
|
||||
return cacheSize;
|
||||
}
|
||||
|
||||
uint64_t ctgGetClusterCacheSize(SCatalog *pCtg) {
|
||||
uint64_t cacheSize = sizeof(SCatalog);
|
||||
|
||||
SCtgUserAuth* pAuth = taosHashIterate(pCtg->userCache, NULL);
|
||||
while (pAuth != NULL) {
|
||||
size_t len = 0;
|
||||
void* key = taosHashGetKey(pAuth, &len);
|
||||
cacheSize += len + sizeof(SCtgUserAuth) + atomic_load_64(&pAuth->userCacheSize);
|
||||
|
||||
pAuth = taosHashIterate(pCtg->userCache, pAuth);
|
||||
}
|
||||
|
||||
SCtgDBCache* pDb = taosHashIterate(pCtg->dbCache, NULL);
|
||||
while (pDb != NULL) {
|
||||
size_t len = 0;
|
||||
void* key = taosHashGetKey(pDb, &len);
|
||||
cacheSize += len + sizeof(SCtgDBCache) + atomic_load_64(&pDb->dbCacheSize);
|
||||
|
||||
pDb = taosHashIterate(pCtg->dbCache, pDb);
|
||||
}
|
||||
|
||||
cacheSize += pCtg->dbRent.rentCacheSize;
|
||||
cacheSize += pCtg->stbRent.rentCacheSize;
|
||||
|
||||
return cacheSize;
|
||||
}
|
||||
|
||||
void ctgGetClusterCacheStat(SCatalog* pCtg) {
|
||||
for (int32_t i = 0; i < CTG_CI_MAX_VALUE; ++i) {
|
||||
if (0 == (gCtgStatItem[i].flag & CTG_CI_FLAG_LEVEL_DB)) {
|
||||
|
@ -1627,10 +1827,22 @@ void ctgGetGlobalCacheStat(SCtgCacheStat* pStat) {
|
|||
memcpy(pStat, &gCtgMgmt.statInfo.cache, sizeof(gCtgMgmt.statInfo.cache));
|
||||
}
|
||||
|
||||
void ctgGetGlobalCacheUsedSize(uint64_t *pSize) {
|
||||
SCtgCacheStat stat;
|
||||
ctgGetGlobalCacheStat(&stat);
|
||||
do;
|
||||
void ctgGetGlobalCacheSize(uint64_t *pSize) {
|
||||
*pSize = 0;
|
||||
|
||||
SCatalog* pCtg = NULL;
|
||||
void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
|
||||
while (pIter) {
|
||||
size_t len = 0;
|
||||
void* key = taosHashGetKey(pIter, &len);
|
||||
*pSize += len + POINTER_BYTES;
|
||||
|
||||
pCtg = *(SCatalog**)pIter;
|
||||
if (pCtg) {
|
||||
*pSize += ctgGetClusterCacheSize(pCtg);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -743,7 +743,7 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
if (taosSetCfg(tsCfg, pStmt->config)) {
|
||||
if (taosApplyLocalCfg(tsCfg, pStmt->config)) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue