From c532e9367d55c0cdbb491f4d4993c23c387e1f12 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 3 Sep 2024 23:24:39 +0800 Subject: [PATCH] feat:[TD-30270]opti consumer status in client --- source/client/src/clientTmq.c | 113 ++++++++-------------- source/dnode/mnode/impl/src/mndConsumer.c | 6 +- 2 files changed, 46 insertions(+), 73 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3185094785..4367ca4124 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -135,8 +135,6 @@ enum { enum { TMQ_CONSUMER_STATUS__INIT = 0, TMQ_CONSUMER_STATUS__READY, - TMQ_CONSUMER_STATUS__NO_TOPIC, - TMQ_CONSUMER_STATUS__RECOVER, TMQ_CONSUMER_STATUS__CLOSED, }; @@ -1453,9 +1451,12 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { taosMsleep(SUBSCRIBE_RETRY_INTERVAL); } - tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer); - tmq->commitTimer = - taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); + if (tmq->epTimer == NULL){ + tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer); + } + if (tmq->commitTimer == NULL){ + tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); + } if (tmq->epTimer == NULL || tmq->commitTimer == NULL) { code = TSDB_CODE_TSC_INTERNAL_ERROR; goto END; @@ -1463,7 +1464,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { END: taosArrayDestroyP(req.topicNames, taosMemoryFree); - return code; } @@ -1723,28 +1723,25 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic } } -static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { - bool set = false; - +static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { int32_t topicNumGet = taosArrayGetSize(pRsp->topics); - if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) { - tscDebug("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId, + if (epoch <= tmq->epoch ) { + tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId, tmq->epoch, epoch, topicNumGet); - if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); - } - return false; + return; } SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); if (newTopics == NULL) { - return false; + tscError("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno); + return; } SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); if (pVgOffsetHashMap == NULL) { (void)taosArrayDestroy(newTopics); - return false; + tscError("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno); + return; } taosWLockLatch(&tmq->lock); @@ -1805,12 +1802,10 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) tmq->clientTopics = newTopics; taosWUnLockLatch(&tmq->lock); - int8_t flag = (topicNumGet == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY; - atomic_store_8(&tmq->status, flag); + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); atomic_store_32(&tmq->epoch, epoch); tscInfo("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId); - return set; } void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { @@ -2051,9 +2046,6 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p // broadcast the poll request to all related vnodes static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { - if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { - return 0; - } int32_t code = 0; taosWLockLatch(&tmq->lock); @@ -2150,26 +2142,26 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if (pRspWrapper->code != 0) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; - if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); - tscDebug("consumer:0x%" PRIx64 " wait for the rebalance, set status to be RECOVER", tmq->consumerId); - } else if (pRspWrapper->code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { - tscInfo("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId); - } else { - if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform - int32_t code = askEp(tmq, NULL, false, true); - if (code != 0) { - tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); - } + + if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform + int32_t code = askEp(tmq, NULL, false, true); + if (code != 0) { + tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); + } + } else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { + int32_t code = askEp(tmq, NULL, false, false); + if (code != 0) { + tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); } - tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, - tstrerror(pRspWrapper->code)); - taosWLockLatch(&tmq->lock); - SMqClientVg* pVg = NULL; - getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg); - if (pVg) pVg->emptyBlockReceiveTs = taosGetTimestampMs(); - taosWUnLockLatch(&tmq->lock); } + tscInfo("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, + tstrerror(pRspWrapper->code)); + taosWLockLatch(&tmq->lock); + SMqClientVg* pVg = NULL; + getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg); + if (pVg) pVg->emptyBlockReceiveTs = taosGetTimestampMs(); + taosWUnLockLatch(&tmq->lock); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); taosMemoryFreeClear(pollRspWrapper->pEpset); tmqFreeRspWrapper(pRspWrapper); @@ -2388,7 +2380,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tscDebug("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId); SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)pRspWrapper; SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; - (void)doUpdateLocalEp(tmq, pEpRspWrapper->epoch, rspMsg); + doUpdateLocalEp(tmq, pEpRspWrapper->epoch, rspMsg); tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); } else { @@ -2413,23 +2405,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { return NULL; } - while (1) { - if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__RECOVER) { - break; - } - tscInfo("consumer:0x%" PRIx64 " tmq status is recover", tmq->consumerId); - - int32_t retryCnt = 0; - while (syncAskEp(tmq) != 0) { - if (retryCnt++ > 40) { - return NULL; - } - - tscInfo("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt); - taosMsleep(500); - } - } - (void)atomic_val_compare_exchange_8(&pollFlag, 0, 1); while (1) { @@ -2482,11 +2457,14 @@ static void displayConsumeStatistics(tmq_t* pTmq) { tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId); } -static int32_t innerClose(tmq_t* tmq) { +int32_t tmq_unsubscribe(tmq_t* tmq) { + if (tmq == NULL) return TSDB_CODE_INVALID_PARA; int32_t code = 0; int8_t status = atomic_load_8(&tmq->status); - if (status == TMQ_CONSUMER_STATUS__CLOSED || status != TMQ_CONSUMER_STATUS__READY) { - tscInfo("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, close it directly", tmq->consumerId, status); + tscInfo("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status); + + if (status != TMQ_CONSUMER_STATUS__READY) { + tscInfo("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, status); goto END; } if (tmq->autoCommit) { @@ -2507,23 +2485,18 @@ static int32_t innerClose(tmq_t* tmq) { if(code != 0){ goto END; } - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); END: return code; } -int32_t tmq_unsubscribe(tmq_t* tmq) { - if (tmq == NULL) return TSDB_CODE_INVALID_PARA; - tscInfo("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, tmq->status); - return innerClose(tmq); -} int32_t tmq_consumer_close(tmq_t* tmq) { if (tmq == NULL) return TSDB_CODE_INVALID_PARA; tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); displayConsumeStatistics(tmq); - int32_t code = innerClose(tmq); + int32_t code = tmq_unsubscribe(tmq); if (code == 0) { + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); (void)taosRemoveRef(tmqMgmt.rsetId, tmq->refId); } return code; @@ -2862,7 +2835,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { if (pParam->sync) { SMqAskEpRsp rsp = {0}; if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL) { - (void)doUpdateLocalEp(tmq, head->epoch, &rsp); + doUpdateLocalEp(tmq, head->epoch, &rsp); } tDeleteSMqAskEpRsp(&rsp); } else { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index fe33d666b8..57963e62fd 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -577,15 +577,15 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SCMSubscribeReq subscribe = {0}; MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen)); - bool ubSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0); - if(ubSubscribe){ + bool unSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0); + if(unSubscribe){ SMqConsumerObj *pConsumerTmp = NULL; MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp)); mndReleaseConsumer(pMnode, pConsumerTmp); } MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames)); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, - (ubSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE), + (unSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE), pMsg, "subscribe"); MND_TMQ_NULL_CHECK(pTrans);