fix(tmq): read repeat data after reading snapshot

This commit is contained in:
Liu Jicong 2022-06-23 17:07:11 +08:00
parent c018a14e56
commit 14048a5e4c
1 changed files with 4 additions and 2 deletions

View File

@ -252,15 +252,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
#if 1 #if 1
if (pReq->useSnapshot) { if (pReq->useSnapshot) {
tqInfo("retrieve using snapshot"); // TODO set ver into snapshot
int64_t lastVer = walGetCommittedVer(pTq->pWal); int64_t lastVer = walGetCommittedVer(pTq->pWal);
if (rsp.reqOffset < lastVer) { if (rsp.reqOffset < lastVer) {
tqInfo("retrieve using snapshot req offset %ld last ver %ld", rsp.reqOffset, lastVer);
tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId); tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId);
if (rsp.blockNum != 0) { if (rsp.blockNum != 0) {
rsp.withTbName = false; rsp.withTbName = false;
rsp.rspOffset = lastVer; rsp.rspOffset = lastVer;
tqInfo("direct send by snapshot rsp offset %ld", lastVer); tqInfo("direct send by snapshot req offset %ld rsp offset %ld", rsp.reqOffset, rsp.rspOffset);
fetchOffset = lastVer;
goto SEND_RSP; goto SEND_RSP;
} }
} }