fix: use vgStatus from before rebalance

wangmm0220 2023-09-01 10:05:46 +08:00
parent 89d8c76876
commit b8f041f5ea
1 changed file with 31 additions and 66 deletions

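Summary of the change: every error path in tmqPollCb now funnels through a single FAILED label that re-resolves the vgroup by topic name and vgId under tmq->lock and resets its vgStatus to TMQ_VG_STATUS__IDLE, instead of doing so only on the old CREATE_MSG_FAIL path and only when the callback's epoch still matched the consumer's. In addition, the per-vgroup status is snapshotted into SVgroupSaveInfo before a rebalance rebuilds the topic list, so initClientTopicFromRsp restores the previous vgStatus rather than always starting from TMQ_VG_STATUS__IDLE.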

@@ -197,10 +197,7 @@ typedef struct {
typedef struct {
int64_t refId;
int32_t epoch;
char topicName[TSDB_TOPIC_FNAME_LEN];
// SMqClientVg* pVg;
// SMqClientTopic* pTopic;
int32_t vgId;
uint64_t requestId; // request id for debug purpose
} SMqPollCbParam;
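Note on the struct above: the commented-out SMqClientVg* / SMqClientTopic* members are dropped because a rebalance can free those objects while a poll request is still in flight (see the "pVg may be released" comment in the last hunk). The callback parameter now carries only topicName and vgId and re-resolves the vgroup via getVgInfo under tmq->lock when it needs it.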
@@ -1313,52 +1310,37 @@ static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
int64_t refId = pParam->refId;
// SMqClientVg* pVg = pParam->pVg;
// SMqClientTopic* pTopic = pParam->pTopic;
int64_t refId = pParam->refId;
int32_t vgId = pParam->vgId;
uint64_t requestId = pParam->requestId;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) {
taosMemoryFree(pParam);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1;
goto FAILED;
}
int32_t epoch = pParam->epoch;
int32_t vgId = pParam->vgId;
uint64_t requestId = pParam->requestId;
if (code != 0) {
if (pMsg->pData) taosMemoryFree(pMsg->pData);
if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);
// in case of consumer mismatch, wait for 500ms and retry
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
// taosMsleep(500);
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER",
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER",
tmq->consumerId);
} else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) {
tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64,
tmq->consumerId, vgId, epoch, requestId);
goto CREATE_MSG_FAIL;
tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since out of memory, reqId:0x%" PRIx64,
tmq->consumerId, vgId, requestId);
goto FAILED;
}
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
taosWriteQitem(tmq->mqueue, pRspWrapper);
// } else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert
// taosMsleep(5);
} else{
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
vgId, epoch, tstrerror(code), requestId);
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s, reqId:0x%" PRIx64, tmq->consumerId,
vgId, tstrerror(code), requestId);
}
goto CREATE_MSG_FAIL;
goto FAILED;
}
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
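All of the early-error branches above now jump to the shared FAILED label instead of the old CREATE_MSG_FAIL, and since the local epoch value is no longer consulted on those paths, the discard log messages drop the epoch field.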
@@ -1368,43 +1350,27 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tscWarn("consumer:0x%" PRIx64
" msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pParam);
return 0;
goto FAILED;
}
if (msgEpoch != clientEpoch) {
tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
}
ASSERT(msgEpoch == clientEpoch);
// handle meta rsp
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, epoch %d since out of memory", tmq->consumerId, vgId,
epoch);
goto CREATE_MSG_FAIL;
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
goto FAILED;
}
pRspWrapper->tmqRspType = rspType;
// pRspWrapper->vgHandle = pVg;
// pRspWrapper->topicHandle = pTopic;
pRspWrapper->reqId = requestId;
pRspWrapper->pEpset = pMsg->pEpSet;
pMsg->pEpSet = NULL;
pRspWrapper->vgId = vgId;
strcpy(pRspWrapper->topicName, pParam->topicName);
pMsg->pEpSet = NULL;
if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
@@ -1432,7 +1398,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
}
taosMemoryFree(pMsg->pData);
taosWriteQitem(tmq->mqueue, pRspWrapper);
int32_t total = taosQueueItemSize(tmq->mqueue);
@@ -1442,22 +1407,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
taosMemoryFree(pParam);
taosMemoryFreeClear(pMsg->pData);
return 0;
CREATE_MSG_FAIL:
if (epoch == tmq->epoch) {
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId);
if(pVg){
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
taosWUnLockLatch(&tmq->lock);
FAILED:
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId);
if(pVg){
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
taosWUnLockLatch(&tmq->lock);
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
taosMemoryFree(pParam);
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);
return -1;
}
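To make the new control flow easier to follow, here is a minimal, self-contained sketch of the pattern the FAILED label implements: on any failure, look the vgroup up by key rather than through a cached pointer, and put it back to IDLE so the poll loop is not blocked. All names, types, and status values below are illustrative stand-ins, not the actual TDengine definitions.

#include <stdio.h>
#include <string.h>

/* Illustrative stand-ins for the client-side vgroup state. */
enum { VG_STATUS_IDLE = 0, VG_STATUS_WAIT = 1 };

typedef struct {
  int  vgId;
  char topicName[64];
  int  vgStatus;
} ClientVg;

/* Look the vgroup up by (topic, vgId) instead of caching a pointer that a
 * concurrent rebalance may have freed. */
static ClientVg* findVg(ClientVg* vgs, int n, const char* topic, int vgId) {
  for (int i = 0; i < n; i++) {
    if (vgs[i].vgId == vgId && strcmp(vgs[i].topicName, topic) == 0) {
      return &vgs[i];
    }
  }
  return NULL;
}

/* Every failure path jumps to FAILED, which marks the vgroup IDLE again so
 * the next poll round can pick it up. */
static int pollCb(ClientVg* vgs, int n, const char* topic, int vgId, int rspCode) {
  if (rspCode != 0) goto FAILED;  /* transport or server-side error */
  /* ... decode the response and enqueue it for the consumer here ... */
  return 0;

FAILED:;
  ClientVg* pVg = findVg(vgs, n, topic, vgId);
  if (pVg != NULL) {
    pVg->vgStatus = VG_STATUS_IDLE;  /* unblock future polls on this vgroup */
  }
  return -1;
}

int main(void) {
  ClientVg vgs[1] = {{.vgId = 2, .topicName = "tp1", .vgStatus = VG_STATUS_WAIT}};
  pollCb(vgs, 1, "tp1", 2, -1);
  printf("vgStatus after failed poll: %d\n", vgs[0].vgStatus);  /* 0 == IDLE */
  return 0;
}

In the real code the lookup is getVgInfo and the reset happens between taosWLockLatch and taosWUnLockLatch on tmq->lock, exactly as the hunk above shows.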
@@ -1467,6 +1433,7 @@ typedef struct SVgroupSaveInfo {
STqOffsetVal commitOffset;
STqOffsetVal seekOffset;
int64_t numOfRows;
int32_t vgStatus;
} SVgroupSaveInfo;
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
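SVgroupSaveInfo is the per-vgroup snapshot kept across a rebalance; adding vgStatus to it is what lets the rebuilt vgroup list below pick up where the old one left off instead of resetting every vgroup to IDLE.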
@@ -1475,7 +1442,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
pTopicEp->schema.nCols = 0;
pTopicEp->schema.pSchema = NULL;
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
@@ -1497,7 +1464,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
.pollCnt = 0,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet,
.vgStatus = TMQ_VG_STATUS__IDLE,
.vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
.vgSkipCnt = 0,
.emptyBlockReceiveTs = 0,
.numOfRows = pInfo ? pInfo->numOfRows : 0,
@@ -1509,7 +1476,6 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
clientVg.offsetInfo.walVerBegin = -1;
clientVg.offsetInfo.walVerEnd = -1;
clientVg.seekUpdated = false;
// clientVg.receivedInfoFromVnode = false;
taosArrayPush(pTopic->vgs, &clientVg);
}
@@ -1565,7 +1531,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
vgKey, buf);
SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset, .seekOffset = pVgCur->offsetInfo.beginOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows};
SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset, .seekOffset = pVgCur->offsetInfo.beginOffset,
.commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows,
.vgStatus = pVgCur->vgStatus};
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
}
}
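The two hunks above are the save and restore halves of the same idea: before the topic/vgroup list is rebuilt, doUpdateLocalEp snapshots each vgroup's offsets, row count, and now its vgStatus into the hash map, and initClientTopicFromRsp reads the snapshot back when it recreates the entry. A compressed sketch of that round trip, with hypothetical names standing in for the real structures:

/* Hypothetical snapshot kept across a rebalance (stand-in for SVgroupSaveInfo). */
typedef struct {
  long numOfRows;
  int  vgStatus;
} VgSaveInfo;

/* Hypothetical client vgroup entry (stand-in for SMqClientVg). */
typedef struct {
  int  vgId;
  long numOfRows;
  int  vgStatus;
} ClientVg;

#define VG_STATUS_IDLE 0

/* Save side: capture the fields that must survive the rebuild. */
static VgSaveInfo saveVg(const ClientVg* pVgCur) {
  VgSaveInfo info = {.numOfRows = pVgCur->numOfRows, .vgStatus = pVgCur->vgStatus};
  return info;
}

/* Restore side: reuse the snapshot if one exists, otherwise start fresh. */
static ClientVg rebuildVg(int vgId, const VgSaveInfo* pInfo) {
  ClientVg vg = {
      .vgId      = vgId,
      .numOfRows = pInfo ? pInfo->numOfRows : 0,
      .vgStatus  = pInfo ? pInfo->vgStatus : VG_STATUS_IDLE,
  };
  return vg;
}

int main(void) {
  ClientVg cur = {.vgId = 5, .numOfRows = 120, .vgStatus = 1};
  VgSaveInfo snap = saveVg(&cur);          /* before the rebalance */
  ClientVg rebuilt = rebuildVg(5, &snap);  /* after the rebalance  */
  return rebuilt.vgStatus == cur.vgStatus ? 0 : 1;
}

In the real code the snapshot is keyed by vgKey (topic name plus vgId) in pVgOffsetHashMap via taosHashPut, and the restore is the pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE expression in the hunk above.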
@@ -1766,9 +1734,6 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
}
pParam->refId = pTmq->refId;
pParam->epoch = pTmq->epoch;
// pParam->pVg = pVg; // pVg may be released,fix it
// pParam->pTopic = pTopic;
strcpy(pParam->topicName, pTopic->topicName);
pParam->vgId = pVg->vgId;
pParam->requestId = req.reqId;
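Correspondingly, doTmqPollImpl no longer stashes pVg/pTopic pointers in the callback parameter; it records only refId, epoch, topicName, vgId, and requestId, which is all tmqPollCb needs to re-resolve the vgroup safely after a rebalance.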