fix:[TD-31017]process return value in vnode for tmq

This commit is contained in:
wangmm0220 2024-07-25 14:00:03 +08:00
parent 261d15d765
commit f06bc2467d
1 changed files with 12 additions and 18 deletions

View File

@ -583,16 +583,10 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq
goto end; goto end;
} }
char offsetBuf[TSDB_OFFSET_LEN] = {0}; char offsetBuf[TSDB_OFFSET_LEN] = {0};
code = tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal); tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
if (code != 0) {
goto end;
}
char commitBuf[TSDB_OFFSET_LEN] = {0}; char commitBuf[TSDB_OFFSET_LEN] = {0};
code = tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
if (code != 0) {
goto end;
}
SMqCommitCbParamSet* pParamSet = NULL; SMqCommitCbParamSet* pParamSet = NULL;
code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet); code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
@ -694,10 +688,10 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
if (pVg->offsetInfo.endOffset.type > 0 && if (pVg->offsetInfo.endOffset.type > 0 &&
!tOffsetEqual(&pVg->offsetInfo.endOffset, &pVg->offsetInfo.committedOffset)) { !tOffsetEqual(&pVg->offsetInfo.endOffset, &pVg->offsetInfo.committedOffset)) {
char offsetBuf[TSDB_OFFSET_LEN] = {0}; char offsetBuf[TSDB_OFFSET_LEN] = {0};
(void)tFormatOffset(offsetBuf, tListLen(offsetBuf), &pVg->offsetInfo.endOffset); tFormatOffset(offsetBuf, tListLen(offsetBuf), &pVg->offsetInfo.endOffset);
char commitBuf[TSDB_OFFSET_LEN] = {0}; char commitBuf[TSDB_OFFSET_LEN] = {0};
(void)tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopic->topicName, pParamSet); code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopic->topicName, pParamSet);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -865,7 +859,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
offRows->offset = pVg->offsetInfo.endOffset; offRows->offset = pVg->offsetInfo.endOffset;
offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd; offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
(void)tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscDebug("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64, tscDebug("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows); tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
} }
@ -1258,7 +1252,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
} }
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
(void)tFormatOffset(buf, tListLen(buf), &offset); tFormatOffset(buf, tListLen(buf), &offset);
tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64 tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s", ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s",
pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
@ -1516,7 +1510,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
(void)memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); (void)memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
(void)tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.common.rspOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.common.rspOffset);
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, pRspWrapper->dataRsp.common.reqOffset.version, buf, rspType, requestId); tmq->consumerId, vgId, pRspWrapper->dataRsp.common.reqOffset.version, buf, rspType, requestId);
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
@ -1694,7 +1688,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
(void)sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); (void)sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
(void)tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset);
tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
vgKey, buf); vgKey, buf);
@ -1958,7 +1952,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
int64_t transporterId = 0; int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
(void)tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
@ -2129,7 +2123,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
tmq->consumerId, pDataRsp->blockNum != 0); tmq->consumerId, pDataRsp->blockNum != 0);
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
(void)tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
if (pDataRsp->blockNum == 0) { if (pDataRsp->blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
", total:%" PRId64 ", reqId:0x%" PRIx64, ", total:%" PRId64 ", reqId:0x%" PRIx64,
@ -2286,7 +2280,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
tmq->totalRows += numOfRows; tmq->totalRows += numOfRows;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
(void)tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
@ -3400,7 +3394,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
int64_t transporterId = 0; int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
(void)tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset); tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);