From 9c9fd839dc120195ca5ebb9ebbd6862d3c29cbf2 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Tue, 2 Feb 2021 15:29:31 +0800 Subject: [PATCH] fix bug --- src/cq/src/cqMain.c | 51 +++++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index c2df0d36b2..0dc3300911 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -91,6 +91,20 @@ void cqRmFromList(SCqObj *pObj) { } +static void freeSCqContext(void *handle) { + if (handle == NULL) { + return; + } + SCqContext *pContext = handle; + pthread_mutex_destroy(&pContext->mutex); + + taosTmrCleanUp(pContext->tmrCtrl); + pContext->tmrCtrl = NULL; + cDebug("vgId:%d, CQ is closed", pContext->vgId); + free(pContext); +} + + void cqFree(void *handle) { if (tsEnableStream == 0) { return; @@ -125,13 +139,7 @@ void cqFree(void *handle) { pthread_mutex_unlock(&pContext->mutex); if (delete) { - pthread_mutex_destroy(&pContext->mutex); - - taosTmrCleanUp(pContext->tmrCtrl); - pContext->tmrCtrl = NULL; - - cDebug("vgId:%d, CQ is closed", pContext->vgId); - free(pContext); + freeSCqContext(pContext); } } @@ -184,18 +192,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { return pContext; } -static void freeSCqContext(void *handle) { - if (handle == NULL) { - return; - } - SCqContext *pContext = handle; - pthread_mutex_destroy(&pContext->mutex); - - taosTmrCleanUp(pContext->tmrCtrl); - pContext->tmrCtrl = NULL; - cDebug("vgId:%d, CQ is closed", pContext->vgId); - free(pContext); -} + void cqClose(void *handle) { if (tsEnableStream == 0) { return; @@ -204,6 +201,8 @@ void cqClose(void *handle) { if (handle == NULL) return; pContext->delete = 1; + int32_t hasCq = 0; + int32_t existLoop = 0; // stop all CQs cqStop(pContext); @@ -218,6 +217,12 @@ void cqClose(void *handle) { cqRmFromList(pObj); rid = pObj->rid; + + hasCq = 1; + + if (pContext->pHead == NULL) { + existLoop = 1; + } } else { pthread_mutex_unlock(&pContext->mutex); break; @@ -226,9 +231,15 @@ void cqClose(void *handle) { pthread_mutex_unlock(&pContext->mutex); taosRemoveRef(cqObjRef, rid); + + if (existLoop) { + break; + } } - freeSCqContext(pContext); + if (hasCq == 0) { + freeSCqContext(pContext); + } } void cqStart(void *handle) {