From b1e781b1673edf7d61570c5e0bc4a7e4a590937b Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Wed, 27 Jan 2021 10:58:26 +0800 Subject: [PATCH 1/5] fix bug --- src/cq/src/cqMain.c | 127 +++++++++++++++++++++++++++++++++----------- 1 file changed, 96 insertions(+), 31 deletions(-) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 5f1fecc494..083028a32c 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -54,6 +54,7 @@ typedef struct { typedef struct SCqObj { tmr_h tmrId; + int64_t rid; uint64_t uid; int32_t tid; // table ID int32_t rowSize; // bytes of a row @@ -69,6 +70,77 @@ typedef struct SCqObj { static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); +int32_t cqObjRef = -1; + + + +void cqFree(void *handle) { + if (tsEnableStream == 0) { + return; + } + SCqObj *pObj = handle; + SCqContext *pContext = pObj->pContext; + int32_t last = 0; + + pthread_mutex_lock(&pContext->mutex); + + if (pObj->prev) { + pObj->prev->next = pObj->next; + } else { + pContext->pHead = pObj->next; + } + + if (pObj->next) { + pObj->next->prev = pObj->prev; + } + + if (pContext->pHead == NULL) { + last = 1; + } + + // free the resources associated + if (pObj->pStream) { + taos_close_stream(pObj->pStream); + pObj->pStream = NULL; + } else { + taosTmrStop(pObj->tmrId); + pObj->tmrId = 0; + } + + cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); + tdFreeSchema(pObj->pSchema); + free(pObj->dstTable); + free(pObj->sqlStr); + free(pObj); + + pthread_mutex_unlock(&pContext->mutex); + + if (last) { + pthread_mutex_unlock(&pContext->mutex); + + pthread_mutex_destroy(&pContext->mutex); + + taosTmrCleanUp(pContext->tmrCtrl); + pContext->tmrCtrl = NULL; + + cDebug("vgId:%d, CQ is closed", pContext->vgId); + free(pContext); + } +} + + +void cqCreateRef() { + int32_t ref = atomic_load_32(&cqObjRef); + if (ref == -1) { + ref = taosOpenRef(4096, cqFree); + + if (atomic_val_compare_exchange_32(&cqObjRef, -1, ref) != -1) { + taosCloseRef(ref); + } + } +} + + void *cqOpen(void *ahandle, const SCqCfg *pCfg) { if (tsEnableStream == 0) { return NULL; @@ -79,6 +151,8 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { return NULL; } + cqCreateRef(); + pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ"); tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user)); @@ -97,6 +171,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { pthread_mutex_init(&pContext->mutex, NULL); + cDebug("vgId:%d, CQ is opened", pContext->vgId); return pContext; @@ -119,20 +194,9 @@ void cqClose(void *handle) { while (pObj) { SCqObj *pTemp = pObj; pObj = pObj->next; - tdFreeSchema(pTemp->pSchema); - tfree(pTemp->sqlStr); - free(pTemp); + + taosReleaseRef(cqObjRef, pTemp->rid); } - - pthread_mutex_unlock(&pContext->mutex); - - pthread_mutex_destroy(&pContext->mutex); - - taosTmrCleanUp(pContext->tmrCtrl); - pContext->tmrCtrl = NULL; - - cDebug("vgId:%d, CQ is closed", pContext->vgId); - free(pContext); } void cqStart(void *handle) { @@ -213,10 +277,13 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch if (pContext->pHead) pContext->pHead->prev = pObj; pContext->pHead = pObj; + pObj->rid = taosAddRef(cqObjRef, pObj); + cqCreateStream(pContext, pObj); pthread_mutex_unlock(&pContext->mutex); + return pObj; } @@ -229,16 +296,6 @@ void cqDrop(void *handle) { pthread_mutex_lock(&pContext->mutex); - if (pObj->prev) { - pObj->prev->next = pObj->next; - } else { - pContext->pHead = pObj->next; - } - - if (pObj->next) { - pObj->next->prev = pObj->prev; - } - // free the resources associated if (pObj->pStream) { taos_close_stream(pObj->pStream); @@ -248,17 +305,17 @@ void cqDrop(void *handle) { pObj->tmrId = 0; } - cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); - tdFreeSchema(pObj->pSchema); - free(pObj->dstTable); - free(pObj->sqlStr); - free(pObj); + taosReleaseRef(cqObjRef, pObj->rid); pthread_mutex_unlock(&pContext->mutex); } static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { - SCqObj* pObj = (SCqObj*)param; + SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param); + if (pObj == NULL) { + return; + } + SCqContext* pContext = pObj->pContext; SSqlObj* pSql = (SSqlObj*)result; if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) { @@ -267,10 +324,16 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { pthread_mutex_lock(&pContext->mutex); cqCreateStream(pContext, pObj); pthread_mutex_unlock(&pContext->mutex); + + taosReleaseRef(cqObjRef, (int64_t)param); } static void cqProcessCreateTimer(void *param, void *tmrId) { - SCqObj* pObj = (SCqObj*)param; + SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param); + if (pObj == NULL) { + return; + } + SCqContext* pContext = pObj->pContext; if (pContext->dbConn == NULL) { @@ -281,6 +344,8 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { cqCreateStream(pContext, pObj); pthread_mutex_unlock(&pContext->mutex); } + + taosReleaseRef(cqObjRef, (int64_t)param); } static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { @@ -288,7 +353,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { if (pContext->dbConn == NULL) { cDebug("vgId:%d, create dbConn after 1000 ms", pContext->vgId); - pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, pObj, pContext->tmrCtrl); + pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, (void *)pObj->rid, pContext->tmrCtrl); return; } pObj->tmrId = 0; From 2327cbb9b8dd4f03b7081cec4fdcb517d6f99dc4 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Wed, 27 Jan 2021 13:48:59 +0800 Subject: [PATCH 2/5] fix bug --- src/cq/src/cqMain.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 083028a32c..5203d66d16 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -305,9 +305,9 @@ void cqDrop(void *handle) { pObj->tmrId = 0; } - taosReleaseRef(cqObjRef, pObj->rid); - pthread_mutex_unlock(&pContext->mutex); + + taosReleaseRef(cqObjRef, pObj->rid); } static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { From c7c0fd1dd5736da22cdacfafde1718cf5b4095b3 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Wed, 27 Jan 2021 15:56:10 +0800 Subject: [PATCH 3/5] fix bug --- src/cq/src/cqMain.c | 85 +++++++++++++++++++++++++++++++-------------- 1 file changed, 59 insertions(+), 26 deletions(-) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 5203d66d16..f90a1d5f5e 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -50,6 +50,8 @@ typedef struct { void *dbConn; void *tmrCtrl; pthread_mutex_t mutex; + int32_t delete; + int32_t cqObjNum; } SCqContext; typedef struct SCqObj { @@ -72,17 +74,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); int32_t cqObjRef = -1; +void cqRmFromList(SCqObj *pObj) { + //LOCK in caller - -void cqFree(void *handle) { - if (tsEnableStream == 0) { - return; - } - SCqObj *pObj = handle; SCqContext *pContext = pObj->pContext; - int32_t last = 0; - - pthread_mutex_lock(&pContext->mutex); if (pObj->prev) { pObj->prev->next = pObj->next; @@ -94,10 +89,18 @@ void cqFree(void *handle) { pObj->next->prev = pObj->prev; } - if (pContext->pHead == NULL) { - last = 1; - } +} +void cqFree(void *handle) { + if (tsEnableStream == 0) { + return; + } + SCqObj *pObj = handle; + SCqContext *pContext = pObj->pContext; + int32_t delete = 0; + + pthread_mutex_lock(&pContext->mutex); + // free the resources associated if (pObj->pStream) { taos_close_stream(pObj->pStream); @@ -113,9 +116,15 @@ void cqFree(void *handle) { free(pObj->sqlStr); free(pObj); + pContext->cqObjNum--; + + if (pContext->cqObjNum <= 0 && pContext->delete) { + delete = 1; + } + pthread_mutex_unlock(&pContext->mutex); - if (last) { + if (delete) { pthread_mutex_unlock(&pContext->mutex); pthread_mutex_destroy(&pContext->mutex); @@ -184,19 +193,30 @@ void cqClose(void *handle) { SCqContext *pContext = handle; if (handle == NULL) return; + pContext->delete = 1; + // stop all CQs cqStop(pContext); - // free all resources - pthread_mutex_lock(&pContext->mutex); + int64_t rid = 0; - SCqObj *pObj = pContext->pHead; - while (pObj) { - SCqObj *pTemp = pObj; - pObj = pObj->next; + while (1) { + pthread_mutex_lock(&pContext->mutex); - taosReleaseRef(cqObjRef, pTemp->rid); - } + SCqObj *pObj = pContext->pHead; + if (pObj) { + cqRmFromList(pObj); + + rid = pObj->rid; + } else { + pthread_mutex_unlock(&pContext->mutex); + break; + } + + pthread_mutex_unlock(&pContext->mutex); + + taosReleaseRef(cqObjRef, rid); + } } void cqStart(void *handle) { @@ -255,7 +275,8 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch return NULL; } SCqContext *pContext = handle; - + int64_t rid = 0; + SCqObj *pObj = calloc(sizeof(SCqObj), 1); if (pObj == NULL) return NULL; @@ -277,25 +298,36 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch if (pContext->pHead) pContext->pHead->prev = pObj; pContext->pHead = pObj; + pContext->cqObjNum++; + pObj->rid = taosAddRef(cqObjRef, pObj); cqCreateStream(pContext, pObj); + rid = pObj->rid; + pthread_mutex_unlock(&pContext->mutex); - return pObj; + return (void *)rid; } void cqDrop(void *handle) { if (tsEnableStream == 0) { return; } - SCqObj *pObj = handle; - SCqContext *pContext = pObj->pContext; + SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)handle); + if (pObj == NULL) { + return; + } + + SCqContext *pContext = pObj->pContext; + pthread_mutex_lock(&pContext->mutex); + cqRmFromList(pObj); + // free the resources associated if (pObj->pStream) { taos_close_stream(pObj->pStream); @@ -307,7 +339,8 @@ void cqDrop(void *handle) { pthread_mutex_unlock(&pContext->mutex); - taosReleaseRef(cqObjRef, pObj->rid); + taosRemoveRef(cqObjRef, (int64_t)handle); + taosReleaseRef(cqObjRef, (int64_t)handle); } static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { From 7d1b99f92a02d323e7430fac054f2d769da04d1f Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Wed, 27 Jan 2021 17:37:08 +0800 Subject: [PATCH 4/5] fix bug --- src/cq/src/cqMain.c | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index f90a1d5f5e..908042ba06 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -392,7 +392,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->tmrId = 0; if (pObj->pStream == NULL) { - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL); + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, (void *)pObj->rid, NULL); // TODO the pObj->pStream may be released if error happens if (pObj->pStream) { @@ -406,18 +406,28 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { } static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { - SCqObj *pObj = (SCqObj *)param; + SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param); + if (pObj == NULL) { + return; + } + if (tres == NULL && row == NULL) { taos_close_stream(pObj->pStream); pObj->pStream = NULL; + + taosReleaseRef(cqObjRef, (int64_t)param); + return; } SCqContext *pContext = pObj->pContext; STSchema *pSchema = pObj->pSchema; - if (pObj->pStream == NULL) return; - + if (pObj->pStream == NULL) { + taosReleaseRef(cqObjRef, (int64_t)param); + return; + } + cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize; @@ -468,5 +478,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { // write into vnode write queue pContext->cqWrite(pContext->vgId, pHead, TAOS_QTYPE_CQ, NULL); free(buffer); + + taosReleaseRef(cqObjRef, (int64_t)param); } From 6b87bab62490cc249556fa368ddf35cff498831a Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Wed, 27 Jan 2021 17:53:22 +0800 Subject: [PATCH 5/5] fix bug --- src/cq/src/cqMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 908042ba06..a5de27d7fc 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -215,7 +215,7 @@ void cqClose(void *handle) { pthread_mutex_unlock(&pContext->mutex); - taosReleaseRef(cqObjRef, rid); + taosRemoveRef(cqObjRef, rid); } }