feat: support meta cleanup
This commit is contained in:
parent
daedb3618b
commit
9fcf56a4ee
|
@ -210,7 +210,7 @@ int32_t catalogGetSTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const
|
||||||
|
|
||||||
int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg);
|
int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg);
|
||||||
|
|
||||||
int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg);
|
int32_t catalogAsyncUpdateTableMeta(SCatalog* pCtg, STableMetaRsp* pMsg);
|
||||||
|
|
||||||
int32_t catalogGetCachedTableMeta(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta);
|
int32_t catalogGetCachedTableMeta(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta);
|
||||||
|
|
||||||
|
|
|
@ -217,7 +217,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
catalogUpdateTableMeta(pCatalog, rsp);
|
catalogAsyncUpdateTableMeta(pCatalog, rsp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -815,7 +815,7 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
|
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
|
||||||
return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
|
return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
|
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
|
||||||
|
|
|
@ -964,7 +964,7 @@ uint64_t ctgGetDbVgroupCacheSize(SDBVgInfo *pVg);
|
||||||
uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth);
|
uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth);
|
||||||
uint64_t ctgGetClusterCacheSize(SCatalog *pCtg);
|
uint64_t ctgGetClusterCacheSize(SCatalog *pCtg);
|
||||||
void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone);
|
void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone);
|
||||||
void ctgClearAllHandleMeta(bool *roundDone);
|
void ctgClearAllHandleMeta(int64_t *clearedSize, int64_t *clearedNum, bool *roundDone);
|
||||||
void ctgProcessTimerEvent(void *param, void *tmrId);
|
void ctgProcessTimerEvent(void *param, void *tmrId);
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -648,6 +648,7 @@ void ctgProcessTimerEvent(void *param, void *tmrId) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qTrace("reset catalog timer");
|
||||||
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
@ -1105,6 +1106,22 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t catalogAsyncUpdateTableMeta(SCatalog* pCtg, STableMetaRsp* pMsg) {
|
||||||
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
if (NULL == pCtg || NULL == pMsg) {
|
||||||
|
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
CTG_ERR_JRET(ctgUpdateTbMeta(pCtg, pMsg, false));
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
CTG_API_LEAVE(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* pTables) {
|
int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* pTables) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
@ -1571,6 +1588,13 @@ void catalogDestroy(void) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (gCtgMgmt.cacheTimer) {
|
||||||
|
taosTmrStop(gCtgMgmt.cacheTimer);
|
||||||
|
gCtgMgmt.cacheTimer = NULL;
|
||||||
|
taosTmrCleanUp(gCtgMgmt.timer);
|
||||||
|
gCtgMgmt.timer = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
atomic_store_8((int8_t*)&gCtgMgmt.exit, true);
|
atomic_store_8((int8_t*)&gCtgMgmt.exit, true);
|
||||||
|
|
||||||
if (!taosCheckCurrentInDll()) {
|
if (!taosCheckCurrentInDll()) {
|
||||||
|
|
|
@ -791,7 +791,7 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
|
||||||
|
|
||||||
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
|
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
|
||||||
|
|
||||||
ctgDebug("action [%s] added into queue", opName);
|
ctgDebug("%sync action [%s] added into queue", syncOp ? "S": "As", opName);
|
||||||
|
|
||||||
CTG_QUEUE_INC();
|
CTG_QUEUE_INC();
|
||||||
CTG_STAT_RT_INC(numOfOpEnqueue, 1);
|
CTG_STAT_RT_INC(numOfOpEnqueue, 1);
|
||||||
|
@ -2286,21 +2286,22 @@ void ctgClearMetaCache(SCtgCacheOperation *operation) {
|
||||||
if (pCtg) {
|
if (pCtg) {
|
||||||
ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, &roundDone);
|
ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, &roundDone);
|
||||||
} else {
|
} else {
|
||||||
ctgClearAllHandleMeta(&roundDone);
|
ctgClearAllHandleMeta(&clearedSize, &clearedNum, &roundDone);
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("catalog finish one round meta clear, done:%d", roundDone);
|
qDebug("catalog finish one round meta clear, clearedSize:%" PRId64 ", clearedNum:%" PRId64 ", done:%d", clearedSize, clearedNum, roundDone);
|
||||||
|
|
||||||
ctgGetGlobalCacheSize(&remainSize);
|
ctgGetGlobalCacheSize(&remainSize);
|
||||||
int32_t cacheMaxSize = atomic_load_32(&tsMetaCacheMaxSize);
|
int32_t cacheMaxSize = atomic_load_32(&tsMetaCacheMaxSize);
|
||||||
|
|
||||||
if (!CTG_CACHE_LOW(remainSize, cacheMaxSize)) {
|
if (CTG_CACHE_LOW(remainSize, cacheMaxSize)) {
|
||||||
qDebug("catalog finish meta clear, remainSize:%" PRId64 ", cacheMaxSize:%dMB", remainSize, cacheMaxSize);
|
qDebug("catalog finish meta clear, remainSize:%" PRId64 ", cacheMaxSize:%dMB", remainSize, cacheMaxSize);
|
||||||
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!roundDone) {
|
if (!roundDone) {
|
||||||
|
qDebug("catalog all meta cleared, remainSize:%" PRId64 ", cacheMaxSize:%dMB, to clear handle", remainSize, cacheMaxSize);
|
||||||
ctgClearFreeCache(operation);
|
ctgClearFreeCache(operation);
|
||||||
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -338,6 +338,7 @@ void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardN
|
||||||
void* key = taosHashGetKey(pCache, &len);
|
void* key = taosHashGetKey(pCache, &len);
|
||||||
|
|
||||||
if (pCache->pMeta && TSDB_SUPER_TABLE == pCache->pMeta->tableType) {
|
if (pCache->pMeta && TSDB_SUPER_TABLE == pCache->pMeta->tableType) {
|
||||||
|
pCache = taosHashIterate(dbCache->tbCache, pCache);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -347,9 +348,14 @@ void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardN
|
||||||
*pClearedSize += cacheSize;
|
*pClearedSize += cacheSize;
|
||||||
(*pCleardNum)++;
|
(*pCleardNum)++;
|
||||||
|
|
||||||
|
if (pCache->pMeta) {
|
||||||
|
CTG_META_NUM_DEC(pCache->pMeta->tableType);
|
||||||
|
}
|
||||||
|
|
||||||
ctgFreeTbCacheImpl(pCache, true);
|
ctgFreeTbCacheImpl(pCache, true);
|
||||||
|
|
||||||
if (*pCleardNum >= CTG_CLEAR_CACHE_ROUND_TB_NUM) {
|
if (*pCleardNum >= CTG_CLEAR_CACHE_ROUND_TB_NUM) {
|
||||||
|
taosHashCancelIterate(dbCache->tbCache, pCache);
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -366,9 +372,7 @@ _return:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgClearAllHandleMeta(bool *roundDone) {
|
void ctgClearAllHandleMeta(int64_t *clearedSize, int64_t *clearedNum, bool *roundDone) {
|
||||||
int64_t clearedSize = 0;
|
|
||||||
int64_t clearedNum = 0;
|
|
||||||
SCatalog *pCtg = NULL;
|
SCatalog *pCtg = NULL;
|
||||||
|
|
||||||
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
|
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
|
||||||
|
@ -376,7 +380,7 @@ void ctgClearAllHandleMeta(bool *roundDone) {
|
||||||
pCtg = *(SCatalog **)pIter;
|
pCtg = *(SCatalog **)pIter;
|
||||||
|
|
||||||
if (pCtg) {
|
if (pCtg) {
|
||||||
ctgClearHandleMeta(pCtg, &clearedSize, &clearedNum, roundDone);
|
ctgClearHandleMeta(pCtg, clearedSize, clearedNum, roundDone);
|
||||||
if (*roundDone) {
|
if (*roundDone) {
|
||||||
taosHashCancelIterate(gCtgMgmt.pCluster, pIter);
|
taosHashCancelIterate(gCtgMgmt.pCluster, pIter);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -1746,6 +1746,8 @@ TEST(tableMeta, updateStbMeta) {
|
||||||
|
|
||||||
code = catalogUpdateTableMeta(pCtg, &rsp);
|
code = catalogUpdateTableMeta(pCtg, &rsp);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
code = catalogAsyncUpdateTableMeta(pCtg, &rsp);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
taosMemoryFreeClear(rsp.pSchemas);
|
taosMemoryFreeClear(rsp.pSchemas);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
Loading…
Reference in New Issue