diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index b0fae1083f..aba127c36e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -586,30 +586,32 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq if(code != 0){ goto end; } - if (offsetVal->type > 0 && !tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) { - char offsetBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal); - - char commitBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); - - SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0); - if (pParamSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } - code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet); - if (code != TSDB_CODE_SUCCESS) { - tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s", - tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno)); - taosMemoryFree(pParamSet); - goto end; - } - - tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s", - tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf); - pVg->offsetInfo.committedOffset = *offsetVal; + if (offsetVal->type <= 0 || tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) { + code = TSDB_CODE_TMQ_INVALID_MSG; + goto end; } + char offsetBuf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal); + + char commitBuf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); + + SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0); + if (pParamSet == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet); + if (code != TSDB_CODE_SUCCESS) { + tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s", + tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno)); + taosMemoryFree(pParamSet); + goto end; + } + + tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s", + tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf); + pVg->offsetInfo.committedOffset = *offsetVal; end: taosRUnLockLatch(&tmq->lock);