fix:opti poll callbak logic & send epoch as -1 to get epset if vnode transformed

This commit is contained in:
wangmm0220 2023-09-13 18:32:45 +08:00
parent 1dbd322fa0
commit f729150497
3 changed files with 70 additions and 95 deletions

View File

@ -135,13 +135,11 @@ static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
}
enum {
TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_DATA_RSP,
TMQ_MSG_TYPE__POLL_DATA_RSP = 0,
TMQ_MSG_TYPE__POLL_META_RSP,
TMQ_MSG_TYPE__EP_RSP,
TMQ_MSG_TYPE__POLL_DATA_META_RSP,
TMQ_MSG_TYPE__WALINFO_RSP,
TMQ_MSG_TYPE__END_RSP,
};
enum {

View File

@ -40,11 +40,13 @@ volatile int32_t tmqInitRes = 0; // initialize rsp code
static struct SMqMgmt tmqMgmt = {0};
typedef struct {
int32_t code;
int8_t tmqRspType;
int32_t epoch;
} SMqRspWrapper;
typedef struct {
int32_t code;
int8_t tmqRspType;
int32_t epoch;
SMqAskEpRsp msg;
@ -131,7 +133,6 @@ enum {
enum {
TMQ_DELAYED_TASK__ASK_EP = 1,
TMQ_DELAYED_TASK__REPORT,
TMQ_DELAYED_TASK__COMMIT,
};
@ -163,6 +164,7 @@ typedef struct {
} SMqClientTopic;
typedef struct {
int32_t code;
int8_t tmqRspType;
int32_t epoch; // epoch can be used to guard the vgHandle
int32_t vgId;
@ -255,7 +257,7 @@ static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet);
static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void askEp(tmq_t* pTmq, void* param, bool sync);
static void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpset);
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
@ -845,7 +847,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
while (pTaskType != NULL) {
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
askEp(pTmq, NULL, false);
askEp(pTmq, NULL, false, false);
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
@ -862,7 +864,8 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
pTmq->autoCommitInterval / 1000.0);
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
} else {
tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
}
taosFreeQitem(pTaskType);
@ -874,9 +877,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
}
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
// do nothing
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
@ -1300,33 +1301,19 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
uint64_t requestId = pParam->requestId;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) {
taosMemoryFree(pParam);
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1;
code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
goto FAIL;
}
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) {
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
taosReleaseRef(tmqMgmt.rsetId, refId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
if(code != 0){
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER",
tmq->consumerId);
} else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) {
tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since out of memory, reqId:0x%" PRIx64,
tmq->consumerId, vgId, requestId);
goto END;
}
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
taosWriteQitem(tmq->mqueue, pRspWrapper);
} else{
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s, reqId:0x%" PRIx64, tmq->consumerId,
vgId, tstrerror(code), requestId);
}
goto END;
}
@ -1342,23 +1329,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
}
ASSERT(msgEpoch == clientEpoch);
// handle meta rsp
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) {
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
pRspWrapper->tmqRspType = rspType;
pRspWrapper->reqId = requestId;
pRspWrapper->pEpset = pMsg->pEpSet;
pMsg->pEpSet = NULL;
pRspWrapper->vgId = vgId;
strcpy(pRspWrapper->topicName, pParam->topicName);
if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SDecoder decoder;
@ -1387,22 +1363,22 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
}
END:
pRspWrapper->code = code;
pRspWrapper->vgId = vgId;
strcpy(pRspWrapper->topicName, pParam->topicName);
taosWriteQitem(tmq->mqueue, pRspWrapper);
int32_t total = taosQueueItemSize(tmq->mqueue);
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
tmq->consumerId, rspType, vgId, total, requestId);
END:
if(code != 0){
setVgIdle(tmq, pParam->topicName, vgId);
}
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
FAIL:
tsem_post(&tmq->rspSem);
taosMemoryFree(pParam);
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);
if(pMsg) taosMemoryFreeClear(pMsg->pData);
if(pMsg) taosMemoryFreeClear(pMsg->pEpSet);
return code;
}
@ -1473,7 +1449,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
bool set = false;
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
if (epoch <= tmq->epoch) {
if (topicNumGet <= 0) {
tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet);
return false;
}
@ -1675,10 +1653,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
if(code != 0){
goto FAIL;
}
@ -1688,11 +1665,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
pTmq->pollCnt++;
return 0;
FAIL:
taosMemoryFreeClear(pParam);
taosMemoryFreeClear(msg);
return code;
return tmqPollCb(pParam, NULL, code);
}
// broadcast the poll request to all related vnodes
@ -1729,8 +1704,6 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
atomic_store_32(&pVg->vgSkipCnt, 0);
code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
if (code != TSDB_CODE_SUCCESS) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
goto end;
}
}
@ -1742,22 +1715,6 @@ end:
return code;
}
static int32_t tmqHandleEpRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
doUpdateLocalEp(tmq, rspWrapper->epoch, rspMsg);
tDeleteSMqAskEpRsp(rspMsg);
} else {
tmqFreeRspWrapper(rspWrapper);
}
} else {
return -1;
}
return 0;
}
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId, bool hasData){
if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
@ -1792,11 +1749,28 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
tscDebug("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
if (pRspWrapper->code != 0) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER", tmq->consumerId);
} else if (pRspWrapper->code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
taosFreeQitem(pRspWrapper);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
terrno = pRspWrapper->code;
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(pRspWrapper->code));
return NULL;
} else{
if(pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID){ // for vnode transform
askEp(tmq, NULL, false, true);
}
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, tstrerror(pRspWrapper->code));
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
taosWUnLockLatch(&tmq->lock);
}
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
taosFreeQitem(pRspWrapper);
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
@ -1834,6 +1808,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pRspWrapper);
taosWUnLockLatch(&tmq->lock);
} else { // build rsp
int64_t numOfRows = 0;
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
@ -1847,7 +1822,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
taosWUnLockLatch(&tmq->lock);
return pRsp;
}
taosWUnLockLatch(&tmq->lock);
} else {
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
@ -2379,13 +2353,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqRspHead* head = pMsg->pData;
int32_t epoch = atomic_load_32(&tmq->epoch);
if (head->epoch <= epoch) {
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
tmq->consumerId, head->epoch, epoch);
goto END;
}
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId,
head->epoch, epoch);
if(pParam->sync){
SMqAskEpRsp rsp = {0};
@ -2427,7 +2395,7 @@ int32_t syncAskEp(tmq_t* pTmq) {
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
tsem_init(&pInfo->sem, 0, 0);
askEp(pTmq, pInfo, true);
askEp(pTmq, pInfo, true, false);
tsem_wait(&pInfo->sem);
int32_t code = pInfo->code;
@ -2436,10 +2404,10 @@ int32_t syncAskEp(tmq_t* pTmq) {
return code;
}
void askEp(tmq_t* pTmq, void* param, bool sync) {
void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
SMqAskEpReq req = {0};
req.consumerId = pTmq->consumerId;
req.epoch = pTmq->epoch;
req.epoch = updateEpSet ? -1 :pTmq->epoch;
strcpy(req.cgroup, pTmq->groupId);
int code = 0;
SMqAskEpCbParam* pParam = NULL;

View File

@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "mndConsumer.h"
#include "mndPrivilege.h"
#include "mndVgroup.h"
#include "mndShow.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
@ -542,6 +543,14 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
char offsetKey[TSDB_PARTITION_KEY_LEN];
mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
if(epoch == -1){
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
if(pVgroup){
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
mndReleaseVgroup(pMnode, pVgroup);
}
}
// 2.2.1 build vg ep
SMqSubVgEp vgEp = {
.epSet = pVgEp->epSet,