diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 828681e2cf..d122a9b0b5 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -108,12 +108,6 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand if (pRequest->useSnapshot) { tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot", consumerId, pHandle->subKey, vgId); - - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){ - tqError("tmq poll column can not use snapshot"); - terrno = TSDB_CODE_TQ_INVALID_CONFIG; - return -1; - } if (pHandle->fetchMeta) { tqOffsetResetToMeta(pOffsetVal, 0); } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 297c0f8aff..3e085d71de 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1831,30 +1831,56 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { return NULL; } - ASSERT(pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG); - while (1) { - bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id); + if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { + SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); + if (pResult && pResult->info.rows > 0) { + tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey); + return pResult; + } - SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader); - struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader); + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader); - // curVersion move to next - tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion); - - if (hasResult) { - qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, - pTaskInfo->streamInfo.currentOffset.version); - blockDataCleanup(pInfo->pRes); - STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX}; - setBlockIntoRes(pInfo, pRes, &defaultWindow, true); - if (pInfo->pRes->info.rows > 0) { - return pInfo->pRes; - } - } else { - qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version); + pTSInfo->base.dataReader = NULL; + int64_t validVer = pTaskInfo->streamInfo.snapshotVer + 1; + qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", validVer); + if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, validVer, pTaskInfo->id.str) < 0) { return NULL; } + + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, validVer); } + + if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { + + while (1) { + bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id); + + SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader); + struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader); + + // curVersion move to next + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion); + + if (hasResult) { + qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, + pTaskInfo->streamInfo.currentOffset.version); + blockDataCleanup(pInfo->pRes); + STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX}; + setBlockIntoRes(pInfo, pRes, &defaultWindow, true); + if (pInfo->pRes->info.rows > 0) { + return pInfo->pRes; + } + } else { + qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version); + return NULL; + } + } + } else { + qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type); + return NULL; + } +} } static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) { diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index fe6ec04a20..80e048a9e4 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic1Of2Cons.sim b/tests/script/tsim/tmq/basic1Of2Cons.sim index c12351cbe8..76e9775fc9 100644 --- a/tests/script/tsim/tmq/basic1Of2Cons.sim +++ b/tests/script/tsim/tmq/basic1Of2Cons.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic2.sim b/tests/script/tsim/tmq/basic2.sim index 5c7528ea5d..57bcb40921 100644 --- a/tests/script/tsim/tmq/basic2.sim +++ b/tests/script/tsim/tmq/basic2.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic2Of2Cons.sim b/tests/script/tsim/tmq/basic2Of2Cons.sim index 23598c17a4..44b6a4b591 100644 --- a/tests/script/tsim/tmq/basic2Of2Cons.sim +++ b/tests/script/tsim/tmq/basic2Of2Cons.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim index 1223a94fa7..b279a13826 100644 --- a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim +++ b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic3.sim b/tests/script/tsim/tmq/basic3.sim index 8bb34cefa2..b66917623e 100644 --- a/tests/script/tsim/tmq/basic3.sim +++ b/tests/script/tsim/tmq/basic3.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic3Of2Cons.sim b/tests/script/tsim/tmq/basic3Of2Cons.sim index 75d762c44b..6284356e91 100644 --- a/tests/script/tsim/tmq/basic3Of2Cons.sim +++ b/tests/script/tsim/tmq/basic3Of2Cons.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic4.sim b/tests/script/tsim/tmq/basic4.sim index c72d8ff412..290d611cbc 100644 --- a/tests/script/tsim/tmq/basic4.sim +++ b/tests/script/tsim/tmq/basic4.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic4Of2Cons.sim b/tests/script/tsim/tmq/basic4Of2Cons.sim index bb006a354c..0a3b4d2ee0 100644 --- a/tests/script/tsim/tmq/basic4Of2Cons.sim +++ b/tests/script/tsim/tmq/basic4Of2Cons.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList