diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 20f4765190..4352946c7d 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -145,14 +145,6 @@ int32_t catalogInit(SCatalogCfg* cfg); */ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle); -/** - * Free a cluster's all catalog info, usually it's not necessary, until the application is closing. - * no current or future usage should be guaranteed by application - * @param pCatalog (input, NO more usage) - * @return error code - */ -void catalogFreeHandle(SCatalog* pCatalog); - int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum); /** diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 8fb89472cf..746b2f000f 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -383,7 +383,7 @@ typedef struct SCtgCacheOperation { void *data; bool syncOp; tsem_t rspSem; - bool lockQ; + bool stopQueue; } SCtgCacheOperation; typedef struct SCtgQNode { @@ -393,7 +393,7 @@ typedef struct SCtgQNode { typedef struct SCtgQueue { SRWLatch qlock; - bool lockQ; + bool stopQueue; SCtgQNode *head; SCtgQNode *tail; tsem_t reqSem; @@ -559,7 +559,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq); int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet); int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp); -int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool lockQ, bool syncOp); +int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool stopQueue, bool syncOp); int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type); int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size); int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size); @@ -598,12 +598,13 @@ int32_t ctgLaunchJob(SCtgJob *pJob); int32_t ctgMakeAsyncRes(SCtgJob *pJob); int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param); int32_t ctgGetTbCfgCb(SCtgTask *pTask); +void ctgFreeHandle(SCatalog* pCatalog); int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst); int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput); int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList); void ctgFreeJob(void* job); -void ctgFreeHandle(SCatalog* pCtg); +void ctgFreeHandleImpl(SCatalog* pCtg); void ctgFreeVgInfo(SDBVgInfo *vgInfo); int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup); void ctgResetTbMetaTask(SCtgTask* pTask); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index a887b9eb09..48bc89328d 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -548,9 +548,11 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + CTG_API_ENTER(); + if (NULL == gCtgMgmt.pCluster) { qError("catalog cluster cache are not ready, clusterId:0x%" PRIx64, clusterId); - CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY); + CTG_API_LEAVE(TSDB_CODE_CTG_NOT_READY); } int32_t code = 0; @@ -562,13 +564,13 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { if (ctg && (*ctg)) { *catalogHandle = *ctg; qDebug("got catalog handle from cache, clusterId:0x%" PRIx64 ", CTG:%p", clusterId, *ctg); - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } clusterCtg = taosMemoryCalloc(1, sizeof(SCatalog)); if (NULL == clusterCtg) { qError("calloc %d failed", (int32_t)sizeof(SCatalog)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR); } clusterCtg->clusterId = clusterId; @@ -586,7 +588,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES); if (code) { if (HASH_NODE_EXIST(code)) { - ctgFreeHandle(clusterCtg); + ctgFreeHandleImpl(clusterCtg); continue; } @@ -603,32 +605,13 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { CTG_CACHE_STAT_INC(clusterNum, 1); - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); _return: - ctgFreeHandle(clusterCtg); + ctgFreeHandleImpl(clusterCtg); - CTG_RET(code); -} - -void catalogFreeHandle(SCatalog* pCtg) { - if (NULL == pCtg) { - return; - } - - if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) { - ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:0x%" PRIx64, pCtg->clusterId); - return; - } - - CTG_CACHE_STAT_DEC(clusterNum, 1); - - uint64_t clusterId = pCtg->clusterId; - - ctgFreeHandle(pCtg); - - ctgInfo("handle freed, culsterId:0x%" PRIx64, clusterId); + CTG_API_LEAVE(code); } int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum) { @@ -1285,10 +1268,6 @@ void catalogDestroy(void) { ctgClearCacheEnqueue(NULL, true, true); - if (tsem_post(&gCtgMgmt.queue.reqSem)) { - qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); - } - taosHashCleanup(gCtgMgmt.pCluster); gCtgMgmt.pCluster = NULL; diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 9af227a9fd..fca935e81f 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -659,12 +659,12 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { node->op = operation; CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); - if (gCtgMgmt.queue.lockQ) { + if (gCtgMgmt.queue.stopQueue) { ctgFreeQNode(node); CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); CTG_RET(TSDB_CODE_CTG_EXIT); } - gCtgMgmt.queue.lockQ = operation->lockQ; + gCtgMgmt.queue.stopQueue = operation->stopQueue; gCtgMgmt.queue.tail->next = node; gCtgMgmt.queue.tail = node; CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); @@ -1002,12 +1002,12 @@ _return: } -int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool lockQ, bool syncOp) { +int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool stopQueue, bool syncOp) { int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_CLEAR_CACHE; op->syncOp = syncOp; - op->lockQ = lockQ; + op->stopQueue = stopQueue; SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg)); if (NULL == msg) { diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 9d2895cdb8..d8292f00d2 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -196,7 +196,7 @@ void ctgFreeDbCache(SCtgDBCache *dbCache) { } -void ctgFreeHandle(SCatalog* pCtg) { +void ctgFreeHandleImpl(SCatalog* pCtg) { ctgFreeMetaRent(&pCtg->dbRent); ctgFreeMetaRent(&pCtg->stbRent); @@ -235,6 +235,27 @@ void ctgFreeHandle(SCatalog* pCtg) { } +void ctgFreeHandle(SCatalog* pCtg) { + if (NULL == pCtg) { + return; + } + + if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) { + ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:0x%" PRIx64, pCtg->clusterId); + return; + } + + CTG_CACHE_STAT_DEC(clusterNum, 1); + + uint64_t clusterId = pCtg->clusterId; + + ctgFreeHandleImpl(pCtg); + + ctgInfo("handle freed, culsterId:0x%" PRIx64, clusterId); +} + + + void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) { if (NULL == pOutput) { return;