clean cache queue
This commit is contained in:
parent
73d4e08ee6
commit
dbe1d1462d
|
@ -542,6 +542,7 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
|
||||||
|
|
||||||
if (operation->syncOp) {
|
if (operation->syncOp) {
|
||||||
tsem_wait(&operation->rspSem);
|
tsem_wait(&operation->rspSem);
|
||||||
|
taosMemoryFree(operation);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1612,6 +1613,30 @@ void ctgUpdateThreadUnexpectedStopped(void) {
|
||||||
if (CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
|
if (CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ctgCleanupCacheQueue(void) {
|
||||||
|
SCtgQNode *node = gCtgMgmt.queue.head->next;
|
||||||
|
SCtgQNode *nodeNext = NULL;
|
||||||
|
|
||||||
|
while (node) {
|
||||||
|
if (node->op) {
|
||||||
|
taosMemoryFree(node->op->data);
|
||||||
|
if (node->op->syncOp) {
|
||||||
|
tsem_post(&node->op->rspSem);
|
||||||
|
} else {
|
||||||
|
taosMemoryFree(node->op);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeNext = node->next;
|
||||||
|
taosMemoryFree(node);
|
||||||
|
|
||||||
|
node = nodeNext;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFreeClear(gCtgMgmt.queue.head);
|
||||||
|
gCtgMgmt.queue.tail = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
void* ctgUpdateThreadFunc(void* param) {
|
void* ctgUpdateThreadFunc(void* param) {
|
||||||
setThreadName("catalog");
|
setThreadName("catalog");
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
|
@ -1627,6 +1652,7 @@ void* ctgUpdateThreadFunc(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
|
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
|
||||||
|
ctgCleanupCacheQueue();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue