From af423685ca1c6d2f15163b9d73ff78c9e6e88446 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Mar 2023 17:42:01 +0800 Subject: [PATCH] fix(tmq): handle the commit failure. --- source/client/src/clientTmq.c | 137 ++++++++++++++++++++++------------ 1 file changed, 91 insertions(+), 46 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0d3d37dc29..14d056224a 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -208,6 +208,8 @@ typedef struct { static int32_t tmqAskEp(tmq_t* tmq, bool async); static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg); static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet); +static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet, + int32_t index, int32_t totalVgroups); static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId); tmq_conf_t* tmq_conf_new() { @@ -378,51 +380,90 @@ char** tmq_list_to_c_array(const tmq_list_t* list) { return container->pData; } -static void updateVgEpset(tmq_t* pTmq, SMqCommitCbParam* pParam, SEpSet* pEpSet) { - int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics); +static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index, int32_t* numOfVgroups) { + int32_t numOfTopics = taosArrayGetSize(pTopicList); + *index = -1; + *numOfVgroups = 0; + for(int32_t i = 0; i < numOfTopics; ++i) { - SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, i); - if (strcmp(pTopic->topicName, pParam->topicName) != 0) { + SMqClientTopic* pTopic = taosArrayGet(pTopicList, i); + if (strcmp(pTopic->topicName, pName) != 0) { continue; } - int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); - for(int32_t j = 0; j < numOfVgs; ++j) { + *numOfVgroups = taosArrayGetSize(pTopic->vgs); + for (int32_t j = 0; j < (*numOfVgroups); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - if (pClientVg->vgId == pParam->vgId) { - SEp* pEp = GET_ACTIVE_EP(pEpSet); - SEp* pOld = GET_ACTIVE_EP(&(pClientVg->epSet)); - uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pParam->vgId, - pEp->fqdn, pEp->port, pOld->fqdn, pOld->port); - pClientVg->epSet = *pEpSet; - break; + if (pClientVg->vgId == vgId) { + *index = j; + return pClientVg; } } - - break; } + + return NULL; } -// todo retry to send the commit if failed + static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; - // push into array -#if 0 - if (code == 0) { - taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset); - } else { - taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset); - } -#endif - - // update the epset if needed - if (pBuf->pEpSet != NULL) { + if (code != TSDB_CODE_SUCCESS) { // if commit offset failed, let's try again taosThreadMutexLock(&pParam->pTmq->lock); - updateVgEpset(pParam->pTmq, pParam, pBuf->pEpSet); + int32_t numOfVgroups, index; + SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups); + + if (pVg == NULL) { + tscDebug("consumer:0x%" PRIx64 + " subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry ordinal:%d/%d", + pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code), index + 1, numOfVgroups); + } else { // let's retry the commit + int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups); + if (code1 != TSDB_CODE_SUCCESS) { // retry failed. + tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 + " retry failed, ignore this commit. code:%s ordinal:%d/%d", + pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->committedOffset.version, + tstrerror(terrno), index + 1, numOfVgroups); + } + } + taosThreadMutexUnlock(&pParam->pTmq->lock); + + taosMemoryFree(pParam->pOffset); + taosMemoryFree(pBuf->pData); + taosMemoryFree(pBuf->pEpSet); + + tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); + return 0; } + // todo replace the pTmq with refId + taosThreadMutexLock(&pParam->pTmq->lock); + tmq_t* pTmq = pParam->pTmq; + int32_t index = 0, numOfVgroups = 0; + + SMqClientVg* pVg = foundClientVg(pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups); + if (pVg == NULL) { + tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d has been transferred to other consumer, ordinal:%d/%d", + pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups); + } else { // update the epset if needed + if (pBuf->pEpSet != NULL) { + SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet); + SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet)); + + tscDebug("consumer:0x%" PRIx64 " subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d, ordinal:%d/%d", + pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, pEp->fqdn, pEp->port, pOld->fqdn, pOld->port, + index + 1, numOfVgroups); + + pVg->epSet = *pBuf->pEpSet; + } + + // update the offset value. + pVg->committedOffset = pVg->currentOffset; + } + + taosThreadMutexUnlock(&pParam->pTmq->lock); + taosMemoryFree(pParam->pOffset); taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); @@ -431,7 +472,8 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { return 0; } -static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet) { +static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet, + int32_t index, int32_t totalVgroups) { STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset)); if (pOffset == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -498,11 +540,9 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, const char* pTopic }; SEp* pEp = GET_ACTIVE_EP(&pVg->epSet); - tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64 " prev:%" PRId64 ", ep:%s:%d", tmq->consumerId, - pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port); - - // TODO: put into cb, the commit offset should be move to the callback function - pVg->committedOffset = pVg->currentOffset; + tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64 " prev:%" PRId64 ", ep:%s:%d, ordinal:%d/%d", + tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, + pEp->port, index + 1, totalVgroups); pMsgSendInfo->requestId = generateRequestId(); pMsgSendInfo->requestObjRefId = 0; @@ -557,15 +597,19 @@ static int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, t taosThreadMutexLock(&tmq->lock); for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - if (strcmp(pTopic->topicName, topic) != 0) continue; - for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + if (strcmp(pTopic->topicName, topic) != 0) { + continue; + } + + int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); + for (int32_t j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (pVg->vgId != vgId) { continue; } if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { - if (tmqSendCommitReq(tmq, pVg, pTopic->topicName, pParamSet) < 0) { + if (doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups) < 0) { tsem_destroy(&pParamSet->rspSem); taosMemoryFree(pParamSet); goto FAIL; @@ -634,7 +678,6 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, taosThreadMutexLock(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); for (int32_t i = 0; i < numOfTopics; i++) { @@ -644,14 +687,19 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups); for (int32_t j = 0; j < numOfVgroups; j++) { - SMqClientVg clientVg = *(SMqClientVg*)taosArrayGet(pTopic->vgs, j); - if (clientVg.currentOffset.type > 0 && !tOffsetEqual(&clientVg.currentOffset, &clientVg.committedOffset)) { - if (tmqSendCommitReq(tmq, &clientVg, pTopic->topicName, pParamSet) < 0) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + + if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { + code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups); + if (code != TSDB_CODE_SUCCESS) { + tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d", + tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->committedOffset.version, tstrerror(terrno), + j + 1, numOfVgroups); continue; } } else { - tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d", - tmq->consumerId, pTopic->topicName, clientVg.vgId, clientVg.currentOffset.version, j + 1, numOfVgroups); + tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d", + tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups); } } } @@ -1804,7 +1852,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); } } else { - /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/ bool reset = false; tmqHandleNoPollRsp(tmq, rspWrapper, &reset); taosFreeQitem(rspWrapper); @@ -1814,8 +1861,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } } } - - tscDebug("consumer:0x%" PRIx64 " handle the rsp completed", tmq->consumerId); } TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {