feat:[TD-30270]opti consumer status in client

This commit is contained in:
wangmm0220 2024-09-03 23:24:39 +08:00
parent a72dac975c
commit c532e9367d
2 changed files with 46 additions and 73 deletions

View File

@ -135,8 +135,6 @@ enum {
enum { enum {
TMQ_CONSUMER_STATUS__INIT = 0, TMQ_CONSUMER_STATUS__INIT = 0,
TMQ_CONSUMER_STATUS__READY, TMQ_CONSUMER_STATUS__READY,
TMQ_CONSUMER_STATUS__NO_TOPIC,
TMQ_CONSUMER_STATUS__RECOVER,
TMQ_CONSUMER_STATUS__CLOSED, TMQ_CONSUMER_STATUS__CLOSED,
}; };
@ -1453,9 +1451,12 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
taosMsleep(SUBSCRIBE_RETRY_INTERVAL); taosMsleep(SUBSCRIBE_RETRY_INTERVAL);
} }
tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer); if (tmq->epTimer == NULL){
tmq->commitTimer = tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); }
if (tmq->commitTimer == NULL){
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
}
if (tmq->epTimer == NULL || tmq->commitTimer == NULL) { if (tmq->epTimer == NULL || tmq->commitTimer == NULL) {
code = TSDB_CODE_TSC_INTERNAL_ERROR; code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto END; goto END;
@ -1463,7 +1464,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
END: END:
taosArrayDestroyP(req.topicNames, taosMemoryFree); taosArrayDestroyP(req.topicNames, taosMemoryFree);
return code; return code;
} }
@ -1723,28 +1723,25 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
} }
} }
static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
bool set = false;
int32_t topicNumGet = taosArrayGetSize(pRsp->topics); int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) { if (epoch <= tmq->epoch ) {
tscDebug("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId, tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
tmq->epoch, epoch, topicNumGet); tmq->epoch, epoch, topicNumGet);
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { return;
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
}
return false;
} }
SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
if (newTopics == NULL) { if (newTopics == NULL) {
return false; tscError("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno);
return;
} }
SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
if (pVgOffsetHashMap == NULL) { if (pVgOffsetHashMap == NULL) {
(void)taosArrayDestroy(newTopics); (void)taosArrayDestroy(newTopics);
return false; tscError("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno);
return;
} }
taosWLockLatch(&tmq->lock); taosWLockLatch(&tmq->lock);
@ -1805,12 +1802,10 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
tmq->clientTopics = newTopics; tmq->clientTopics = newTopics;
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
int8_t flag = (topicNumGet == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY; atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
atomic_store_8(&tmq->status, flag);
atomic_store_32(&tmq->epoch, epoch); atomic_store_32(&tmq->epoch, epoch);
tscInfo("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId); tscInfo("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
return set;
} }
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
@ -2051,9 +2046,6 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
// broadcast the poll request to all related vnodes // broadcast the poll request to all related vnodes
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
return 0;
}
int32_t code = 0; int32_t code = 0;
taosWLockLatch(&tmq->lock); taosWLockLatch(&tmq->lock);
@ -2150,26 +2142,26 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
if (pRspWrapper->code != 0) { if (pRspWrapper->code != 0) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform
tscDebug("consumer:0x%" PRIx64 " wait for the rebalance, set status to be RECOVER", tmq->consumerId); int32_t code = askEp(tmq, NULL, false, true);
} else if (pRspWrapper->code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { if (code != 0) {
tscInfo("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId); tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
} else { }
if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform } else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
int32_t code = askEp(tmq, NULL, false, true); int32_t code = askEp(tmq, NULL, false, false);
if (code != 0) { if (code != 0) {
tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
}
} }
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId,
tstrerror(pRspWrapper->code));
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
if (pVg) pVg->emptyBlockReceiveTs = taosGetTimestampMs();
taosWUnLockLatch(&tmq->lock);
} }
tscInfo("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId,
tstrerror(pRspWrapper->code));
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
if (pVg) pVg->emptyBlockReceiveTs = taosGetTimestampMs();
taosWUnLockLatch(&tmq->lock);
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
taosMemoryFreeClear(pollRspWrapper->pEpset); taosMemoryFreeClear(pollRspWrapper->pEpset);
tmqFreeRspWrapper(pRspWrapper); tmqFreeRspWrapper(pRspWrapper);
@ -2388,7 +2380,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
tscDebug("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)pRspWrapper; SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)pRspWrapper;
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
(void)doUpdateLocalEp(tmq, pEpRspWrapper->epoch, rspMsg); doUpdateLocalEp(tmq, pEpRspWrapper->epoch, rspMsg);
tmqFreeRspWrapper(pRspWrapper); tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pRspWrapper); taosFreeQitem(pRspWrapper);
} else { } else {
@ -2413,23 +2405,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
return NULL; return NULL;
} }
while (1) {
if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__RECOVER) {
break;
}
tscInfo("consumer:0x%" PRIx64 " tmq status is recover", tmq->consumerId);
int32_t retryCnt = 0;
while (syncAskEp(tmq) != 0) {
if (retryCnt++ > 40) {
return NULL;
}
tscInfo("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt);
taosMsleep(500);
}
}
(void)atomic_val_compare_exchange_8(&pollFlag, 0, 1); (void)atomic_val_compare_exchange_8(&pollFlag, 0, 1);
while (1) { while (1) {
@ -2482,11 +2457,14 @@ static void displayConsumeStatistics(tmq_t* pTmq) {
tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId); tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
} }
static int32_t innerClose(tmq_t* tmq) { int32_t tmq_unsubscribe(tmq_t* tmq) {
if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
int32_t code = 0; int32_t code = 0;
int8_t status = atomic_load_8(&tmq->status); int8_t status = atomic_load_8(&tmq->status);
if (status == TMQ_CONSUMER_STATUS__CLOSED || status != TMQ_CONSUMER_STATUS__READY) { tscInfo("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status);
tscInfo("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, close it directly", tmq->consumerId, status);
if (status != TMQ_CONSUMER_STATUS__READY) {
tscInfo("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, status);
goto END; goto END;
} }
if (tmq->autoCommit) { if (tmq->autoCommit) {
@ -2507,23 +2485,18 @@ static int32_t innerClose(tmq_t* tmq) {
if(code != 0){ if(code != 0){
goto END; goto END;
} }
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
END: END:
return code; return code;
} }
int32_t tmq_unsubscribe(tmq_t* tmq) {
if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
tscInfo("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, tmq->status);
return innerClose(tmq);
}
int32_t tmq_consumer_close(tmq_t* tmq) { int32_t tmq_consumer_close(tmq_t* tmq) {
if (tmq == NULL) return TSDB_CODE_INVALID_PARA; if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
displayConsumeStatistics(tmq); displayConsumeStatistics(tmq);
int32_t code = innerClose(tmq); int32_t code = tmq_unsubscribe(tmq);
if (code == 0) { if (code == 0) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
(void)taosRemoveRef(tmqMgmt.rsetId, tmq->refId); (void)taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
} }
return code; return code;
@ -2862,7 +2835,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pParam->sync) { if (pParam->sync) {
SMqAskEpRsp rsp = {0}; SMqAskEpRsp rsp = {0};
if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL) { if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL) {
(void)doUpdateLocalEp(tmq, head->epoch, &rsp); doUpdateLocalEp(tmq, head->epoch, &rsp);
} }
tDeleteSMqAskEpRsp(&rsp); tDeleteSMqAskEpRsp(&rsp);
} else { } else {

View File

@ -577,15 +577,15 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SCMSubscribeReq subscribe = {0}; SCMSubscribeReq subscribe = {0};
MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen)); MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));
bool ubSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0); bool unSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
if(ubSubscribe){ if(unSubscribe){
SMqConsumerObj *pConsumerTmp = NULL; SMqConsumerObj *pConsumerTmp = NULL;
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp)); MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
mndReleaseConsumer(pMnode, pConsumerTmp); mndReleaseConsumer(pMnode, pConsumerTmp);
} }
MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames)); MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY,
(ubSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE), (unSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE),
pMsg, "subscribe"); pMsg, "subscribe");
MND_TMQ_NULL_CHECK(pTrans); MND_TMQ_NULL_CHECK(pTrans);