enh(tmq): auto clear lost consumer
This commit is contained in:
parent
118e5bf5d3
commit
4000176dc7
|
@ -1802,7 +1802,7 @@ int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicR
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
} SMqConsumerLostMsg, SMqConsumerRecoverMsg;
|
} SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
|
|
|
@ -149,7 +149,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "unused2", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "unused2", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "mq-tmr", SMTimerReq, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "tmq-tmr", SMTimerReq, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
|
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL)
|
||||||
|
@ -171,6 +171,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||||
|
|
|
@ -44,6 +44,7 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
|
||||||
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
|
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
|
||||||
|
|
||||||
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
|
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
|
||||||
|
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
|
||||||
|
|
||||||
bool mndRebTryStart();
|
bool mndRebTryStart();
|
||||||
void mndRebEnd();
|
void mndRebEnd();
|
||||||
|
|
|
@ -32,7 +32,8 @@
|
||||||
#define MND_CONSUMER_VER_NUMBER 1
|
#define MND_CONSUMER_VER_NUMBER 1
|
||||||
#define MND_CONSUMER_RESERVE_SIZE 64
|
#define MND_CONSUMER_RESERVE_SIZE 64
|
||||||
|
|
||||||
#define MND_CONSUMER_LOST_HB_CNT 3
|
#define MND_CONSUMER_LOST_HB_CNT 3
|
||||||
|
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
|
||||||
|
|
||||||
static int8_t mqRebInExecCnt = 0;
|
static int8_t mqRebInExecCnt = 0;
|
||||||
|
|
||||||
|
@ -50,6 +51,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
|
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
|
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
|
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
|
||||||
|
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
|
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
|
||||||
|
|
||||||
int32_t mndInitConsumer(SMnode *pMnode) {
|
int32_t mndInitConsumer(SMnode *pMnode) {
|
||||||
|
@ -69,6 +71,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
|
||||||
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);
|
||||||
|
@ -162,6 +165,43 @@ FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
|
||||||
|
SMnode *pMnode = pMsg->info.node;
|
||||||
|
SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
|
||||||
|
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
|
||||||
|
if (pConsumer == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
mInfo("receive consumer clear msg, consumer id %" PRId64 ", status %s", pClearMsg->consumerId,
|
||||||
|
mndConsumerStatusName(pConsumer->status));
|
||||||
|
|
||||||
|
if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
|
||||||
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
|
||||||
|
pConsumerNew->updateType = CONSUMER_UPDATE__LOST;
|
||||||
|
|
||||||
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
|
||||||
|
if (pTrans == NULL) goto FAIL;
|
||||||
|
if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
||||||
|
|
||||||
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
|
taosMemoryFree(pConsumerNew);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return 0;
|
||||||
|
FAIL:
|
||||||
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
|
taosMemoryFree(pConsumerNew);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
||||||
SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
|
SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
|
||||||
if (pRebInfo == NULL) {
|
if (pRebInfo == NULL) {
|
||||||
|
@ -206,15 +246,28 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
|
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
|
||||||
|
|
||||||
pLostMsg->consumerId = pConsumer->consumerId;
|
pLostMsg->consumerId = pConsumer->consumerId;
|
||||||
SRpcMsg pRpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.msgType = TDMT_MND_TMQ_CONSUMER_LOST,
|
.msgType = TDMT_MND_TMQ_CONSUMER_LOST,
|
||||||
.pCont = pLostMsg,
|
.pCont = pLostMsg,
|
||||||
.contLen = sizeof(SMqConsumerLostMsg),
|
.contLen = sizeof(SMqConsumerLostMsg),
|
||||||
};
|
};
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
}
|
}
|
||||||
if (status == MQ_CONSUMER_STATUS__LOST_REBD || status == MQ_CONSUMER_STATUS__READY) {
|
|
||||||
|
if (status == MQ_CONSUMER_STATUS__READY) {
|
||||||
// do nothing
|
// do nothing
|
||||||
|
} else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
||||||
|
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
|
||||||
|
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
|
||||||
|
|
||||||
|
pClearMsg->consumerId = pConsumer->consumerId;
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR,
|
||||||
|
.pCont = pClearMsg,
|
||||||
|
.contLen = sizeof(SMqConsumerClearMsg),
|
||||||
|
};
|
||||||
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
|
}
|
||||||
} else if (status == MQ_CONSUMER_STATUS__LOST) {
|
} else if (status == MQ_CONSUMER_STATUS__LOST) {
|
||||||
taosRLockLatch(&pConsumer->lock);
|
taosRLockLatch(&pConsumer->lock);
|
||||||
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
|
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
|
||||||
|
@ -444,6 +497,14 @@ FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
|
||||||
|
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
|
||||||
|
if (pCommitRaw == NULL) return -1;
|
||||||
|
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
|
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
|
||||||
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
|
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
|
||||||
if (pCommitRaw == NULL) return -1;
|
if (pCommitRaw == NULL) return -1;
|
||||||
|
|
Loading…
Reference in New Issue