diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 54e929c9a4..9e60f8b04d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1363,7 +1363,6 @@ CREATE_MSG_FAIL: typedef struct SVgroupSaveInfo { STqOffsetVal offset; int64_t numOfRows; - int32_t vgStatus; } SVgroupSaveInfo; static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap, @@ -1399,7 +1398,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .currentOffset = offsetNew, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet, - .vgStatus = pInfo != NULL ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE, + .vgStatus = TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, .emptyBlockReceiveTs = 0, .numOfRows = numOfRows, @@ -1458,7 +1457,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows, .vgStatus = pVgCur->vgStatus}; + SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows}; taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0080feadbe..7539761f4e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -368,6 +368,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } taosRUnLockLatch(&pTq->lock); + // 3. update the epoch value + taosWLockLatch(&pTq->lock); + int32_t savedEpoch = pHandle->epoch; + if (savedEpoch < reqEpoch) { + tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, + reqEpoch); + pHandle->epoch = reqEpoch; + } + taosWUnLockLatch(&pTq->lock); + char buf[80]; tFormatOffset(buf, 80, &reqOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,