From f7291504979a49a89254fb45e5f77d3f50d651f6 Mon Sep 17 00:00:00 2001
From: wangmm0220
Date: Wed, 13 Sep 2023 18:32:45 +0800
Subject: [PATCH] fix: optimize poll callback logic & send epoch as -1 to get epset if vnode transformed

---
 include/common/tcommon.h                  |   4 +-
 source/client/src/clientTmq.c             | 152 +++++++++-------
 source/dnode/mnode/impl/src/mndConsumer.c |   9 ++
 3 files changed, 70 insertions(+), 95 deletions(-)

diff --git a/include/common/tcommon.h b/include/common/tcommon.h
index 8482ba8a78..6f4f15d1e8 100644
--- a/include/common/tcommon.h
+++ b/include/common/tcommon.h
@@ -135,13 +135,11 @@ static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
 }
 
 enum {
-  TMQ_MSG_TYPE__DUMMY = 0,
-  TMQ_MSG_TYPE__POLL_DATA_RSP,
+  TMQ_MSG_TYPE__POLL_DATA_RSP = 0,
   TMQ_MSG_TYPE__POLL_META_RSP,
   TMQ_MSG_TYPE__EP_RSP,
   TMQ_MSG_TYPE__POLL_DATA_META_RSP,
   TMQ_MSG_TYPE__WALINFO_RSP,
-  TMQ_MSG_TYPE__END_RSP,
 };
 
 enum {
diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c
index d804a83d51..bede846762 100644
--- a/source/client/src/clientTmq.c
+++ b/source/client/src/clientTmq.c
@@ -40,11 +40,13 @@ volatile int32_t tmqInitRes = 0;  // initialize rsp code
 static struct SMqMgmt tmqMgmt = {0};
 
 typedef struct {
+  int32_t code;
   int8_t  tmqRspType;
   int32_t epoch;
 } SMqRspWrapper;
 
 typedef struct {
+  int32_t     code;
   int8_t      tmqRspType;
   int32_t     epoch;
   SMqAskEpRsp msg;
@@ -131,7 +133,6 @@ enum {
 
 enum {
   TMQ_DELAYED_TASK__ASK_EP = 1,
-  TMQ_DELAYED_TASK__REPORT,
   TMQ_DELAYED_TASK__COMMIT,
 };
 
@@ -163,6 +164,7 @@ typedef struct {
 } SMqClientTopic;
 
 typedef struct {
+  int32_t        code;
   int8_t         tmqRspType;
   int32_t        epoch;  // epoch can be used to guard the vgHandle
   int32_t        vgId;
@@ -255,7 +257,7 @@ static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
 static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
 static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet);
 static void    commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
-static void    askEp(tmq_t* pTmq, void* param, bool sync);
+static void    askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpset);
 
 tmq_conf_t* tmq_conf_new() {
   tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
@@ -845,7 +847,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
 
   while (pTaskType != NULL) {
     if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
-      askEp(pTmq, NULL, false);
+      askEp(pTmq, NULL, false, false);
 
       int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
       *pRefId = pTmq->refId;
@@ -862,7 +864,8 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
       tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
               pTmq->autoCommitInterval / 1000.0);
       taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
-    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
+    } else {
+      tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
     }
 
     taosFreeQitem(pTaskType);
@@ -874,9 +877,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
 }
 
 static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
-  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
-    // do nothing
-  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
+  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
     SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
     tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
   } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
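
The hunks above carry the heart of the client-side rework: every response wrapper now begins with the same code field (SMqRspWrapper, SMqAskEpRspWrapper and SMqPollRspWrapper all share that prefix), so a failed poll can travel through the consumer's mqueue like any normal response, and the sentinel TMQ_MSG_TYPE__DUMMY / TMQ_MSG_TYPE__END_RSP entries become unnecessary. Below is a minimal compilable sketch of the pattern with stand-in names, not the real TDengine types:

#include <stdint.h>
#include <stdio.h>

/* Model of the reworked wrapper layout: a leading error code shared by all
 * wrapper variants, checked before the message type is even looked at. */
typedef struct {
  int32_t code;     /* 0 = normal response, otherwise the error to handle */
  int8_t  rspType;  /* POLL_DATA_RSP, EP_RSP, ... */
  int32_t epoch;
} RspWrapper;

static void handleRsp(const RspWrapper* w) {
  if (w->code != 0) {  /* errors ride the same queue as data */
    printf("error rsp, code:%d\n", w->code);
    return;
  }
  printf("data rsp, type:%d, epoch:%d\n", w->rspType, w->epoch);
}

int main(void) {
  RspWrapper ok  = {.code = 0, .rspType = 0, .epoch = 3};
  RspWrapper bad = {.code = -1};  /* e.g. a transport failure */
  handleRsp(&ok);
  handleRsp(&bad);
  return 0;
}

Keeping code at the same offset in every wrapper is what lets tmqHandleAllRsp branch on it before casting to the concrete wrapper type.
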
@@ -1300,33 +1301,19 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
   uint64_t requestId = pParam->requestId;
   tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
   if (tmq == NULL) {
-    taosMemoryFree(pParam);
-    taosMemoryFreeClear(pMsg->pData);
-    taosMemoryFreeClear(pMsg->pEpSet);
-    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
-    return -1;
+    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
+    goto FAIL;
   }
 
-  if (code != 0) {
-    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
-      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
-      tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER",
-               tmq->consumerId);
-    } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
-      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
-      if (pRspWrapper == NULL) {
-        tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since out of memory, reqId:0x%" PRIx64,
-                tmq->consumerId, vgId, requestId);
-        goto END;
-      }
-
-      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
-      taosWriteQitem(tmq->mqueue, pRspWrapper);
-    } else{
-      tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s, reqId:0x%" PRIx64, tmq->consumerId,
-               vgId, tstrerror(code), requestId);
-    }
+  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
+  if (pRspWrapper == NULL) {
+    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
+    taosReleaseRef(tmqMgmt.rsetId, refId);
+    code = TSDB_CODE_OUT_OF_MEMORY;
+    goto FAIL;
+  }
 
+  if(code != 0){
     goto END;
   }
@@ -1335,30 +1322,19 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
 
   if (msgEpoch < clientEpoch) {
     // do not write into queue since updating epoch reset
     tscWarn("consumer:0x%" PRIx64
-            " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
+                " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
     code = -1;
     goto END;
   }
 
   ASSERT(msgEpoch == clientEpoch);
 
-  // handle meta rsp
   int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
-
-  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
-  if (pRspWrapper == NULL) {
-    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
-    code = TSDB_CODE_OUT_OF_MEMORY;
-    goto END;
-  }
-
   pRspWrapper->tmqRspType = rspType;
   pRspWrapper->reqId = requestId;
   pRspWrapper->pEpset = pMsg->pEpSet;
   pMsg->pEpSet = NULL;
-  pRspWrapper->vgId = vgId;
-  strcpy(pRspWrapper->topicName, pParam->topicName);
 
   if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
     SDecoder decoder;
@@ -1387,22 +1363,22 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
       tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
   }
 
+END:
+  pRspWrapper->code = code;
+  pRspWrapper->vgId = vgId;
+  strcpy(pRspWrapper->topicName, pParam->topicName);
   taosWriteQitem(tmq->mqueue, pRspWrapper);
 
   int32_t total = taosQueueItemSize(tmq->mqueue);
   tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
            tmq->consumerId, rspType, vgId, total, requestId);
-
-END:
-  if(code != 0){
-    setVgIdle(tmq, pParam->topicName, vgId);
-  }
-
-  tsem_post(&tmq->rspSem);
   taosReleaseRef(tmqMgmt.rsetId, refId);
+
+FAIL:
+  if (tmq != NULL) tsem_post(&tmq->rspSem);  // tmq is NULL when the consumer is already closed
   taosMemoryFree(pParam);
-  taosMemoryFreeClear(pMsg->pData);
-  taosMemoryFreeClear(pMsg->pEpSet);
+  if(pMsg) taosMemoryFreeClear(pMsg->pData);
+  if(pMsg) taosMemoryFreeClear(pMsg->pEpSet);
 
   return code;
 }
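
tmqPollCb now funnels every outcome through two labels. END stamps the wrapper with the final code, vgId and topic name and enqueues it; FAIL covers the cases where no wrapper exists (consumer already closed, allocation failure) and only wakes the poller and frees the inputs. The NULL checks on pMsg matter because doTmqPollImpl, further down, calls this callback directly with a NULL buffer when the send itself fails, and tmq must be guarded on the FAIL path because the closed-consumer branch jumps there with no reference acquired. A compilable sketch of the idiom, with stand-in names rather than the real TDengine API:

#include <stdio.h>
#include <stdlib.h>

typedef struct { int dummy; } consumer_t;
typedef struct { int code; } wrapper_t;
typedef struct { char *data; } buf_t;

static consumer_t gConsumer;
static consumer_t *acquire_ref(long refId) { return refId != 0 ? &gConsumer : NULL; }
static void release_ref(consumer_t *c) { (void)c; }  /* ref-count decrement in the real code */
static void enqueue(consumer_t *c, wrapper_t *w) { (void)c; free(w); }
static void wake_poller(consumer_t *c) { (void)c; }  /* tsem_post in the real code */

static int poll_cb(long refId, buf_t *msg, int code) {
  wrapper_t  *w = NULL;
  consumer_t *c = acquire_ref(refId);
  if (c == NULL) { code = -2; goto FAIL; }          /* consumer already closed */
  w = calloc(1, sizeof(*w));
  if (w == NULL) { release_ref(c); code = -1; goto FAIL; }
  if (code != 0) goto END;                          /* decode the body only on success */
  /* ... decode msg into w ... */
END:
  w->code = code;                                   /* 0 or an error: one delivery path */
  enqueue(c, w);
  release_ref(c);
FAIL:
  if (c != NULL) wake_poller(c);                    /* c is NULL on the closed path */
  if (msg != NULL) free(msg->data);                 /* msg is NULL on a local send failure */
  return code;
}

int main(void) {
  printf("closed consumer -> %d\n", poll_cb(0, NULL, 0));  /* -2, nothing enqueued */
  printf("open consumer   -> %d\n", poll_cb(1, NULL, 5));  /* 5 travels inside the wrapper */
  return 0;
}
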
@@ -1473,7 +1449,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
   bool set = false;
 
   int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
-  if (epoch <= tmq->epoch) {
+  if (topicNumGet <= 0) {
+    tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d",
+            tmq->consumerId, tmq->epoch, epoch, topicNumGet);
     return false;
   }
@@ -1675,10 +1653,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
   int64_t transporterId = 0;
   char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
   tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
-
-  tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
-           pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
   code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
+  tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
+           pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
   if(code != 0){
     goto FAIL;
   }
@@ -1688,11 +1665,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
   pTmq->pollCnt++;
 
   return 0;
-
 FAIL:
-  taosMemoryFreeClear(pParam);
   taosMemoryFreeClear(msg);
-  return code;
+  return tmqPollCb(pParam, NULL, code);
 }
 
 // broadcast the poll request to all related vnodes
@@ -1729,8 +1704,6 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
       atomic_store_32(&pVg->vgSkipCnt, 0);
       code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
       if (code != TSDB_CODE_SUCCESS) {
-        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
-        tsem_post(&tmq->rspSem);
         goto end;
       }
     }
@@ -1742,22 +1715,6 @@ end:
   return code;
 }
 
-static int32_t tmqHandleEpRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper) {
-  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
-    if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
-      SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
-      SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
-      doUpdateLocalEp(tmq, rspWrapper->epoch, rspMsg);
-      tDeleteSMqAskEpRsp(rspMsg);
-    } else {
-      tmqFreeRspWrapper(rspWrapper);
-    }
-  } else {
-    return -1;
-  }
-  return 0;
-}
-
 static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever,
                          int64_t consumerId, bool hasData){
   if (!pVg->seekUpdated) {
     tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
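
With errors now flowing through the queue, doTmqPollImpl no longer cleans up a failed asyncSendMsgToServer inline: it frees only the message buffer and reports the failure by invoking tmqPollCb directly with a NULL response, and tmqPollImpl correspondingly stops resetting the vgroup status itself, since the result handler parks the vgroup through setVgIdle. A compilable sketch of a local send failure turned into a synthetic callback (stand-in names, not the real transport API):

#include <stdio.h>

typedef struct { char *data; } buf_t;
typedef int (*poll_cb_t)(long param, buf_t *msg, int code);

/* Pretend transport that rejects the send locally. */
static int async_send(poll_cb_t cb, long param) {
  (void)cb; (void)param;
  return -1;
}

static int poll_cb(long param, buf_t *msg, int code) {
  if (msg == NULL) {  /* no response buffer: failure happened before the wire */
    printf("local failure, code:%d, param:%ld\n", code, param);
  }
  return code;  /* in the real code the error is queued for the poll loop */
}

static int do_poll(long param) {
  int code = async_send(poll_cb, param);
  if (code != 0) {
    /* same handler as a remote failure, just with msg == NULL */
    return poll_cb(param, NULL, code);
  }
  return 0;
}

int main(void) { return do_poll(42) == -1 ? 0 : 1; }
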
@@ -1792,11 +1749,28 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
 
     tscDebug("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);
 
-    if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
+    if (pRspWrapper->code != 0) {
+      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
+      if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
+        atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
+        tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER", tmq->consumerId);
+      } else if (pRspWrapper->code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
+        taosFreeQitem(pRspWrapper);
+        terrno = pRspWrapper->code;
+        tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(pRspWrapper->code));
+        return NULL;
+      } else{
+        if(pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID){  // for vnode transform
+          askEp(tmq, NULL, false, true);
+        }
+        tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, tstrerror(pRspWrapper->code));
+        taosWLockLatch(&tmq->lock);
+        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
+        pVg->emptyBlockReceiveTs = taosGetTimestampMs();
+        taosWUnLockLatch(&tmq->lock);
+      }
+      setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
       taosFreeQitem(pRspWrapper);
-      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
-      tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
-      return NULL;
     } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
       SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
@@ -1834,6 +1808,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
         pVg->emptyBlockReceiveTs = taosGetTimestampMs();
         tmqFreeRspWrapper(pRspWrapper);
         taosFreeQitem(pRspWrapper);
+        taosWUnLockLatch(&tmq->lock);
       } else {  // build rsp
         int64_t numOfRows = 0;
         SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
@@ -1847,7 +1822,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
         taosWUnLockLatch(&tmq->lock);
         return pRsp;
       }
-      taosWUnLockLatch(&tmq->lock);
     } else {
       tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
              tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
@@ -2379,13 +2353,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
   SMqRspHead* head = pMsg->pData;
   int32_t epoch = atomic_load_32(&tmq->epoch);
-  if (head->epoch <= epoch) {
-    tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
-            tmq->consumerId, head->epoch, epoch);
-    goto END;
-  }
-
-  tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
+  tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId,
           head->epoch, epoch);
   if(pParam->sync){
     SMqAskEpRsp rsp = {0};
@@ -2427,7 +2395,7 @@ int32_t syncAskEp(tmq_t* pTmq) {
   SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
   tsem_init(&pInfo->sem, 0, 0);
 
-  askEp(pTmq, pInfo, true);
+  askEp(pTmq, pInfo, true, false);
   tsem_wait(&pInfo->sem);
 
   int32_t code = pInfo->code;
@@ -2436,10 +2404,10 @@ int32_t syncAskEp(tmq_t* pTmq) {
   return code;
 }
 
-void askEp(tmq_t* pTmq, void* param, bool sync) {
+void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
   SMqAskEpReq req = {0};
   req.consumerId = pTmq->consumerId;
-  req.epoch = pTmq->epoch;
+  req.epoch = updateEpSet ? -1 :pTmq->epoch;
   strcpy(req.cgroup, pTmq->groupId);
   int code = 0;
   SMqAskEpCbParam* pParam = NULL;
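
The new error branch in tmqHandleAllRsp is where the second half of the fix begins: on TSDB_CODE_VND_INVALID_VGROUP_ID, which indicates the vnode behind a vgroup was transformed (for example split or migrated), the consumer calls askEp with updateEpSet = true, and askEp signals the request by sending epoch -1 in place of its current epoch. A compilable sketch of the sentinel-epoch request, with field names simplified from SMqAskEpReq:

#include <stdint.h>
#include <stdio.h>
#include <stdbool.h>

typedef struct {
  int64_t consumerId;
  int32_t epoch;  /* -1 = force an epset refresh regardless of epoch */
} ep_req_t;

static ep_req_t build_ask_ep(int64_t consumerId, int32_t curEpoch, bool forceRefresh) {
  ep_req_t req = {.consumerId = consumerId,
                  .epoch = forceRefresh ? -1 : curEpoch};
  return req;
}

int main(void) {
  /* after TSDB_CODE_VND_INVALID_VGROUP_ID the client forces a refresh */
  ep_req_t req = build_ask_ep(0xabc, 7, true);
  printf("epoch sent: %d\n", req.epoch);  /* prints -1 */
  return 0;
}

Reusing the epoch field as the signal keeps the request format unchanged on the wire; the mnode-side handling below is what gives -1 its special meaning.
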
diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c
index 7f96255b1e..4aa4a4ddf2 100644
--- a/source/dnode/mnode/impl/src/mndConsumer.c
+++ b/source/dnode/mnode/impl/src/mndConsumer.c
@@ -16,6 +16,7 @@
 #define _DEFAULT_SOURCE
 #include "mndConsumer.h"
 #include "mndPrivilege.h"
+#include "mndVgroup.h"
 #include "mndShow.h"
 #include "mndSubscribe.h"
 #include "mndTopic.h"
@@ -542,6 +543,14 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
         SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
         char offsetKey[TSDB_PARTITION_KEY_LEN];
         mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
+
+        if(epoch == -1){
+          SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
+          if(pVgroup){
+            pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
+            mndReleaseVgroup(pMnode, pVgroup);
+          }
+        }
 
         // 2.2.1 build vg ep
         SMqSubVgEp vgEp = {
             .epSet = pVgEp->epSet,
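
On the mnode side, mndProcessAskEpReq gives the -1 epoch its meaning: for every vgroup in the subscription it re-acquires the SVgObj and rebuilds the endpoint set with mndGetVgroupEpset before the reply is assembled, so the consumer receives the vnode's post-transform endpoints even when nothing else about the assignment changed. A compilable sketch of that loop; the types and helpers are stand-ins for SVgObj and mndGetVgroupEpset:

#include <stdint.h>
#include <stdio.h>

typedef struct { int32_t inUse; } ep_set_t;
typedef struct { int32_t vgId; ep_set_t epSet; } vg_ep_t;

/* Pretend lookup of the vgroup's current leader endpoints. */
static ep_set_t resolve_current_epset(int32_t vgId) {
  ep_set_t fresh = {.inUse = vgId % 3};
  return fresh;
}

static void fill_vg_eps(vg_ep_t *vgs, int n, int32_t epoch) {
  for (int i = 0; i < n; i++) {
    if (epoch == -1) {  /* client lost track of the vnode: refresh */
      vgs[i].epSet = resolve_current_epset(vgs[i].vgId);
    }
    /* ... append vgs[i] to the reply being built ... */
  }
}

int main(void) {
  vg_ep_t vgs[2] = {{.vgId = 4}, {.vgId = 5}};
  fill_vg_eps(vgs, 2, -1);
  printf("vg %d inUse: %d\n", vgs[0].vgId, vgs[0].epSet.inUse);
  return 0;
}
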