fix:test case error for reset=latest

This commit is contained in:
wangmm0220 2023-10-08 10:55:35 +08:00
parent 9c470e9297
commit fb3197ea89
11 changed files with 54 additions and 34 deletions

View File

@ -108,12 +108,6 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
if (pRequest->useSnapshot) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
consumerId, pHandle->subKey, vgId);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){
tqError("tmq poll column can not use snapshot");
terrno = TSDB_CODE_TQ_INVALID_CONFIG;
return -1;
}
if (pHandle->fetchMeta) {
tqOffsetResetToMeta(pOffsetVal, 0);
} else {

View File

@ -1831,30 +1831,56 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
return NULL;
}
ASSERT(pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG);
while (1) {
bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id);
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
return pResult;
}
SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader);
struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
// curVersion move to next
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion);
if (hasResult) {
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version);
blockDataCleanup(pInfo->pRes);
STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX};
setBlockIntoRes(pInfo, pRes, &defaultWindow, true);
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
}
} else {
qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
pTSInfo->base.dataReader = NULL;
int64_t validVer = pTaskInfo->streamInfo.snapshotVer + 1;
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", validVer);
if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, validVer, pTaskInfo->id.str) < 0) {
return NULL;
}
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, validVer);
}
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
while (1) {
bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id);
SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader);
struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);
// curVersion move to next
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion);
if (hasResult) {
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version);
blockDataCleanup(pInfo->pRes);
STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX};
setBlockIntoRes(pInfo, pRes, &defaultWindow, true);
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
}
} else {
qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
return NULL;
}
}
} else {
qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
return NULL;
}
}
}
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {

View File

@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList