From f06bc2467d38a5521faace36c863e557e03348ed Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 25 Jul 2024 14:00:03 +0800 Subject: [PATCH] fix:[TD-31017]process return value in vnode for tmq --- source/client/src/clientTmq.c | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 650d262870..fc6ce25c08 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -583,16 +583,10 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq goto end; } char offsetBuf[TSDB_OFFSET_LEN] = {0}; - code = tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal); - if (code != 0) { - goto end; - } + tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal); char commitBuf[TSDB_OFFSET_LEN] = {0}; - code = tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); - if (code != 0) { - goto end; - } + tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); SMqCommitCbParamSet* pParamSet = NULL; code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet); @@ -694,10 +688,10 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us if (pVg->offsetInfo.endOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.endOffset, &pVg->offsetInfo.committedOffset)) { char offsetBuf[TSDB_OFFSET_LEN] = {0}; - (void)tFormatOffset(offsetBuf, tListLen(offsetBuf), &pVg->offsetInfo.endOffset); + tFormatOffset(offsetBuf, tListLen(offsetBuf), &pVg->offsetInfo.endOffset); char commitBuf[TSDB_OFFSET_LEN] = {0}; - (void)tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); + tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopic->topicName, pParamSet); if (code != TSDB_CODE_SUCCESS) { @@ -865,7 +859,7 @@ void tmqSendHbReq(void* param, void* tmrId) { offRows->offset = pVg->offsetInfo.endOffset; offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd; char buf[TSDB_OFFSET_LEN] = {0}; - (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); + tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); tscDebug("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64, tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows); } @@ -1258,7 +1252,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { } char buf[TSDB_OFFSET_LEN] = {0}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; - (void)tFormatOffset(buf, tListLen(buf), &offset); + tFormatOffset(buf, tListLen(buf), &offset); tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64 ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s", pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, @@ -1516,7 +1510,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { (void)memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); char buf[TSDB_OFFSET_LEN] = {0}; - (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.common.rspOffset); + tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.common.rspOffset); tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64, tmq->consumerId, vgId, pRspWrapper->dataRsp.common.reqOffset.version, buf, rspType, requestId); } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { @@ -1694,7 +1688,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) (void)sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); char buf[TSDB_OFFSET_LEN] = {0}; - (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset); + tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset); tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); @@ -1958,7 +1952,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p int64_t transporterId = 0; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; - (void)tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); + tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); @@ -2129,7 +2123,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tmq->consumerId, pDataRsp->blockNum != 0); char buf[TSDB_OFFSET_LEN] = {0}; - (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); + tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); if (pDataRsp->blockNum == 0) { tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, @@ -2286,7 +2280,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tmq->totalRows += numOfRows; char buf[TSDB_OFFSET_LEN] = {0}; - (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset); + tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset); tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, @@ -3400,7 +3394,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int64_t transporterId = 0; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; - (void)tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset); + tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset); tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);