From 4000176dc736b8d004bb6dc60fcb3a77fdc70aed Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 9 Nov 2022 14:44:04 +0800 Subject: [PATCH] enh(tmq): auto clear lost consumer --- include/common/tmsg.h | 2 +- include/common/tmsgdef.h | 3 +- source/dnode/mnode/impl/inc/mndConsumer.h | 1 + source/dnode/mnode/impl/src/mndConsumer.c | 69 +++++++++++++++++++++-- 4 files changed, 69 insertions(+), 6 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c2cd7de139..643588de19 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1802,7 +1802,7 @@ int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicR typedef struct { int64_t consumerId; -} SMqConsumerLostMsg, SMqConsumerRecoverMsg; +} SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg; typedef struct { int64_t consumerId; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 9868fc49bc..a12a635837 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -149,7 +149,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "unused2", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "mq-tmr", SMTimerReq, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "tmq-tmr", SMTimerReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL) @@ -171,6 +171,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) 
TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 210e336ac2..1176e1af0b 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -44,6 +44,7 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); +int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); bool mndRebTryStart(); void mndRebEnd(); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index df999316eb..62ad5bae15 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -32,7 +32,8 @@ #define MND_CONSUMER_VER_NUMBER 1 #define MND_CONSUMER_RESERVE_SIZE 64 -#define MND_CONSUMER_LOST_HB_CNT 3 +#define MND_CONSUMER_LOST_HB_CNT 3 +#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200 static int8_t mqRebInExecCnt = 0; @@ -50,6 +51,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg); static int32_t mndProcessMqHbReq(SRpcMsg *pMsg); static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg); static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg); +static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg); static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg); int32_t mndInitConsumer(SMnode *pMnode) { @@ -69,6 +71,7 @@ int32_t mndInitConsumer(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); + mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, 
mndCancelGetNextConsumer);
@@ -162,6 +165,54 @@ FAIL:
   return -1;
 }
 
+/**
+ * Handles TDMT_MND_TMQ_LOST_CONSUMER_CLEAR by dropping a consumer that is still
+ * in the MQ_CONSUMER_STATUS__LOST_REBD state.
+ *
+ * Returns 0 when the consumer no longer exists or the drop transaction was
+ * prepared, and -1 when the consumer is in any other state or a step fails.
+ */
+static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
+  SMnode              *pMnode = pMsg->info.node;
+  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
+  SMqConsumerObj      *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
+  if (pConsumer == NULL) {
+    return 0;
+  }
+
+  mInfo("receive consumer clear msg, consumer id %" PRId64 ", status %s", pClearMsg->consumerId,
+        mndConsumerStatusName(pConsumer->status));
+
+  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
+    mndReleaseConsumer(pMnode, pConsumer);
+    return -1;
+  }
+
+  // Only the id and cgroup are needed to build the object encoded into the drop
+  // log, so the acquired consumer is released as soon as they have been read.
+  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
+  mndReleaseConsumer(pMnode, pConsumer);
+  // NOTE(review): assumes tNewSMqConsumerObj reports allocation failure as NULL.
+  if (pConsumerNew == NULL) {
+    return -1;
+  }
+  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;
+
+  int32_t code = -1;
+  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
+  if (pTrans == NULL) goto END;
+  if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto END;
+  if (mndTransPrepare(pMnode, pTrans) != 0) goto END;
+  code = 0;
+
+END:
+  // The same cleanup runs on success and on failure; only the return code differs.
+  tDeleteSMqConsumerObj(pConsumerNew);
+  taosMemoryFree(pConsumerNew);
+  mndTransDrop(pTrans);
+  return code;
+}
+
 static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
   SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
   if (pRebInfo == NULL) {
@@ -206,15 +257,28 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
       SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
 
       pLostMsg->consumerId = pConsumer->consumerId;
-      SRpcMsg pRpcMsg = {
+      SRpcMsg rpcMsg = {
           .msgType = TDMT_MND_TMQ_CONSUMER_LOST,
           .pCont = pLostMsg,
           .contLen = sizeof(SMqConsumerLostMsg),
       };
-      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
+      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
     }
-    if (status == MQ_CONSUMER_STATUS__LOST_REBD || status == MQ_CONSUMER_STATUS__READY) {
+
+    if (status == MQ_CONSUMER_STATUS__READY) {
       // do nothing
+    } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
+      if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
+        SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
+
+        pClearMsg->consumerId = pConsumer->consumerId;
+        SRpcMsg rpcMsg = {
+            .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR,
+            .pCont = pClearMsg,
+            .contLen = sizeof(SMqConsumerClearMsg),
+        };
+        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
+      }
     } else if (status == MQ_CONSUMER_STATUS__LOST) {
       taosRLockLatch(&pConsumer->lock);
       int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
@@ -444,6 +497,27 @@ FAIL:
   return -1;
 }
 
+/**
+ * Adds a commit log to pTrans that marks pConsumer as dropped.
+ *
+ * The consumer is encoded into a raw record, the record is appended to the
+ * transaction's commit logs, and its status is then set to SDB_STATUS_DROPPED.
+ *
+ * @return 0 on success, -1 if encoding, appending, or the status update fails.
+ */
+int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
+  SSdbRaw *pDropRaw = mndConsumerActionEncode(pConsumer);
+  if (pDropRaw == NULL) {
+    return -1;
+  }
+
+  // Short-circuit keeps the call order: append first, then flag the record as dropped.
+  if (mndTransAppendCommitlog(pTrans, pDropRaw) != 0 || sdbSetRawStatus(pDropRaw, SDB_STATUS_DROPPED) != 0) {
+    return -1;
+  }
+  return 0;
+}
+
 int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
   SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
   if (pCommitRaw == NULL) return -1;