From 4ad36f714dca86136c7137aa02f96aaed9262c91 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 24 Jun 2022 20:53:52 +0800 Subject: [PATCH] fix: fix reset query cache issue --- include/util/taoserror.h | 11 +- source/libs/catalog/inc/catalogInt.h | 55 ++++--- source/libs/catalog/src/catalog.c | 42 +++--- source/libs/catalog/src/ctgAsync.c | 44 +++--- source/libs/catalog/src/ctgCache.c | 167 +++++++++++++--------- source/libs/catalog/src/ctgDbg.c | 6 +- source/libs/catalog/src/ctgUtil.c | 138 +++++++++++------- source/libs/catalog/test/catalogTests.cpp | 18 +-- source/libs/qcom/src/queryUtil.c | 2 +- source/util/src/terror.c | 2 +- 10 files changed, 296 insertions(+), 189 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 44b98d209f..99c64784c0 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -466,12 +466,11 @@ int32_t* taosGetErrno(); #define TSDB_CODE_CTG_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2400) #define TSDB_CODE_CTG_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2401) #define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402) -#define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) -#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) -#define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2405) -#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406) -#define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2407) -#define TSDB_CODE_CTG_EXIT TAOS_DEF_ERROR_CODE(0, 0x2408) +#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) +#define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2404) +#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2405) +#define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2406) +#define TSDB_CODE_CTG_EXIT TAOS_DEF_ERROR_CODE(0, 0x2407) //scheduler&qworker #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index f18f73e904..a5768d9003 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -205,6 +205,7 @@ typedef struct SCtgJob { SArray* pTasks; int32_t taskDone; SMetaData jobRes; + int32_t jobResCode; int32_t taskIdx; SRWLatch taskLock; @@ -284,24 +285,26 @@ typedef struct SCtgApiStat { } SCtgApiStat; typedef struct SCtgRuntimeStat { - uint64_t qNum; - uint64_t qDoneNum; + uint64_t numOfOpAbort; + uint64_t numOfOpEnqueue; + uint64_t numOfOpDequeue; } SCtgRuntimeStat; typedef struct SCtgCacheStat { - uint64_t clusterNum; - uint64_t dbNum; - uint64_t tblNum; - uint64_t stblNum; - uint64_t userNum; - uint64_t vgHitNum; - uint64_t vgMissNum; - uint64_t tbMetaHitNum; - uint64_t tbMetaMissNum; - uint64_t tbIndexHitNum; - uint64_t tbIndexMissNum; - uint64_t userHitNum; - uint64_t userMissNum; + uint64_t numOfCluster; + uint64_t numOfDb; + uint64_t numOfTbl; + uint64_t numOfStb; + uint64_t numOfUser; + uint64_t numOfVgHit; + uint64_t numOfVgMiss; + uint64_t numOfMetaHit; + uint64_t numOfMetaMiss; + uint64_t numOfIndexHit; + uint64_t numOfIndexMiss; + uint64_t numOfUserHit; + uint64_t numOfUserMiss; + uint64_t numOfClear; } SCtgCacheStat; typedef struct SCatalogStat { @@ -371,6 +374,7 @@ typedef struct SCtgDropTbIndexMsg { typedef struct SCtgClearCacheMsg { SCatalog* pCtg; + bool freeCtg; } SCtgClearCacheMsg; typedef struct SCtgUpdateEpsetMsg { @@ -530,6 +534,21 @@ typedef struct SCtgOperation { } \ } while (0) + +#define CTG_API_LEAVE_NOLOCK(c) do { \ + int32_t __code = c; \ + CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); \ + CTG_RET(__code); \ + } while (0) + +#define CTG_API_ENTER_NOLOCK() do { \ + CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \ + if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \ + CTG_API_LEAVE_NOLOCK(TSDB_CODE_CTG_OUT_OF_SERVICE); \ + } \ +} while (0) + + void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p); void ctgdShowClusterCache(SCatalog* pCtg); int32_t ctgdShowCacheInfo(void); @@ -561,7 +580,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq); int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet); int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp); -int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool stopQueue, bool syncOp); +int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool syncOp); int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type); int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size); int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size); @@ -596,7 +615,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask); int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask); -int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum); +int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param); int32_t ctgLaunchJob(SCtgJob *pJob); int32_t ctgMakeAsyncRes(SCtgJob *pJob); int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param); @@ -623,6 +642,8 @@ int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCt int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes); void ctgFreeSTableIndex(void *info); void ctgClearSubTaskRes(SCtgSubRes *pRes); +void ctgFreeQNode(SCtgQNode *node); +void ctgClearHandle(SCatalog* pCtg); extern SCatalogMgmt gCtgMgmt; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 68f595e753..b459beb658 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -99,7 +99,7 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* STableMetaOutput* output = taosMemoryCalloc(1, sizeof(STableMetaOutput)); if (NULL == output) { ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } if (CTG_FLAG_IS_SYS_DB(ctx->flag)) { @@ -264,7 +264,7 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) { STableMetaOutput* output = taosMemoryCalloc(1, sizeof(STableMetaOutput)); if (NULL == output) { ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } int32_t code = 0; @@ -442,7 +442,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTabl vgList = taosArrayInit(1, sizeof(SVgroupInfo)); if (NULL == vgList) { ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo)); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } if (NULL == taosArrayPush(vgList, &vgroupInfo)) { @@ -570,7 +570,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { clusterCtg = taosMemoryCalloc(1, sizeof(SCatalog)); if (NULL == clusterCtg) { qError("calloc %d failed", (int32_t)sizeof(SCatalog)); - CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR); + CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY); } clusterCtg->clusterId = clusterId; @@ -582,7 +582,13 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { false, HASH_ENTRY_LOCK); if (NULL == clusterCtg->dbCache) { qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + clusterCtg->userCache = taosHashInit(gCtgMgmt.cfg.maxUserCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == clusterCtg->userCache) { + qError("taosHashInit %d user cache failed", gCtgMgmt.cfg.maxUserCacheNum); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES); @@ -603,7 +609,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { *catalogHandle = clusterCtg; - CTG_CACHE_STAT_INC(clusterNum, 1); + CTG_CACHE_STAT_INC(numOfCluster, 1); CTG_API_LEAVE(TSDB_CODE_SUCCESS); @@ -991,7 +997,7 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SCatalo pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES); if (NULL == pRsp->pTableMeta) { ctgError("taosArrayInit %d failed", tbNum); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } for (int32_t i = 0; i < tbNum; ++i) { @@ -1006,7 +1012,7 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SCatalo if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) { ctgError("taosArrayPush failed, idx:%d", i); taosMemoryFreeClear(pTableMeta); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } } } @@ -1041,14 +1047,9 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SC CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - int32_t code = 0, taskNum = 0; + int32_t code = 0; SCtgJob *pJob = NULL; - CTG_ERR_JRET(ctgInitJob(pCtg, pConn, &pJob, pReq, fp, param, &taskNum)); - if (taskNum <= 0) { - SMetaData* pMetaData = taosMemoryCalloc(1, sizeof(SMetaData)); - fp(pMetaData, param, TSDB_CODE_SUCCESS); - CTG_API_LEAVE(TSDB_CODE_SUCCESS); - } + CTG_ERR_JRET(ctgInitJob(pCtg, pConn, &pJob, pReq, fp, param)); CTG_ERR_JRET(ctgLaunchJob(pJob)); @@ -1056,6 +1057,7 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SC // *jobId = pJob->refId; _return: + if (pJob) { taosReleaseRef(gCtgMgmt.jobPool, pJob->refId); @@ -1257,19 +1259,19 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) { } int32_t catalogClearCache(void) { - CTG_API_ENTER(); + CTG_API_ENTER_NOLOCK(); qInfo("start to clear catalog cache"); if (NULL == gCtgMgmt.pCluster || atomic_load_8((int8_t*)&gCtgMgmt.exit)) { - CTG_API_LEAVE(TSDB_CODE_SUCCESS); + CTG_API_LEAVE_NOLOCK(TSDB_CODE_SUCCESS); } - int32_t code = ctgClearCacheEnqueue(NULL, false, true); + int32_t code = ctgClearCacheEnqueue(NULL, false, false, true); qInfo("clear catalog cache end, code: %s", tstrerror(code)); - CTG_API_LEAVE(code); + CTG_API_LEAVE_NOLOCK(code); } @@ -1282,7 +1284,7 @@ void catalogDestroy(void) { atomic_store_8((int8_t*)&gCtgMgmt.exit, true); - ctgClearCacheEnqueue(NULL, true, true); + ctgClearCacheEnqueue(NULL, true, true, true); taosHashCleanup(gCtgMgmt.pCluster); gCtgMgmt.pCluster = NULL; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 18c95397dd..6184d13533 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -427,7 +427,7 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas return TSDB_CODE_SUCCESS; } -int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) { +int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param) { int32_t code = 0; int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta); int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup); @@ -443,11 +443,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex); int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg); - *taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum; - if (*taskNum <= 0) { - ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, pConn->requestId); - return TSDB_CODE_SUCCESS; - } + int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum; *job = taosMemoryCalloc(1, sizeof(SCtgJob)); if (NULL == *job) { @@ -477,15 +473,15 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const pJob->tbCfgNum = tbCfgNum; pJob->svrVerNum = svrVerNum; - pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask)); + pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask)); if (NULL == pJob->pTasks) { - ctgError("taosArrayInit %d tasks failed", *taskNum); + ctgError("taosArrayInit %d tasks failed", taskNum); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - if (pReq->forceUpdate) { - CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, *taskNum, pJob, pReq)); + if (pReq->forceUpdate && taskNum) { + CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, taskNum, pJob, pReq)); } for (int32_t i = 0; i < dbVgNum; ++i) { @@ -558,11 +554,12 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const taosAcquireRef(gCtgMgmt.jobPool, pJob->refId); - qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, *taskNum, pReq->forceUpdate); + qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, taskNum, pReq->forceUpdate); return TSDB_CODE_SUCCESS; _return: + taosMemoryFreeClear(*job); CTG_RET(code); } @@ -763,7 +760,7 @@ int32_t ctgDumpSvrVer(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgInvokeSubCb(SCtgTask *pTask) { +int32_t ctgCallSubCb(SCtgTask *pTask) { int32_t code = 0; CTG_LOCK(CTG_WRITE, &pTask->lock); @@ -790,6 +787,15 @@ _return: CTG_RET(code); } +int32_t ctgCallUserCb(void* param) { + SCtgJob* pJob = (SCtgJob*)param; + + (*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode); + + taosRemoveRef(gCtgMgmt.jobPool, pJob->refId); + + return TSDB_CODE_SUCCESS; +} int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { SCtgJob* pJob = pTask->pJob; @@ -804,7 +810,7 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { pTask->code = rspCode; pTask->status = CTG_TASK_DONE; - ctgInvokeSubCb(pTask); + ctgCallSubCb(pTask); int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1); if (taskDone < taosArrayGetSize(pJob->pTasks)) { @@ -818,9 +824,9 @@ _return: qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(code)); - (*pJob->userFp)(&pJob->jobRes, pJob->userParam, code); - - taosRemoveRef(gCtgMgmt.jobPool, pJob->refId); + pJob->jobResCode = code; + + taosAsyncExec(ctgCallUserCb, pJob, NULL); CTG_RET(code); } @@ -1697,6 +1703,12 @@ int32_t ctgLaunchJob(SCtgJob *pJob) { pTask->status = CTG_TASK_LAUNCHED; } + if (taskNum <= 0) { + qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode)); + + taosAsyncExec(ctgCallUserCb, pJob, NULL); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index fca935e81f..9c9aa4001c 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -214,7 +214,7 @@ int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCac *pCache = dbCache; - CTG_CACHE_STAT_INC(vgHitNum, 1); + CTG_CACHE_STAT_INC(numOfVgHit, 1); ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName); @@ -228,7 +228,7 @@ _return: *pCache = NULL; - CTG_CACHE_STAT_INC(vgMissNum, 1); + CTG_CACHE_STAT_INC(numOfVgMiss, 1); return TSDB_CODE_SUCCESS; } @@ -260,7 +260,7 @@ int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, S ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName); - CTG_CACHE_STAT_INC(tbMetaHitNum, 1); + CTG_CACHE_STAT_INC(numOfMetaHit, 1); return TSDB_CODE_SUCCESS; @@ -268,7 +268,7 @@ _return: ctgReleaseTbMetaToCache(pCtg, dbCache, pCache); - CTG_CACHE_STAT_INC(tbMetaMissNum, 1); + CTG_CACHE_STAT_INC(numOfMetaMiss, 1); return TSDB_CODE_SUCCESS; } @@ -307,7 +307,7 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid, ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName); - CTG_CACHE_STAT_INC(tbMetaHitNum, 1); + CTG_CACHE_STAT_INC(numOfMetaHit, 1); return TSDB_CODE_SUCCESS; @@ -315,7 +315,7 @@ _return: ctgReleaseTbMetaToCache(pCtg, dbCache, pCache); - CTG_CACHE_STAT_INC(tbMetaMissNum, 1); + CTG_CACHE_STAT_INC(numOfMetaMiss, 1); *pDb = NULL; *pTb = NULL; @@ -351,7 +351,7 @@ int32_t ctgAcquireTbIndexFromCache(SCatalog* pCtg, char *dbFName, char* tbName, ctgDebug("tb %s index got in cache, dbFName:%s", tbName, dbFName); - CTG_CACHE_STAT_INC(tbIndexHitNum, 1); + CTG_CACHE_STAT_INC(numOfIndexHit, 1); return TSDB_CODE_SUCCESS; @@ -359,7 +359,7 @@ _return: ctgReleaseTbIndexToCache(pCtg, dbCache, pCache); - CTG_CACHE_STAT_INC(tbIndexMissNum, 1); + CTG_CACHE_STAT_INC(numOfIndexMiss, 1); return TSDB_CODE_SUCCESS; } @@ -455,7 +455,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** *pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize); if (NULL == *pTableMeta) { ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta)); @@ -583,11 +583,6 @@ _return: } int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass) { - if (NULL == pCtg->userCache) { - ctgDebug("empty user auth cache, user:%s", user); - goto _return; - } - SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, user, strlen(user)); if (NULL == pUser) { ctgDebug("user not in cache, user:%s", user); @@ -597,7 +592,7 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE *inCache = true; ctgDebug("Got user from cache, user:%s", user); - CTG_CACHE_STAT_INC(userHitNum, 1); + CTG_CACHE_STAT_INC(numOfUserHit, 1); if (pUser->superUser) { *pass = true; @@ -626,7 +621,7 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE _return: *inCache = false; - CTG_CACHE_STAT_INC(userMissNum, 1); + CTG_CACHE_STAT_INC(numOfUserMiss, 1); return TSDB_CODE_SUCCESS; } @@ -649,7 +644,7 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode)); if (NULL == node) { qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); - CTG_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_RET(TSDB_CODE_OUT_OF_MEMORY); } if (operation->syncOp) { @@ -670,7 +665,7 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); CTG_QUEUE_INC(); - CTG_RT_STAT_INC(qNum, 1); + CTG_RT_STAT_INC(numOfOpEnqueue, 1); tsem_post(&gCtgMgmt.queue.reqSem); @@ -693,7 +688,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } char *p = strchr(dbFName, '.'); @@ -726,7 +721,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } char *p = strchr(dbFName, '.'); @@ -760,7 +755,7 @@ int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } msg->pCtg = pCtg; @@ -792,7 +787,7 @@ int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } msg->pCtg = pCtg; @@ -822,7 +817,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); ctgFreeVgInfo(dbInfo); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } char *p = strchr(dbFName, '.'); @@ -857,7 +852,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } char *p = strchr(output->dbFName, '.'); @@ -889,7 +884,7 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEp SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } msg->pCtg = pCtg; @@ -921,7 +916,7 @@ int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } msg->pCtg = pCtg; @@ -950,7 +945,7 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncO SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg)); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } msg->pCtg = pCtg; @@ -981,7 +976,7 @@ int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp) { SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } msg->pCtg = pCtg; @@ -1002,7 +997,7 @@ _return: } -int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool stopQueue, bool syncOp) { +int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool syncOp) { int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_CLEAR_CACHE; @@ -1012,10 +1007,11 @@ int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool stopQueue, bool syncOp) { SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } msg->pCtg = pCtg; + msg->freeCtg = freeCtg; op->data = msg; CTG_ERR_JRET(ctgEnqueue(pCtg, op)); @@ -1040,7 +1036,7 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { mgmt->slots = taosMemoryCalloc(1, msgSize); if (NULL == mgmt->slots) { qError("calloc %d failed", (int32_t)msgSize); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum); @@ -1060,13 +1056,13 @@ int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size); if (NULL == slot->meta) { qError("taosArrayInit %d failed, id:0x%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } } if (NULL == taosArrayPush(slot->meta, meta)) { qError("taosArrayPush meta to rent failed, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } slot->needSort = true; @@ -1184,7 +1180,7 @@ int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_ *res = taosMemoryMalloc(msize); if (NULL == *res) { qError("malloc %d failed", (int32_t)msize); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } void *meta = taosArrayGet(slot->meta, 0); @@ -1234,13 +1230,13 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == newDBCache.tbCache) { ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); if (NULL == newDBCache.stbCache) { ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache)); @@ -1251,10 +1247,10 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { } ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - CTG_CACHE_STAT_INC(dbNum, 1); + CTG_CACHE_STAT_INC(numOfDb, 1); SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1}; strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); @@ -1315,7 +1311,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); } - CTG_CACHE_STAT_DEC(dbNum, 1); + CTG_CACHE_STAT_DEC(numOfDb, 1); ctgInfo("db removed from cache, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbId); return TSDB_CODE_SUCCESS; @@ -1412,7 +1408,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) { ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%"PRIx64, dbFName, tbName, orig->suid); } else { - CTG_CACHE_STAT_DEC(stblNum, 1); + CTG_CACHE_STAT_DEC(numOfStb, 1); ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%"PRIx64, dbFName, tbName, orig->suid); } @@ -1426,7 +1422,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(SCtgTbCache)) != 0) { taosMemoryFree(meta); ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName)); @@ -1436,7 +1432,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam } if (NULL == orig) { - CTG_CACHE_STAT_INC(tblNum, 1); + CTG_CACHE_STAT_INC(numOfTbl, 1); } ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); @@ -1448,10 +1444,10 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam if (origSuid != meta->suid && taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, strlen(tbName) + 1) != 0) { ctgError("taosHashPut to stable cache failed, suid:0x%"PRIx64, meta->suid); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - CTG_CACHE_STAT_INC(stblNum, 1); + CTG_CACHE_STAT_INC(numOfStb, 1); ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName, meta->tableType); @@ -1479,7 +1475,7 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa ctgFreeSTableIndex(*index); taosMemoryFreeClear(*index); ctgError("taosHashPut new tbCache failed, tbName:%s", tbName); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } *index = NULL; @@ -1527,7 +1523,7 @@ _return: CTG_RET(code); } -void ctgClearAllCtgInstance(void) { +void ctgClearAllInstance(void) { SCatalog* pCtg = NULL; void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); @@ -1535,7 +1531,24 @@ void ctgClearAllCtgInstance(void) { pCtg = *(SCatalog**)pIter; if (pCtg) { - catalogFreeHandle(pCtg); + ctgClearHandle(pCtg); + } + + pIter = taosHashIterate(gCtgMgmt.pCluster, pIter); + } + + taosHashClear(gCtgMgmt.pCluster); +} + +void ctgFreeAllInstance(void) { + SCatalog* pCtg = NULL; + + void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); + while (pIter) { + pCtg = *(SCatalog**)pIter; + + if (pCtg) { + ctgFreeHandle(pCtg); } pIter = taosHashIterate(gCtgMgmt.pCluster, pIter); @@ -1559,7 +1572,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } bool newAdded = false; @@ -1739,13 +1752,13 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) { ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); } else { - CTG_CACHE_STAT_DEC(stblNum, 1); + CTG_CACHE_STAT_DEC(numOfStb, 1); } if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) { ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); } else { - CTG_CACHE_STAT_DEC(tblNum, 1); + CTG_CACHE_STAT_DEC(numOfTbl, 1); } ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); @@ -1781,7 +1794,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } else { - CTG_CACHE_STAT_DEC(tblNum, 1); + CTG_CACHE_STAT_DEC(numOfTbl, 1); } ctgDebug("table %s removed from cache, dbFName:%s", msg->tbName, msg->dbFName); @@ -1797,14 +1810,6 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) { int32_t code = 0; SCtgUpdateUserMsg *msg = operation->data; SCatalog* pCtg = msg->pCtg; - - if (NULL == pCtg->userCache) { - pCtg->userCache = taosHashInit(gCtgMgmt.cfg.maxUserCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - if (NULL == pCtg->userCache) { - ctgError("taosHashInit %d user cache failed", gCtgMgmt.cfg.maxUserCacheNum); - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } - } SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user)); if (NULL == pUser) { @@ -1962,14 +1967,27 @@ int32_t ctgOpClearCache(SCtgCacheOperation *operation) { SCtgClearCacheMsg *msg = operation->data; SCatalog* pCtg = msg->pCtg; + CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock); + if (pCtg) { - catalogFreeHandle(pCtg); + if (msg->freeCtg) { + ctgFreeHandle(pCtg); + } else { + ctgClearHandle(pCtg); + } + goto _return; } - - ctgClearAllCtgInstance(); + + if (msg->freeCtg) { + ctgFreeAllInstance(); + } else { + ctgClearAllInstance(); + } _return: + + CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock); taosMemoryFreeClear(msg); @@ -1979,16 +1997,29 @@ _return: void ctgCleanupCacheQueue(void) { SCtgQNode *node = NULL; SCtgQNode *nodeNext = NULL; + SCtgCacheOperation *op = NULL; + bool stopQueue = false; while (true) { node = gCtgMgmt.queue.head->next; while (node) { if (node->op) { - taosMemoryFree(node->op->data); - if (node->op->syncOp) { - tsem_post(&node->op->rspSem); + op = node->op; + if (op->stopQueue) { + SCatalog *pCtg = ((SCtgUpdateMsgHeader *)op->data)->pCtg; + ctgDebug("process [%s] operation", gCtgCacheOperation[op->opId].name); + (*gCtgCacheOperation[op->opId].func)(op); + stopQueue = true; + CTG_RT_STAT_INC(numOfOpDequeue, 1); } else { - taosMemoryFree(node->op); + taosMemoryFree(op->data); + CTG_RT_STAT_INC(numOfOpAbort, 1); + } + + if (op->syncOp) { + tsem_post(&op->rspSem); + } else { + taosMemoryFree(op); } } @@ -1998,7 +2029,7 @@ void ctgCleanupCacheQueue(void) { node = nodeNext; } - if (CTG_IS_LOCKED(&gCtgMgmt.lock)) { + if (!stopQueue) { taosUsleep(1); } else { break; @@ -2036,7 +2067,7 @@ void* ctgUpdateThreadFunc(void* param) { tsem_post(&operation->rspSem); } - CTG_RT_STAT_INC(qDoneNum, 1); + CTG_RT_STAT_INC(numOfOpDequeue, 1); ctgdShowCacheInfo(); ctgdShowClusterCache(pCtg); diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index 5f54362d8e..2cb6f0209f 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -19,7 +19,7 @@ #include "catalogInt.h" extern SCatalogMgmt gCtgMgmt; -SCtgDebug gCTGDebug = {.cacheEnable = true}; +SCtgDebug gCTGDebug = {.lockEnable = true, .apiEnable = true}; void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) { ASSERT(*(int32_t*)param == 1); @@ -255,8 +255,8 @@ int32_t ctgdEnableDebug(char *option) { } int32_t ctgdGetStatNum(char *option, void *res) { - if (0 == strcasecmp(option, "runtime.qDoneNum")) { - *(uint64_t *)res = atomic_load_64(&gCtgMgmt.stat.runtime.qDoneNum); + if (0 == strcasecmp(option, "runtime.numOfOpDequeue")) { + *(uint64_t *)res = atomic_load_64(&gCtgMgmt.stat.runtime.numOfOpDequeue); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 41f6db5fc1..21e78d4925 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -145,10 +145,10 @@ void ctgFreeStbMetaCache(SCtgDBCache *dbCache) { return; } - int32_t stblNum = taosHashGetSize(dbCache->stbCache); + int32_t stbNum = taosHashGetSize(dbCache->stbCache); taosHashCleanup(dbCache->stbCache); dbCache->stbCache = NULL; - CTG_CACHE_STAT_DEC(stblNum, stblNum); + CTG_CACHE_STAT_DEC(numOfStb, stbNum); } void ctgFreeTbCacheImpl(SCtgTbCache *pCache) { @@ -172,7 +172,7 @@ void ctgFreeTbCache(SCtgDBCache *dbCache) { } taosHashCleanup(dbCache->tbCache); dbCache->tbCache = NULL; - CTG_CACHE_STAT_DEC(tblNum, tblNum); + CTG_CACHE_STAT_DEC(numOfTbl, tblNum); } void ctgFreeVgInfo(SDBVgInfo *vgInfo) { @@ -202,41 +202,53 @@ void ctgFreeDbCache(SCtgDBCache *dbCache) { ctgFreeTbCache(dbCache); } +void ctgFreeInstDbCache(SHashObj* pDbCache) { + if (NULL == pDbCache) { + return; + } + + int32_t dbNum = taosHashGetSize(pDbCache); + + void *pIter = taosHashIterate(pDbCache, NULL); + while (pIter) { + SCtgDBCache *dbCache = pIter; + atomic_store_8(&dbCache->deleted, 1); + ctgFreeDbCache(dbCache); + + pIter = taosHashIterate(pDbCache, pIter); + } + + taosHashCleanup(pDbCache); + + CTG_CACHE_STAT_DEC(numOfDb, dbNum); +} + +void ctgFreeInstUserCache(SHashObj* pUserCache) { + if (NULL == pUserCache) { + return; + } + + int32_t userNum = taosHashGetSize(pUserCache); + + void *pIter = taosHashIterate(pUserCache, NULL); + while (pIter) { + SCtgUserAuth *userCache = pIter; + ctgFreeSCtgUserAuth(userCache); + + pIter = taosHashIterate(pUserCache, pIter); + } + + taosHashCleanup(pUserCache); + + CTG_CACHE_STAT_DEC(numOfUser, userNum); +} void ctgFreeHandleImpl(SCatalog* pCtg) { ctgFreeMetaRent(&pCtg->dbRent); ctgFreeMetaRent(&pCtg->stbRent); - - if (pCtg->dbCache) { - int32_t dbNum = taosHashGetSize(pCtg->dbCache); - - void *pIter = taosHashIterate(pCtg->dbCache, NULL); - while (pIter) { - SCtgDBCache *dbCache = pIter; - atomic_store_8(&dbCache->deleted, 1); - ctgFreeDbCache(dbCache); - - pIter = taosHashIterate(pCtg->dbCache, pIter); - } - taosHashCleanup(pCtg->dbCache); - CTG_CACHE_STAT_DEC(dbNum, dbNum); - } - - if (pCtg->userCache) { - int32_t userNum = taosHashGetSize(pCtg->userCache); - - void *pIter = taosHashIterate(pCtg->userCache, NULL); - while (pIter) { - SCtgUserAuth *userCache = pIter; - ctgFreeSCtgUserAuth(userCache); - - pIter = taosHashIterate(pCtg->userCache, pIter); - } - - taosHashCleanup(pCtg->userCache); - CTG_CACHE_STAT_DEC(userNum, userNum); - } + ctgFreeInstDbCache(pCtg->dbCache); + ctgFreeInstUserCache(pCtg->userCache); taosMemoryFree(pCtg); } @@ -247,21 +259,51 @@ void ctgFreeHandle(SCatalog* pCtg) { return; } - if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) { - ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:0x%" PRIx64, pCtg->clusterId); - return; - } - - CTG_CACHE_STAT_DEC(clusterNum, 1); - uint64_t clusterId = pCtg->clusterId; - ctgFreeHandleImpl(pCtg); + ctgFreeMetaRent(&pCtg->dbRent); + ctgFreeMetaRent(&pCtg->stbRent); + + ctgFreeInstDbCache(pCtg->dbCache); + ctgFreeInstUserCache(pCtg->userCache); + + CTG_CACHE_STAT_DEC(numOfCluster, 1); + + taosMemoryFree(pCtg); ctgInfo("handle freed, culsterId:0x%" PRIx64, clusterId); } +void ctgClearHandle(SCatalog* pCtg) { + if (NULL == pCtg) { + return; + } + uint64_t clusterId = pCtg->clusterId; + + ctgFreeMetaRent(&pCtg->dbRent); + ctgFreeMetaRent(&pCtg->stbRent); + + ctgFreeInstDbCache(pCtg->dbCache); + ctgFreeInstUserCache(pCtg->userCache); + + ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB); + ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE); + + pCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == pCtg->dbCache) { + qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER); + } + + pCtg->userCache = taosHashInit(gCtgMgmt.cfg.maxUserCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == pCtg->userCache) { + ctgError("taosHashInit %d user cache failed", gCtgMgmt.cfg.maxUserCacheNum); + } + + CTG_CACHE_STAT_INC(numOfClear, 1); + + ctgInfo("handle cleared, culsterId:0x%" PRIx64, clusterId); +} void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) { if (NULL == pOutput) { @@ -615,7 +657,7 @@ int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList) { vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo)); if (NULL == vgList) { ctgError("taosArrayInit failed, num:%d", vgNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } void *pIter = taosHashIterate(vgHash, NULL); @@ -625,7 +667,7 @@ int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList) { if (NULL == taosArrayPush(vgList, vgInfo)) { ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId); taosHashCancelIterate(vgHash, pIter); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } pIter = taosHashIterate(vgHash, pIter); @@ -742,7 +784,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) { *dst = taosMemoryMalloc(sizeof(SDBVgInfo)); if (NULL == *dst) { qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } memcpy(*dst, src, sizeof(SDBVgInfo)); @@ -752,7 +794,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) { if (NULL == (*dst)->vgHash) { qError("taosHashInit %d failed", (int32_t)hashSize); taosMemoryFreeClear(*dst); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } int32_t *vgId = NULL; @@ -765,7 +807,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) { taosHashCancelIterate(src->vgHash, pIter); taosHashCleanup((*dst)->vgHash); taosMemoryFreeClear(*dst); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } pIter = taosHashIterate(src->vgHash, pIter); @@ -781,7 +823,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) *pOutput = taosMemoryMalloc(sizeof(STableMetaOutput)); if (NULL == *pOutput) { qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } memcpy(*pOutput, output, sizeof(STableMetaOutput)); @@ -792,7 +834,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) if (NULL == (*pOutput)->tbMeta) { qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); taosMemoryFreeClear(*pOutput); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } memcpy((*pOutput)->tbMeta, output->tbMeta, metaSize); diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index aab341544f..3245fcd16a 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -1386,7 +1386,7 @@ TEST(tableMeta, updateStbMeta) { while (true) { uint64_t n = 0; - ctgdGetStatNum("runtime.qDoneNum", (void *)&n); + ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n); if (n != 3) { taosMsleep(50); } else { @@ -1456,7 +1456,7 @@ TEST(refreshGetMeta, normal2normal) { while (true) { uint64_t n = 0; - ctgdGetStatNum("runtime.qDoneNum", (void *)&n); + ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n); if (n > 0) { break; } @@ -1535,7 +1535,7 @@ TEST(refreshGetMeta, normal2notexist) { while (true) { uint64_t n = 0; - ctgdGetStatNum("runtime.qDoneNum", (void *)&n); + ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n); if (n > 0) { break; } @@ -1609,7 +1609,7 @@ TEST(refreshGetMeta, normal2child) { while (true) { uint64_t n = 0; - ctgdGetStatNum("runtime.qDoneNum", (void *)&n); + ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n); if (n > 0) { break; } @@ -1693,7 +1693,7 @@ TEST(refreshGetMeta, stable2child) { while (true) { uint64_t n = 0; - ctgdGetStatNum("runtime.qDoneNum", (void *)&n); + ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n); if (n > 0) { break; } @@ -1778,7 +1778,7 @@ TEST(refreshGetMeta, stable2stable) { while (true) { uint64_t n = 0; - ctgdGetStatNum("runtime.qDoneNum", (void *)&n); + ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n); if (n > 0) { break; } @@ -1866,7 +1866,7 @@ TEST(refreshGetMeta, child2stable) { while (true) { uint64_t n = 0; - ctgdGetStatNum("runtime.qDoneNum", (void *)&n); + ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n); if (n > 0) { break; } @@ -2083,7 +2083,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { while (true) { uint64_t n = 0; - ctgdGetStatNum("runtime.qDoneNum", (void *)&n); + ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n); if (n > 0) { break; } @@ -2109,7 +2109,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { while (true) { uint64_t n = 0; - ctgdGetStatNum("runtime.qDoneNum", (void *)&n); + ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n); if (n != 3) { taosMsleep(50); } else { diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index c807c1c4cd..cc02797e8f 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -416,7 +416,7 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { taosHashCancelIterate(pSrc->vgHash, pIter); taosHashCleanup((*pDst)->vgHash); taosMemoryFreeClear(*pDst); - return TSDB_CODE_CTG_MEM_ERROR; + return TSDB_CODE_OUT_OF_MEMORY; } pIter = taosHashIterate(pSrc->vgHash, pIter); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index d81089e281..c941382222 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -453,7 +453,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog internal error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready") -TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error") +TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "catalog memory error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_DB_DROPPED, "Database is dropped") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_OUT_OF_SERVICE, "catalog is out of service")