fix(tmq): invalid free
This commit is contained in:
parent
72d7cc5edf
commit
0fd4f3b519
|
@ -299,10 +299,21 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
SMqDataRsp dataRsp = {0};
|
SMqDataRsp dataRsp = {0};
|
||||||
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
||||||
dataRsp.rspOffset = fetchOffsetNew;
|
dataRsp.rspOffset = fetchOffsetNew;
|
||||||
|
code = 0;
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
goto OVER;
|
taosArrayDestroy(dataRsp.blockDataLen);
|
||||||
|
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
|
||||||
|
|
||||||
|
if (dataRsp.withSchema) {
|
||||||
|
taosArrayDestroyP(dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dataRsp.withTbName) {
|
||||||
|
taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
||||||
tqError("tmq poll: subkey %s, no offset committed for consumer %ld in vg %d, subkey %s, reset none failed",
|
tqError("tmq poll: subkey %s, no offset committed for consumer %ld in vg %d, subkey %s, reset none failed",
|
||||||
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
|
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
|
||||||
|
|
Loading…
Reference in New Issue