fix:[TD-30306]error in converity scan

This commit is contained in:
wangmm0220 2024-06-03 16:48:44 +08:00
parent d85d3aa70e
commit 5217131f50
1 changed files with 9 additions and 11 deletions

View File

@ -1344,12 +1344,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
goto FAIL;
}
if(pMsg->pData == NULL){
tscError("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto FAIL;
}
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) {
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
@ -1362,6 +1356,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
goto END;
}
if(pMsg->pData == NULL){
tscError("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto END;
}
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
int32_t clientEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < clientEpoch) {
@ -1388,7 +1388,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear(&decoder);
taosReleaseRef(tmqMgmt.rsetId, refId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
goto END;
}
tDecoderClear(&decoder);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
@ -1404,7 +1404,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear(&decoder);
taosReleaseRef(tmqMgmt.rsetId, refId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
goto END;
}
tDecoderClear(&decoder);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
@ -1415,7 +1415,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear(&decoder);
taosReleaseRef(tmqMgmt.rsetId, refId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
goto END;
}
tDecoderClear(&decoder);
memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
@ -1889,8 +1889,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
terrno = pRspWrapper->code;
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId,
tstrerror(pRspWrapper->code));
taosFreeQitem(pRspWrapper);
return NULL;
} else {
if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform
askEp(tmq, NULL, false, true);