diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 5f1fecc494..a5de27d7fc 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -50,10 +50,13 @@ typedef struct { void *dbConn; void *tmrCtrl; pthread_mutex_t mutex; + int32_t delete; + int32_t cqObjNum; } SCqContext; typedef struct SCqObj { tmr_h tmrId; + int64_t rid; uint64_t uid; int32_t tid; // table ID int32_t rowSize; // bytes of a row @@ -69,6 +72,84 @@ typedef struct SCqObj { static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); +int32_t cqObjRef = -1; + +void cqRmFromList(SCqObj *pObj) { + //LOCK in caller + + SCqContext *pContext = pObj->pContext; + + if (pObj->prev) { + pObj->prev->next = pObj->next; + } else { + pContext->pHead = pObj->next; + } + + if (pObj->next) { + pObj->next->prev = pObj->prev; + } + +} + +void cqFree(void *handle) { + if (tsEnableStream == 0) { + return; + } + SCqObj *pObj = handle; + SCqContext *pContext = pObj->pContext; + int32_t delete = 0; + + pthread_mutex_lock(&pContext->mutex); + + // free the resources associated + if (pObj->pStream) { + taos_close_stream(pObj->pStream); + pObj->pStream = NULL; + } else { + taosTmrStop(pObj->tmrId); + pObj->tmrId = 0; + } + + cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); + tdFreeSchema(pObj->pSchema); + free(pObj->dstTable); + free(pObj->sqlStr); + free(pObj); + + pContext->cqObjNum--; + + if (pContext->cqObjNum <= 0 && pContext->delete) { + delete = 1; + } + + pthread_mutex_unlock(&pContext->mutex); + + if (delete) { + pthread_mutex_unlock(&pContext->mutex); + + pthread_mutex_destroy(&pContext->mutex); + + taosTmrCleanUp(pContext->tmrCtrl); + pContext->tmrCtrl = NULL; + + cDebug("vgId:%d, CQ is closed", pContext->vgId); + free(pContext); + } +} + + +void cqCreateRef() { + int32_t ref = atomic_load_32(&cqObjRef); + if (ref == -1) { + ref = taosOpenRef(4096, cqFree); + + if (atomic_val_compare_exchange_32(&cqObjRef, -1, ref) != -1) { + taosCloseRef(ref); + } + } +} + + void *cqOpen(void *ahandle, const SCqCfg *pCfg) { if (tsEnableStream == 0) { return NULL; @@ -79,6 +160,8 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { return NULL; } + cqCreateRef(); + pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ"); tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user)); @@ -97,6 +180,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { pthread_mutex_init(&pContext->mutex, NULL); + cDebug("vgId:%d, CQ is opened", pContext->vgId); return pContext; @@ -109,30 +193,30 @@ void cqClose(void *handle) { SCqContext *pContext = handle; if (handle == NULL) return; + pContext->delete = 1; + // stop all CQs cqStop(pContext); - // free all resources - pthread_mutex_lock(&pContext->mutex); + int64_t rid = 0; - SCqObj *pObj = pContext->pHead; - while (pObj) { - SCqObj *pTemp = pObj; - pObj = pObj->next; - tdFreeSchema(pTemp->pSchema); - tfree(pTemp->sqlStr); - free(pTemp); - } - - pthread_mutex_unlock(&pContext->mutex); + while (1) { + pthread_mutex_lock(&pContext->mutex); - pthread_mutex_destroy(&pContext->mutex); + SCqObj *pObj = pContext->pHead; + if (pObj) { + cqRmFromList(pObj); - taosTmrCleanUp(pContext->tmrCtrl); - pContext->tmrCtrl = NULL; - - cDebug("vgId:%d, CQ is closed", pContext->vgId); - free(pContext); + rid = pObj->rid; + } else { + pthread_mutex_unlock(&pContext->mutex); + break; + } + + pthread_mutex_unlock(&pContext->mutex); + + taosRemoveRef(cqObjRef, rid); + } } void cqStart(void *handle) { @@ -191,7 +275,8 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch return NULL; } SCqContext *pContext = handle; - + int64_t rid = 0; + SCqObj *pObj = calloc(sizeof(SCqObj), 1); if (pObj == NULL) return NULL; @@ -213,32 +298,36 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch if (pContext->pHead) pContext->pHead->prev = pObj; pContext->pHead = pObj; + pContext->cqObjNum++; + + pObj->rid = taosAddRef(cqObjRef, pObj); + cqCreateStream(pContext, pObj); + rid = pObj->rid; + pthread_mutex_unlock(&pContext->mutex); - return pObj; + + return (void *)rid; } void cqDrop(void *handle) { if (tsEnableStream == 0) { return; } - SCqObj *pObj = handle; - SCqContext *pContext = pObj->pContext; + SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)handle); + if (pObj == NULL) { + return; + } + + SCqContext *pContext = pObj->pContext; + pthread_mutex_lock(&pContext->mutex); - if (pObj->prev) { - pObj->prev->next = pObj->next; - } else { - pContext->pHead = pObj->next; - } - - if (pObj->next) { - pObj->next->prev = pObj->prev; - } - + cqRmFromList(pObj); + // free the resources associated if (pObj->pStream) { taos_close_stream(pObj->pStream); @@ -248,17 +337,18 @@ void cqDrop(void *handle) { pObj->tmrId = 0; } - cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); - tdFreeSchema(pObj->pSchema); - free(pObj->dstTable); - free(pObj->sqlStr); - free(pObj); - pthread_mutex_unlock(&pContext->mutex); + + taosRemoveRef(cqObjRef, (int64_t)handle); + taosReleaseRef(cqObjRef, (int64_t)handle); } static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { - SCqObj* pObj = (SCqObj*)param; + SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param); + if (pObj == NULL) { + return; + } + SCqContext* pContext = pObj->pContext; SSqlObj* pSql = (SSqlObj*)result; if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) { @@ -267,10 +357,16 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { pthread_mutex_lock(&pContext->mutex); cqCreateStream(pContext, pObj); pthread_mutex_unlock(&pContext->mutex); + + taosReleaseRef(cqObjRef, (int64_t)param); } static void cqProcessCreateTimer(void *param, void *tmrId) { - SCqObj* pObj = (SCqObj*)param; + SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param); + if (pObj == NULL) { + return; + } + SCqContext* pContext = pObj->pContext; if (pContext->dbConn == NULL) { @@ -281,6 +377,8 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { cqCreateStream(pContext, pObj); pthread_mutex_unlock(&pContext->mutex); } + + taosReleaseRef(cqObjRef, (int64_t)param); } static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { @@ -288,13 +386,13 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { if (pContext->dbConn == NULL) { cDebug("vgId:%d, create dbConn after 1000 ms", pContext->vgId); - pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, pObj, pContext->tmrCtrl); + pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, (void *)pObj->rid, pContext->tmrCtrl); return; } pObj->tmrId = 0; if (pObj->pStream == NULL) { - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL); + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, (void *)pObj->rid, NULL); // TODO the pObj->pStream may be released if error happens if (pObj->pStream) { @@ -308,18 +406,28 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { } static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { - SCqObj *pObj = (SCqObj *)param; + SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param); + if (pObj == NULL) { + return; + } + if (tres == NULL && row == NULL) { taos_close_stream(pObj->pStream); pObj->pStream = NULL; + + taosReleaseRef(cqObjRef, (int64_t)param); + return; } SCqContext *pContext = pObj->pContext; STSchema *pSchema = pObj->pSchema; - if (pObj->pStream == NULL) return; - + if (pObj->pStream == NULL) { + taosReleaseRef(cqObjRef, (int64_t)param); + return; + } + cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize; @@ -370,5 +478,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { // write into vnode write queue pContext->cqWrite(pContext->vgId, pHead, TAOS_QTYPE_CQ, NULL); free(buffer); + + taosReleaseRef(cqObjRef, (int64_t)param); }