From bafa54778a7173473c2e784896d10a3b91b0c87d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 10 Jul 2022 14:10:39 +0800 Subject: [PATCH] refactor(stream) --- source/dnode/vnode/src/tq/tq.c | 4 +++- source/libs/executor/src/scanoperator.c | 7 +++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2aef9e7175..e0afc6c80f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -318,7 +318,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // 3.query if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - fetchOffsetNew.version++; + if (fetchOffsetNew.type == TMQ_OFFSET__LOG) { + fetchOffsetNew.version++; + } if (tqScan(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) { ASSERT(0); code = -1; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 57e74415a7..3532c1d367 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -427,8 +427,11 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime; // todo refactor - pTableScanInfo->lastStatus.uid = pBlock->info.uid; - pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey; + /*pTableScanInfo->lastStatus.uid = pBlock->info.uid;*/ + /*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/ + pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA; + pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid; + pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; ASSERT(pBlock->info.uid != 0); return pBlock;