diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 637cb97d3a..462678f79a 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -542,6 +542,7 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { if (operation->syncOp) { tsem_wait(&operation->rspSem); + taosMemoryFree(operation); } return TSDB_CODE_SUCCESS; @@ -1612,6 +1613,30 @@ void ctgUpdateThreadUnexpectedStopped(void) { if (CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); } +void ctgCleanupCacheQueue(void) { + SCtgQNode *node = gCtgMgmt.queue.head->next; + SCtgQNode *nodeNext = NULL; + + while (node) { + if (node->op) { + taosMemoryFree(node->op->data); + if (node->op->syncOp) { + tsem_post(&node->op->rspSem); + } else { + taosMemoryFree(node->op); + } + } + + nodeNext = node->next; + taosMemoryFree(node); + + node = nodeNext; + } + + taosMemoryFreeClear(gCtgMgmt.queue.head); + gCtgMgmt.queue.tail = NULL; +} + void* ctgUpdateThreadFunc(void* param) { setThreadName("catalog"); #ifdef WINDOWS @@ -1627,6 +1652,7 @@ void* ctgUpdateThreadFunc(void* param) { } if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { + ctgCleanupCacheQueue(); break; }