From b8f041f5ea6ae6b9594b730cccfbac55e9fdda7e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 1 Sep 2023 10:05:46 +0800 Subject: [PATCH] fix:use vgstatus before if rebalance --- source/client/src/clientTmq.c | 97 +++++++++++------------------------ 1 file changed, 31 insertions(+), 66 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e861bd4b92..c6755f0bba 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -197,10 +197,7 @@ typedef struct { typedef struct { int64_t refId; - int32_t epoch; char topicName[TSDB_TOPIC_FNAME_LEN]; -// SMqClientVg* pVg; -// SMqClientTopic* pTopic; int32_t vgId; uint64_t requestId; // request id for debug purpose } SMqPollCbParam; @@ -1313,52 +1310,37 @@ static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { SMqPollCbParam* pParam = (SMqPollCbParam*)param; - - int64_t refId = pParam->refId; -// SMqClientVg* pVg = pParam->pVg; -// SMqClientTopic* pTopic = pParam->pTopic; - + int64_t refId = pParam->refId; + int32_t vgId = pParam->vgId; + uint64_t requestId = pParam->requestId; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { - taosMemoryFree(pParam); - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; - return -1; + goto FAILED; } - int32_t epoch = pParam->epoch; - int32_t vgId = pParam->vgId; - uint64_t requestId = pParam->requestId; - if (code != 0) { - if (pMsg->pData) taosMemoryFree(pMsg->pData); - if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet); - // in case of consumer mismatch, wait for 500ms and retry if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { -// taosMsleep(500); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); - tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER", + tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER", tmq->consumerId); } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); if (pRspWrapper == NULL) { - tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64, - tmq->consumerId, vgId, epoch, requestId); - goto CREATE_MSG_FAIL; + tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since out of memory, reqId:0x%" PRIx64, + tmq->consumerId, vgId, requestId); + goto FAILED; } pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP; taosWriteQitem(tmq->mqueue, pRspWrapper); -// } else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert -// taosMsleep(5); } else{ - tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId, - vgId, epoch, tstrerror(code), requestId); + tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s, reqId:0x%" PRIx64, tmq->consumerId, + vgId, tstrerror(code), requestId); } - goto CREATE_MSG_FAIL; + goto FAILED; } int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; @@ -1368,43 +1350,27 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64, tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId); - - tsem_post(&tmq->rspSem); - taosReleaseRef(tmqMgmt.rsetId, refId); - - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); - taosMemoryFree(pParam); - - return 0; + goto FAILED; } - if (msgEpoch != clientEpoch) { - tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64, - tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId); - } + ASSERT(msgEpoch == clientEpoch); // handle meta rsp int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); if (pRspWrapper == NULL) { - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); - tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, epoch %d since out of memory", tmq->consumerId, vgId, - epoch); - goto CREATE_MSG_FAIL; + tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); + goto FAILED; } pRspWrapper->tmqRspType = rspType; -// pRspWrapper->vgHandle = pVg; -// pRspWrapper->topicHandle = pTopic; pRspWrapper->reqId = requestId; pRspWrapper->pEpset = pMsg->pEpSet; + pMsg->pEpSet = NULL; pRspWrapper->vgId = vgId; strcpy(pRspWrapper->topicName, pParam->topicName); - pMsg->pEpSet = NULL; if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); @@ -1432,7 +1398,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); } - taosMemoryFree(pMsg->pData); taosWriteQitem(tmq->mqueue, pRspWrapper); int32_t total = taosQueueItemSize(tmq->mqueue); @@ -1442,22 +1407,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tsem_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); taosMemoryFree(pParam); + taosMemoryFreeClear(pMsg->pData); return 0; -CREATE_MSG_FAIL: - if (epoch == tmq->epoch) { - taosWLockLatch(&tmq->lock); - SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId); - if(pVg){ - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - } - taosWUnLockLatch(&tmq->lock); +FAILED: + taosWLockLatch(&tmq->lock); + SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId); + if(pVg){ + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } + taosWUnLockLatch(&tmq->lock); tsem_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); taosMemoryFree(pParam); + taosMemoryFreeClear(pMsg->pData); + taosMemoryFreeClear(pMsg->pEpSet); return -1; } @@ -1467,6 +1433,7 @@ typedef struct SVgroupSaveInfo { STqOffsetVal commitOffset; STqOffsetVal seekOffset; int64_t numOfRows; + int32_t vgStatus; } SVgroupSaveInfo; static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap, @@ -1475,7 +1442,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic pTopicEp->schema.nCols = 0; pTopicEp->schema.pSchema = NULL; - char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; + char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0}; int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs); tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); @@ -1497,7 +1464,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .pollCnt = 0, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet, - .vgStatus = TMQ_VG_STATUS__IDLE, + .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, .emptyBlockReceiveTs = 0, .numOfRows = pInfo ? pInfo->numOfRows : 0, @@ -1509,7 +1476,6 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic clientVg.offsetInfo.walVerBegin = -1; clientVg.offsetInfo.walVerEnd = -1; clientVg.seekUpdated = false; -// clientVg.receivedInfoFromVnode = false; taosArrayPush(pTopic->vgs, &clientVg); } @@ -1565,7 +1531,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset, .seekOffset = pVgCur->offsetInfo.beginOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows}; + SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset, .seekOffset = pVgCur->offsetInfo.beginOffset, + .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows, + .vgStatus = pVgCur->vgStatus}; taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); } } @@ -1766,9 +1734,6 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p } pParam->refId = pTmq->refId; - pParam->epoch = pTmq->epoch; -// pParam->pVg = pVg; // pVg may be released,fix it -// pParam->pTopic = pTopic; strcpy(pParam->topicName, pTopic->topicName); pParam->vgId = pVg->vgId; pParam->requestId = req.reqId;