diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3141e21f26..0446af3226 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1260,7 +1260,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t retryCnt = 0; while ((code = syncAskEp(tmq)) != 0) { if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { - tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId); + tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s", tmq->consumerId, strerror(code)); + if(code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { + code = 0; + } goto FAIL; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index c7ae941389..9278fbce11 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -662,6 +662,10 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumerNew = NULL; STrans *pTrans = NULL; + if(mndAcquireConsumer(pMnode, subscribe->consumerId) == NULL && taosArrayGetSize(subscribe->topicNames) == 0){ + goto _over; + } + code = checkAndSortTopic(pMnode, subscribe.topicNames); if(code != TSDB_CODE_SUCCESS){ goto _over;