Merge pull request #25966 from taosdata/fix/TD-30323-3.0

fix:[TD-30323]tmq close error if consumer is cleared
This commit is contained in:
dapan1121 2024-06-03 19:02:59 +08:00 committed by GitHub
commit 8a4070f06c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 12 additions and 1 deletions

View File

@ -1280,7 +1280,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t retryCnt = 0;
while ((code = syncAskEp(tmq)) != 0) {
if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s", tmq->consumerId, strerror(code));
if(code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
code = 0;
}
goto FAIL;
}

View File

@ -662,6 +662,14 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumerNew = NULL;
STrans *pTrans = NULL;
if(taosArrayGetSize(subscribe.topicNames) == 0){
SMqConsumerObj *pConsumerTmp = mndAcquireConsumer(pMnode, subscribe.consumerId);
if(pConsumerTmp == NULL){
goto _over;
}
mndReleaseConsumer(pMnode, pConsumerTmp);
}
code = checkAndSortTopic(pMnode, subscribe.topicNames);
if(code != TSDB_CODE_SUCCESS){
goto _over;