fix(tmq): set version after consuming from tsdb
This commit is contained in:
parent
6ec12c19e8
commit
d8d32ebc51
|
@ -68,7 +68,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
|
||||||
pRsp->rspOffset = *pOffset;
|
pRsp->rspOffset = *pOffset;
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
tqOffsetResetToLog(pOffset, pHandle->snapshotVer + 1);
|
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
|
||||||
if (qStreamPrepareScan(task, pOffset) < 0) {
|
if (qStreamPrepareScan(task, pOffset) < 0) {
|
||||||
pRsp->rspOffset = *pOffset;
|
pRsp->rspOffset = *pOffset;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -106,7 +106,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
tqOffsetResetToLog(pOffset, pHandle->snapshotVer + 1);
|
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
|
||||||
qStreamPrepareScan(task, pOffset);
|
qStreamPrepareScan(task, pOffset);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -320,7 +320,7 @@ class TDTestCase:
|
||||||
tdSql.prepare()
|
tdSql.prepare()
|
||||||
|
|
||||||
self.tmqCase1()
|
self.tmqCase1()
|
||||||
# self.tmqCase2()
|
self.tmqCase2()
|
||||||
self.tmqCase3()
|
self.tmqCase3()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|
Loading…
Reference in New Issue