Merge pull request #15085 from taosdata/feature/stream
fix(tmq): set version after consuming from tsdb
This commit is contained in:
commit
244cbf2d1f
|
@ -68,7 +68,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
|
|||
pRsp->rspOffset = *pOffset;
|
||||
return 0;
|
||||
} else {
|
||||
tqOffsetResetToLog(pOffset, pHandle->snapshotVer + 1);
|
||||
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
|
||||
if (qStreamPrepareScan(task, pOffset) < 0) {
|
||||
pRsp->rspOffset = *pOffset;
|
||||
return 0;
|
||||
|
@ -106,7 +106,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
|
|||
}
|
||||
|
||||
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
tqOffsetResetToLog(pOffset, pHandle->snapshotVer + 1);
|
||||
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
|
||||
qStreamPrepareScan(task, pOffset);
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -320,7 +320,7 @@ class TDTestCase:
|
|||
tdSql.prepare()
|
||||
|
||||
self.tmqCase1()
|
||||
# self.tmqCase2()
|
||||
self.tmqCase2()
|
||||
self.tmqCase3()
|
||||
|
||||
def stop(self):
|
||||
|
|
Loading…
Reference in New Issue