fix: mem leak
This commit is contained in:
parent
5b21a2394c
commit
332ecd1d92
|
@ -875,6 +875,7 @@ void tmqFreeImpl(void* handle) {
|
||||||
tmq_t* tmq = (tmq_t*)handle;
|
tmq_t* tmq = (tmq_t*)handle;
|
||||||
|
|
||||||
// TODO stop timer
|
// TODO stop timer
|
||||||
|
tmqClearUnhandleMsg(tmq);
|
||||||
if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
|
if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
|
||||||
if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
|
if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
|
||||||
if (tmq->qall) taosFreeQall(tmq->qall);
|
if (tmq->qall) taosFreeQall(tmq->qall);
|
||||||
|
@ -884,8 +885,7 @@ void tmqFreeImpl(void* handle) {
|
||||||
int32_t sz = taosArrayGetSize(tmq->clientTopics);
|
int32_t sz = taosArrayGetSize(tmq->clientTopics);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
|
taosMemoryFreeClear(pTopic->schema.pSchema);
|
||||||
int32_t vgSz = taosArrayGetSize(pTopic->vgs);
|
|
||||||
taosArrayDestroy(pTopic->vgs);
|
taosArrayDestroy(pTopic->vgs);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(tmq->clientTopics);
|
taosArrayDestroy(tmq->clientTopics);
|
||||||
|
@ -1304,7 +1304,6 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
|
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
|
||||||
int32_t vgSz = taosArrayGetSize(pTopic->vgs);
|
|
||||||
taosArrayDestroy(pTopic->vgs);
|
taosArrayDestroy(pTopic->vgs);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(tmq->clientTopics);
|
taosArrayDestroy(tmq->clientTopics);
|
||||||
|
@ -1410,7 +1409,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
void* pReq = taosMemoryCalloc(1, tlen);
|
void* pReq = taosMemoryCalloc(1, tlen);
|
||||||
if (tlen < 0) {
|
if (pReq == NULL) {
|
||||||
tscError("failed to malloc askEpReq msg, size:%d", tlen);
|
tscError("failed to malloc askEpReq msg, size:%d", tlen);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1738,7 +1737,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
} else {
|
} else {
|
||||||
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
|
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue