feat:[TS-4592]remove lost status for consumer
This commit is contained in:
parent
9c2dae3613
commit
75efea5551
|
@ -901,6 +901,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
// status
|
// status
|
||||||
const char *pStatusName = mndConsumerStatusName(pConsumer->status);
|
const char *pStatusName = mndConsumerStatusName(pConsumer->status);
|
||||||
status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
|
status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
|
||||||
|
MND_TMQ_NULL_CHECK(status);
|
||||||
STR_TO_VARSTR(status, pStatusName);
|
STR_TO_VARSTR(status, pStatusName);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
@ -940,6 +941,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
|
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
|
||||||
|
|
||||||
parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
|
parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
|
||||||
|
MND_TMQ_NULL_CHECK(parasStr);
|
||||||
(void)sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
|
(void)sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
|
||||||
pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
|
pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
|
||||||
varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
|
varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
|
||||||
|
|
|
@ -396,7 +396,6 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
|
||||||
|
|
||||||
END:
|
END:
|
||||||
taosMemoryFree(pVgEp);
|
taosMemoryFree(pVgEp);
|
||||||
sdbRelease(pMnode->pSdb, pVgroup);
|
|
||||||
taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
|
taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -773,7 +772,7 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
|
||||||
|
|
||||||
if (status == MQ_CONSUMER_STATUS_READY) {
|
if (status == MQ_CONSUMER_STATUS_READY) {
|
||||||
if (taosArrayGetSize(pConsumer->currentTopics) == 0) { // unsubscribe or close
|
if (taosArrayGetSize(pConsumer->currentTopics) == 0) { // unsubscribe or close
|
||||||
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info);
|
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
|
||||||
} else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs ||
|
} else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs ||
|
||||||
pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) {
|
pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) {
|
||||||
taosRLockLatch(&pConsumer->lock);
|
taosRLockLatch(&pConsumer->lock);
|
||||||
|
@ -788,7 +787,7 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
|
||||||
MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId));
|
MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId));
|
||||||
taosRUnLockLatch(&pConsumer->lock);
|
taosRUnLockLatch(&pConsumer->lock);
|
||||||
} else {
|
} else {
|
||||||
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info);
|
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
|
||||||
}
|
}
|
||||||
|
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
|
Loading…
Reference in New Issue