fix:[TD-30323]tmq close error if consumer is cleared
This commit is contained in:
parent
d7ecd98cf9
commit
1a6a0d3107
|
@ -1260,7 +1260,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
int32_t retryCnt = 0;
|
||||
while ((code = syncAskEp(tmq)) != 0) {
|
||||
if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
|
||||
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
|
||||
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s", tmq->consumerId, strerror(code));
|
||||
if(code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
|
||||
code = 0;
|
||||
}
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
|
|
@ -662,6 +662,10 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
STrans *pTrans = NULL;
|
||||
|
||||
if(mndAcquireConsumer(pMnode, subscribe->consumerId) == NULL && taosArrayGetSize(subscribe->topicNames) == 0){
|
||||
goto _over;
|
||||
}
|
||||
|
||||
code = checkAndSortTopic(pMnode, subscribe.topicNames);
|
||||
if(code != TSDB_CODE_SUCCESS){
|
||||
goto _over;
|
||||
|
|
Loading…
Reference in New Issue