fix(tmq): fix the invalid read.

This commit is contained in:
Haojun Liao 2023-03-10 10:27:21 +08:00
parent a4e378e138
commit b31b7c3f64
1 changed files with 6 additions and 3 deletions

View File

@ -1289,9 +1289,11 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear(&decoder);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req offset:%" PRId64 ", rsp offset:%" PRId64 " type %d, reqId:0x%"PRIx64,
tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
rspType, requestId);
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req:%" PRId64 ", rsp:%" PRId64
" type %d, reqId:0x%" PRIx64,
tmq->consumerId, pParam->vgId, pRspWrapper->dataRsp.reqOffset.version,
pRspWrapper->dataRsp.rspOffset.version, rspType, requestId);
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
@ -1378,6 +1380,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgNumGet; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);