diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c310ca4d40..0c5ccab6a0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -571,10 +571,26 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { if (reqOffset.type == TMQ_OFFSET__LOG) { dataRsp.rspOffset.version = reqOffset.version; - } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { - dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position - } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { - dataRsp.rspOffset.version = ever; + } else if(reqOffset.type < 0){ + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey); + if (pOffset != NULL) { + if (pOffset->val.type != TMQ_OFFSET__LOG) { + tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey); + terrno = TSDB_CODE_INVALID_PARA; + tDeleteMqDataRsp(&dataRsp); + return -1; + } + + dataRsp.rspOffset.version = pOffset->val.version; + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); + }else{ + if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { + dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position + } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { + dataRsp.rspOffset.version = ever; + } + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); + } } else { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey, reqOffset.type); @@ -582,7 +598,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { tDeleteMqDataRsp(&dataRsp); return -1; } - tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever); tDeleteMqDataRsp(&dataRsp);