diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 5203d66d16..f90a1d5f5e 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -50,6 +50,8 @@ typedef struct { void *dbConn; void *tmrCtrl; pthread_mutex_t mutex; + int32_t delete; + int32_t cqObjNum; } SCqContext; typedef struct SCqObj { @@ -72,17 +74,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); int32_t cqObjRef = -1; +void cqRmFromList(SCqObj *pObj) { + //LOCK in caller - -void cqFree(void *handle) { - if (tsEnableStream == 0) { - return; - } - SCqObj *pObj = handle; SCqContext *pContext = pObj->pContext; - int32_t last = 0; - - pthread_mutex_lock(&pContext->mutex); if (pObj->prev) { pObj->prev->next = pObj->next; @@ -94,10 +89,18 @@ void cqFree(void *handle) { pObj->next->prev = pObj->prev; } - if (pContext->pHead == NULL) { - last = 1; - } +} +void cqFree(void *handle) { + if (tsEnableStream == 0) { + return; + } + SCqObj *pObj = handle; + SCqContext *pContext = pObj->pContext; + int32_t delete = 0; + + pthread_mutex_lock(&pContext->mutex); + // free the resources associated if (pObj->pStream) { taos_close_stream(pObj->pStream); @@ -113,9 +116,15 @@ void cqFree(void *handle) { free(pObj->sqlStr); free(pObj); + pContext->cqObjNum--; + + if (pContext->cqObjNum <= 0 && pContext->delete) { + delete = 1; + } + pthread_mutex_unlock(&pContext->mutex); - if (last) { + if (delete) { pthread_mutex_unlock(&pContext->mutex); pthread_mutex_destroy(&pContext->mutex); @@ -184,19 +193,30 @@ void cqClose(void *handle) { SCqContext *pContext = handle; if (handle == NULL) return; + pContext->delete = 1; + // stop all CQs cqStop(pContext); - // free all resources - pthread_mutex_lock(&pContext->mutex); + int64_t rid = 0; - SCqObj *pObj = pContext->pHead; - while (pObj) { - SCqObj *pTemp = pObj; - pObj = pObj->next; + while (1) { + pthread_mutex_lock(&pContext->mutex); - taosReleaseRef(cqObjRef, pTemp->rid); - } + SCqObj *pObj = pContext->pHead; + if (pObj) { + cqRmFromList(pObj); + + rid = pObj->rid; + } else { + pthread_mutex_unlock(&pContext->mutex); + break; + } + + pthread_mutex_unlock(&pContext->mutex); + + taosReleaseRef(cqObjRef, rid); + } } void cqStart(void *handle) { @@ -255,7 +275,8 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch return NULL; } SCqContext *pContext = handle; - + int64_t rid = 0; + SCqObj *pObj = calloc(sizeof(SCqObj), 1); if (pObj == NULL) return NULL; @@ -277,25 +298,36 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch if (pContext->pHead) pContext->pHead->prev = pObj; pContext->pHead = pObj; + pContext->cqObjNum++; + pObj->rid = taosAddRef(cqObjRef, pObj); cqCreateStream(pContext, pObj); + rid = pObj->rid; + pthread_mutex_unlock(&pContext->mutex); - return pObj; + return (void *)rid; } void cqDrop(void *handle) { if (tsEnableStream == 0) { return; } - SCqObj *pObj = handle; - SCqContext *pContext = pObj->pContext; + SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)handle); + if (pObj == NULL) { + return; + } + + SCqContext *pContext = pObj->pContext; + pthread_mutex_lock(&pContext->mutex); + cqRmFromList(pObj); + // free the resources associated if (pObj->pStream) { taos_close_stream(pObj->pStream); @@ -307,7 +339,8 @@ void cqDrop(void *handle) { pthread_mutex_unlock(&pContext->mutex); - taosReleaseRef(cqObjRef, pObj->rid); + taosRemoveRef(cqObjRef, (int64_t)handle); + taosReleaseRef(cqObjRef, (int64_t)handle); } static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {