Merge pull request #20490 from taosdata/fix/liaohj
fix(tmq): disable non-leader vnodes from responding to the poll request.
Commit: 345b46769c
@@ -24,6 +24,8 @@
#include "tref.h"
#include "ttimer.h"

#define EMPTY_BLOCK_POLL_IDLE_DURATION 100

struct SMqMgmt {
  int8_t inited;
  tmr_h  timer;
@@ -69,7 +71,6 @@ struct tmq_conf_t {

struct tmq_t {
  int64_t refId;
  // conf
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
@@ -79,7 +80,6 @@ struct tmq_t {
  int32_t  resetOffsetCfg;
  uint64_t consumerId;
  bool     hbBgEnable;

  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
@@ -97,7 +97,6 @@ struct tmq_t {
  tmr_h epTimer;
  tmr_h reportTimer;
  tmr_h commitTimer;

  STscObj*    pTscObj;       // connection
  SArray*     clientTopics;  // SArray<SMqClientTopic>
  STaosQueue* mqueue;        // queue of rsp
@@ -149,6 +148,7 @@ typedef struct {
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
  uint64_t        reqId;
  SEpSet*         pEpset;
  union {
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
@@ -403,65 +403,42 @@ static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t
  return NULL;
}

// Two problems do not need to be addressed here
// 1. update of epset. the response of the poll request will automatically handle this problem
// 2. commit failure. This one needs to be resolved.
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  if (code != TSDB_CODE_SUCCESS) {  // if commit offset failed, let's try again
    taosThreadMutexLock(&pParam->pTmq->lock);
    int32_t      numOfVgroups, index;
    SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);

    if (pVg == NULL) {
      tscDebug("consumer:0x%" PRIx64
               " subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry ordinal:%d/%d",
               pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code), index + 1, numOfVgroups);
    } else {  // let's retry the commit
      int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups);
      if (code1 != TSDB_CODE_SUCCESS) {  // retry failed.
        tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
                 " retry failed, ignore this commit. code:%s ordinal:%d/%d",
                 pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->committedOffset.version,
                 tstrerror(terrno), index + 1, numOfVgroups);
      }
    }

    taosThreadMutexUnlock(&pParam->pTmq->lock);

    taosMemoryFree(pParam->pOffset);
    taosMemoryFree(pBuf->pData);
    taosMemoryFree(pBuf->pEpSet);

    tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
    return 0;
  }

  // todo replace the pTmq with refId
  taosThreadMutexLock(&pParam->pTmq->lock);
  tmq_t*  pTmq = pParam->pTmq;
  int32_t index = 0, numOfVgroups = 0;

  SMqClientVg* pVg = foundClientVg(pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);
  if (pVg == NULL) {
    tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d has been transferred to other consumer, ordinal:%d/%d",
             pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups);
  } else {  // update the epset if needed
    if (pBuf->pEpSet != NULL) {
      SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet);
      SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));

      tscDebug("consumer:0x%" PRIx64 " subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d, ordinal:%d/%d",
               pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, pEp->fqdn, pEp->port, pOld->fqdn, pOld->port,
               index + 1, numOfVgroups);

      pVg->epSet = *pBuf->pEpSet;
    }

    tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d, commit offset success. ordinal:%d/%d", pTmq->consumerId,
             pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups);
  }

  taosThreadMutexUnlock(&pParam->pTmq->lock);
  // if (code != TSDB_CODE_SUCCESS) { // if commit offset failed, let's try again
  //   taosThreadMutexLock(&pParam->pTmq->lock);
  //   int32_t numOfVgroups, index;
  //   SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);
  //   if (pVg == NULL) {
  //     tscDebug("consumer:0x%" PRIx64
  //              " subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry ordinal:%d/%d",
  //              pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code), index + 1, numOfVgroups);
  //   } else { // let's retry the commit
  //     int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups);
  //     if (code1 != TSDB_CODE_SUCCESS) { // retry failed.
  //       tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
  //                " retry failed, ignore this commit. code:%s ordinal:%d/%d",
  //                pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->committedOffset.version,
  //                tstrerror(terrno), index + 1, numOfVgroups);
  //     }
  //   }
  //
  //   taosThreadMutexUnlock(&pParam->pTmq->lock);
  //
  //   taosMemoryFree(pParam->pOffset);
  //   taosMemoryFree(pBuf->pData);
  //   taosMemoryFree(pBuf->pEpSet);
  //
  //   tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
  //   return 0;
  // }
  //
  // // todo replace the pTmq with refId
  taosMemoryFree(pParam->pOffset);
  taosMemoryFree(pBuf->pData);
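With this rewrite, the retry-on-failure logic in tmqCommitCb appears only as the commented-out block above; the live path takes the consumer lock, looks the vgroup up again, copies any fresher epset carried by the commit response, logs the outcome, and frees the buffers. Below is a minimal standalone sketch of that epset hand-over on success; the types and names are placeholders, not the client's real structs.

#include <stdio.h>

typedef struct { char fqdn[64]; int port; } Ep;      /* stand-in for SEp         */
typedef struct { Ep active; } EpSet;                  /* stand-in for SEpSet      */
typedef struct { int vgId; EpSet epSet; } ClientVg;   /* stand-in for SMqClientVg */

/* On a successful commit response, adopt the epset returned by the server, if any. */
static void onCommitSuccess(ClientVg* vg, const EpSet* rspEpSet) {
  if (rspEpSet != NULL) {
    printf("vgId:%d update ep %s:%d to %s:%d\n", vg->vgId, vg->epSet.active.fqdn,
           vg->epSet.active.port, rspEpSet->active.fqdn, rspEpSet->active.port);
    vg->epSet = *rspEpSet;  /* plain struct copy, as in pVg->epSet = *pBuf->pEpSet */
  }
}

int main(void) {
  ClientVg vg = {.vgId = 2, .epSet = {.active = {"old-host", 6030}}};
  EpSet fresh = {.active = {"new-host", 6030}};
  onCommitSuccess(&vg, &fresh);
  return 0;
}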
@@ -892,15 +869,21 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
    tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
    SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
    taosMemoryFreeClear(pRsp->pEpset);

    taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree);
    taosArrayDestroy(pRsp->dataRsp.blockDataLen);
    taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree);
    taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
    taosMemoryFreeClear(pRsp->pEpset);

    taosMemoryFree(pRsp->metaRsp.metaRsp);
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
    taosMemoryFreeClear(pRsp->pEpset);

    taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree);
    taosArrayDestroy(pRsp->taosxRsp.blockDataLen);
    taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree);
@@ -1321,7 +1304,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
    pRspWrapper->vgHandle = pVg;
    pRspWrapper->topicHandle = pTopic;
    pRspWrapper->reqId = requestId;
    pRspWrapper->pEpset = pMsg->pEpSet;

    pMsg->pEpSet = NULL;
    if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
      SDecoder decoder;
      tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
@@ -1350,7 +1335,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  taosWriteQitem(tmq->mqueue, pRspWrapper);

  tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
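A detail worth noting in the tmqPollCb hunks above: the response's pEpSet pointer is stored on the wrapper and pMsg->pEpSet is then set to NULL, so the unconditional taosMemoryFree(pMsg->pEpSet) later in the function releases nothing and the wrapper becomes the sole owner. A tiny sketch of that ownership transfer in plain C, with free() standing in for the client's allocator and placeholder types throughout:

#include <stdlib.h>

typedef struct { int numOfEps; } EpSet;        /* placeholder for SEpSet            */
typedef struct { EpSet* pEpSet; } Msg;         /* placeholder for SDataBuf          */
typedef struct { EpSet* pEpset; } RspWrapper;  /* placeholder for SMqPollRspWrapper */

static void takeEpSet(RspWrapper* w, Msg* m) {
  w->pEpset = m->pEpSet;  /* the wrapper now owns the allocation              */
  m->pEpSet = NULL;       /* so the later free(m->pEpSet) is a harmless no-op */
}

int main(void) {
  Msg        m = {.pEpSet = malloc(sizeof(EpSet))};
  RspWrapper w = {0};
  takeEpSet(&w, &m);
  free(m.pEpSet);  /* NULL: safe, does nothing */
  free(w.pEpset);  /* the single real release  */
  return 0;
}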
@@ -1516,6 +1500,14 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (head->epoch <= epoch) {
    tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
             tmq->consumerId, head->epoch, epoch);
    if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) {
      SMqAskEpRsp rsp;
      tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
      int8_t flag = (taosArrayGetSize(rsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
      atomic_store_8(&tmq->status, flag);
      tDeleteSMqAskEpRsp(&rsp);
    }

    goto END;
  }
@@ -1702,7 +1694,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {

  for (int j = 0; j < numOfVg; j++) {
    SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
    if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < 100) {  // less than 100ms
    if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) {  // less than 100ms
      tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch,
               pVg->vgId);
      continue;
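The one-line change above replaces the magic number 100 with the EMPTY_BLOCK_POLL_IDLE_DURATION define introduced earlier in this diff: once a vgroup answers with an empty block, the consumer skips it in later poll rounds until the idle window has elapsed. A self-contained sketch of the throttle, using a caller-supplied millisecond clock instead of taosGetTimestampMs and an illustrative struct:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define EMPTY_BLOCK_POLL_IDLE_DURATION 100  /* ms, same value as the define in this commit */

typedef struct {
  int64_t emptyBlockReceiveTs;  /* ms timestamp of the last empty poll response */
} VgState;

/* Returns true when the vgroup should sit out this poll round. */
static bool shouldSkipPoll(const VgState* vg, int64_t nowMs) {
  return (nowMs - vg->emptyBlockReceiveTs) < EMPTY_BLOCK_POLL_IDLE_DURATION;
}

int main(void) {
  VgState vg = {.emptyBlockReceiveTs = 1000};
  printf("30ms later: skip=%d\n", shouldSkipPoll(&vg, 1030));   /* 1: still idle    */
  printf("150ms later: skip=%d\n", shouldSkipPoll(&vg, 1150));  /* 0: poll it again */
  return 0;
}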
@@ -1731,6 +1723,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
    }
  }

  tscDebug("consumer:0x%" PRIx64 " end to poll data", tmq->consumerId);
  return 0;
}
@@ -1782,31 +1775,43 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {

      /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
      SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;

      if (pDataRsp->head.epoch == consumerEpoch) {
        // todo fix it: race condition
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;

        // update the epset
        if (pollRspWrapper->pEpset != NULL) {
          SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset);
          SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
          tscDebug("consumer:0x%" PRIx64 " update epset vgId:%d, ep:%s:%d, old ep:%s:%d", tmq->consumerId,
                   pVg->vgId, pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
          pVg->epSet = *pollRspWrapper->pEpset;
        }

        pVg->currentOffset = pDataRsp->rspOffset;
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        if (pollRspWrapper->dataRsp.blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId,
                   pollRspWrapper->reqId);
        char buf[80];
        tFormatOffset(buf, 80, &pDataRsp->rspOffset);
        if (pDataRsp->blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, reqId:0x%" PRIx64, tmq->consumerId,
                   pVg->vgId, buf, pollRspWrapper->reqId);
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
          continue;
        }

        // build rsp
        char buf[80];
        tFormatOffset(buf, 80, &pVg->currentOffset);
        } else {  // build rsp
          SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
        tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"PRIx64, tmq->consumerId,
                 pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, pollRspWrapper->reqId);
          tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, pollRspWrapper->reqId);

          taosFreeQitem(pollRspWrapper);
          return pRsp;
        }
      } else {
        tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
                 tmq->consumerId, pDataRsp->head.epoch, consumerEpoch);
        tmqFreeRspWrapper(rspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
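In the reworked block above, pDataRsp is hoisted into a local, and only a response whose epoch matches the consumer's current epoch updates the vgroup offset and epset; anything produced under an older epoch is logged and dropped. The gate itself is a plain integer comparison; a minimal sketch with illustrative names:

#include <stdbool.h>
#include <stdio.h>

/* A poll response is usable only if it was produced for the consumer's current
 * epoch; responses from an earlier assignment round are discarded. */
static bool acceptPollRsp(int rspEpoch, int consumerEpoch) {
  return rspEpoch == consumerEpoch;
}

int main(void) {
  printf("matching epoch -> %d\n", acceptPollRsp(5, 5));  /* 1: processed */
  printf("stale epoch    -> %d\n", acceptPollRsp(4, 5));  /* 0: discarded */
  return 0;
}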
@@ -101,7 +101,7 @@ void mndRebCntDec() {
    int32_t newVal = val - 1;
    int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
    if (oldVal == val) {
      mInfo("rebalance trans end, rebalance counter:%d", newVal);
      mDebug("rebalance trans end, rebalance counter:%d", newVal);
      break;
    }
  }
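mndRebCntDec decrements the rebalance counter with a compare-and-swap retry loop; the only change in this hunk is demoting the success log from mInfo to mDebug. The same loop expressed with standard C11 atomics, as a sketch rather than the mnode helper itself:

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

static _Atomic int32_t rebInExecCnt = 1;  /* stand-in for mqRebInExecCnt */

static void rebCntDec(void) {
  while (1) {
    int32_t val = atomic_load(&rebInExecCnt);
    int32_t newVal = val - 1;
    /* Succeeds only if no other thread touched the counter in between. */
    if (atomic_compare_exchange_strong(&rebInExecCnt, &val, newVal)) {
      printf("rebalance trans end, rebalance counter:%d\n", newVal);
      break;
    }
  }
}

int main(void) {
  rebCntDec();
  return 0;
}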
@@ -510,7 +510,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
       pMsg->msgType == TDMT_VND_BATCH_META) &&
       pMsg->msgType == TDMT_VND_BATCH_META || pMsg->msgType == TDMT_VND_TMQ_CONSUME) &&
      !syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
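The hunk above is the heart of the fix named in the PR title: TDMT_VND_TMQ_CONSUME now joins the set of read-type messages that vnodeProcessFetchMsg serves only when syncIsReadyForRead says the vnode may answer reads, so a non-leader (or not yet restored) vnode redirects the poll request instead of replying to it. A minimal standalone sketch of the guard pattern, using illustrative message types and a stub readiness check rather than the real vnode APIs:

#include <stdbool.h>
#include <stdio.h>

/* Illustrative stand-ins for the real TDMT_* message types. */
typedef enum { MSG_FETCH, MSG_TABLE_META, MSG_TMQ_CONSUME, MSG_WRITE } EMsgType;

/* Stub for the sync-readiness check (leader and restored). */
static bool isReadyForRead(bool leaderAndRestored) { return leaderAndRestored; }

/* Returns true when the message is redirected instead of being served locally. */
static bool maybeRedirect(EMsgType type, bool leaderAndRestored) {
  bool isReadMsg = (type == MSG_FETCH || type == MSG_TABLE_META || type == MSG_TMQ_CONSUME);
  if (isReadMsg && !isReadyForRead(leaderAndRestored)) {
    printf("msg %d redirected to the leader\n", (int)type);  /* vnodeRedirectRpcMsg in the real code */
    return true;
  }
  return false;
}

int main(void) {
  maybeRedirect(MSG_TMQ_CONSUME, false);  /* non-leader vnode: poll is redirected, never answered locally */
  maybeRedirect(MSG_TMQ_CONSUME, true);   /* leader vnode: served as before */
  return 0;
}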