Merge pull request #14174 from taosdata/feature/stream

fix(tmq): read repeat data after reading snapshot
This commit is contained in:
Liu Jicong 2022-06-23 17:10:44 +08:00 committed by GitHub
commit fe167cd396
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 4 additions and 2 deletions

View File

@ -252,15 +252,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
#if 1 #if 1
if (pReq->useSnapshot) { if (pReq->useSnapshot) {
tqInfo("retrieve using snapshot"); // TODO set ver into snapshot
int64_t lastVer = walGetCommittedVer(pTq->pWal); int64_t lastVer = walGetCommittedVer(pTq->pWal);
if (rsp.reqOffset < lastVer) { if (rsp.reqOffset < lastVer) {
tqInfo("retrieve using snapshot req offset %ld last ver %ld", rsp.reqOffset, lastVer);
tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId); tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId);
if (rsp.blockNum != 0) { if (rsp.blockNum != 0) {
rsp.withTbName = false; rsp.withTbName = false;
rsp.rspOffset = lastVer; rsp.rspOffset = lastVer;
tqInfo("direct send by snapshot rsp offset %ld", lastVer); tqInfo("direct send by snapshot req offset %ld rsp offset %ld", rsp.reqOffset, rsp.rspOffset);
fetchOffset = lastVer;
goto SEND_RSP; goto SEND_RSP;
} }
} }