diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 8038273b43..fde1b12be0 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1280,7 +1280,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t retryCnt = 0; while ((code = syncAskEp(tmq)) != 0) { if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { - tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId); + tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s", tmq->consumerId, strerror(code)); + if(code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { + code = 0; + } goto FAIL; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index c7ae941389..3eef2afcc1 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -662,6 +662,14 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumerNew = NULL; STrans *pTrans = NULL; + if(taosArrayGetSize(subscribe.topicNames) == 0){ + SMqConsumerObj *pConsumerTmp = mndAcquireConsumer(pMnode, subscribe.consumerId); + if(pConsumerTmp == NULL){ + goto _over; + } + mndReleaseConsumer(pMnode, pConsumerTmp); + } + code = checkAndSortTopic(pMnode, subscribe.topicNames); if(code != TSDB_CODE_SUCCESS){ goto _over;