diff --git a/example/src/tmq.c b/example/src/tmq.c index 338399232c..913096ee90 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -171,6 +171,7 @@ tmq_t* build_consumer() { tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); assert(tmq); + tmq_conf_destroy(conf); return tmq; } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bd3ca6ae8d..df66174c9b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2523,11 +2523,9 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); buf = taosDecodeFixedI32(buf, &pRsp->blockNum); - pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*)); - pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t)); - pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*)); - pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*)); if (pRsp->blockNum != 0) { + pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*)); + pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t)); buf = taosDecodeFixedI8(buf, &pRsp->withTbName); buf = taosDecodeFixedI8(buf, &pRsp->withSchema); buf = taosDecodeFixedI8(buf, &pRsp->withTag); @@ -2540,14 +2538,20 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p taosArrayPush(pRsp->blockDataLen, &bLen); taosArrayPush(pRsp->blockData, &data); if (pRsp->withSchema) { + pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*)); SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); buf = taosDecodeSSchemaWrapper(buf, pSW); taosArrayPush(pRsp->blockSchema, &pSW); + } else { + pRsp->blockSchema = NULL; } if (pRsp->withTbName) { + pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*)); char* name = NULL; buf = taosDecodeString(buf, &name); taosArrayPush(pRsp->blockTbName, &name); + } else { + pRsp->blockTbName = NULL; } } } diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index c4dc98354e..669b2bc97e 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -60,7 +60,7 @@ static void registerRequest(SRequestObj *pRequest) { static void deregisterRequest(SRequestObj *pRequest) { assert(pRequest != NULL); - STscObj * pTscObj = pRequest->pTscObj; + STscObj *pTscObj = pRequest->pTscObj; SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary; int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1); @@ -313,7 +313,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { return 0; } - SConfig * pCfg = taosGetCfg(); + SConfig *pCfg = taosGetCfg(); SConfigItem *pItem = NULL; switch (option) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 386283b5b5..d1a3511252 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -390,7 +390,7 @@ void freeRequestRes(SRequestObj* pRequest, void* res) { if (TDMT_VND_SUBMIT == pRequest->type) { tFreeSSubmitRsp((SSubmitRsp*)res); } else if (TDMT_VND_QUERY == pRequest->type) { - taosArrayDestroy((SArray *)res); + taosArrayDestroy((SArray*)res); } } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 7360b054e2..eec8cab7ad 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -146,10 +146,10 @@ void taos_free_result(TAOS_RES *res) { SMqRspObj *pRsp = (SMqRspObj *)res; if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); - if (pRsp->rsp.blockSchema) taosArrayDestroy(pRsp->rsp.blockSchema); - if (pRsp->rsp.blockTbName) taosArrayDestroy(pRsp->rsp.blockTbName); if (pRsp->rsp.blockTags) taosArrayDestroy(pRsp->rsp.blockTags); if (pRsp->rsp.blockTagSchema) taosArrayDestroy(pRsp->rsp.blockTagSchema); + if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); + if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); pRsp->resInfo.pRspMsg = NULL; doFreeReqResultInfo(&pRsp->resInfo); } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 36c0d8156c..dfa56f80c4 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -202,7 +202,12 @@ tmq_conf_t* tmq_conf_new() { } void tmq_conf_destroy(tmq_conf_t* conf) { - if (conf) taosMemoryFree(conf); + if (conf) { + if (conf->ip) taosMemoryFree(conf->ip); + if (conf->user) taosMemoryFree(conf->user); + if (conf->pass) taosMemoryFree(conf->pass); + taosMemoryFree(conf); + } } tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { @@ -497,6 +502,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { } else { ASSERT(0); } + taosFreeQitem(pTaskType); } taosFreeQall(qall); return 0; @@ -954,8 +960,12 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqClientVg* pVg = pParam->pVg; SMqClientTopic* pTopic = pParam->pTopic; tmq_t* tmq = pParam->tmq; + int32_t vgId = pParam->vgId; + int32_t epoch = pParam->epoch; + taosMemoryFree(pParam); if (code != 0) { - tscWarn("msg discard from vg %d, epoch %d, code:%x", pParam->vgId, pParam->epoch, code); + tscWarn("msg discard from vg %d, epoch %d, code:%x", vgId, epoch, code); + if (pMsg->pData) taosMemoryFree(pMsg->pData); goto CREATE_MSG_FAIL; } @@ -963,19 +973,21 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqEpoch = atomic_load_32(&tmq->epoch); if (msgEpoch < tmqEpoch) { // do not write into queue since updating epoch reset - tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch, + tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch); tsem_post(&tmq->rspSem); + taosMemoryFree(pMsg->pData); return 0; } if (msgEpoch != tmqEpoch) { - tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); + tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch); } SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); if (pRspWrapper == NULL) { - tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch); + taosMemoryFree(pMsg->pData); + tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch); goto CREATE_MSG_FAIL; } @@ -986,6 +998,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg); + taosMemoryFree(pMsg->pData); tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId, pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset); @@ -995,7 +1008,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { return 0; CREATE_MSG_FAIL: - if (pParam->epoch == tmq->epoch) { + if (epoch == tmq->epoch) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } tsem_post(&tmq->rspSem); @@ -1088,6 +1101,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = pParam->tmq; + int8_t async = pParam->async; pParam->code = code; if (code != 0) { tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async); @@ -1104,7 +1118,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { goto END; } - if (!pParam->async) { + if (!async) { SMqAskEpRsp rsp; tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ @@ -1125,13 +1139,14 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { taosWriteQitem(tmq->mqueue, pWrapper); tsem_post(&tmq->rspSem); - taosMemoryFree(pParam); } END: /*atomic_store_8(&tmq->epStatus, 0);*/ - if (!pParam->async) { + if (!async) { tsem_post(&pParam->rspSem); + } else { + taosMemoryFree(pParam); } return code; } @@ -1279,7 +1294,6 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); } - taosFreeQitem(pWrapper); return pRspObj; } @@ -1401,6 +1415,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) { } // build rsp SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); + taosFreeQitem(pollRspWrapper); return pRsp; } else { /*printf("epoch mismatch\n");*/ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1f35ec2650..0671044bad 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -57,6 +57,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { void tqClose(STQ* pTq) { if (pTq) { taosMemoryFreeClear(pTq->path); + taosHashCleanup(pTq->execs); + taosHashCleanup(pTq->pStreamTasks); + taosHashCleanup(pTq->pushMgr); taosMemoryFree(pTq); } // TODO @@ -409,9 +412,9 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu pTopic->buffer.output[j].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); SReadHandle handle = { - .reader = pReadHandle, - .meta = pTq->pVnode->pMeta, - .pMsgCb = &pTq->pVnode->msgCb, + .reader = pReadHandle, + .meta = pTq->pVnode->pMeta, + .pMsgCb = &pTq->pVnode->msgCb, }; pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); @@ -1000,10 +1003,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { for (int32_t i = 0; i < parallel; i++) { STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); SReadHandle handle = { - .reader = pStreamReader, - .meta = pTq->pVnode->pMeta, - .pMsgCb = &pTq->pVnode->msgCb, - .vnode = pTq->pVnode, + .reader = pStreamReader, + .meta = pTq->pVnode->pMeta, + .pMsgCb = &pTq->pVnode->msgCb, + .vnode = pTq->pVnode, }; pTask->exec.runners[i].inputHandle = pStreamReader; pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);