From fa154172b40786106b2cb1aad61c0d9c4149ac1d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 6 Apr 2023 18:15:03 +0800 Subject: [PATCH 1/2] fix:time_wait in doAskEp --- source/client/src/clientTmq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e295ec93af..0e0cfaa94d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1529,8 +1529,8 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { } else { tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId, head->epoch, epoch); - pParam->pUserFn(tmq, code, pMsg, pParam->pParam); } + pParam->pUserFn(tmq, code, pMsg, pParam->pParam); taosReleaseRef(tmqMgmt.rsetId, pParam->refId); From 7a58f448a5011200a9b9e43305a522882445695f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 6 Apr 2023 19:16:31 +0800 Subject: [PATCH 2/2] fix:time_wait in doAskEp --- source/client/src/clientTmq.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0e0cfaa94d..befcb00ac7 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1426,6 +1426,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); + if (epoch <= tmq->epoch) { + return false; + } SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); if (newTopics == NULL) {