Merge pull request #21190 from taosdata/fix/TS-3347

fix:[TS-3347]set ver to first version if version stored is smaller th…
This commit is contained in:
Haojun Liao 2023-05-06 18:51:49 +08:00 committed by GitHub
commit c6007d53e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 6 deletions

View File

@ -190,6 +190,8 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
void qStreamSetOpen(qTaskInfo_t tinfo);

View File

@ -246,6 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (offset->type == TMQ_OFFSET__LOG) {
verifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {

View File

@ -1058,6 +1058,14 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
pOperator->status = OP_NOT_OPENED;
}
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){
// if offset version is small than first version , let's seek to first version
int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal);
if (pOffset->version + 1 < firstVer){
pOffset->version = firstVer - 1;
}
}
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
@ -1084,12 +1092,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
tsdbReaderClose(pScanBaseInfo->dataReader);
pScanBaseInfo->dataReader = NULL;
// let's seek to the next version in wal file
int64_t firstVer = walGetFirstVer(pInfo->tqReader->pWalReader->pWal);
if (pOffset->version + 1 < firstVer){
pOffset->version = firstVer - 1;
}
verifyOffset(pInfo->tqReader->pWalReader, pOffset);
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
return -1;