diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4b8fd3d116..18091f0a4f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -319,9 +319,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // 3.query if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - if (fetchOffsetNew.type == TMQ_OFFSET__LOG) { - fetchOffsetNew.version++; - } + /*if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {*/ + /*fetchOffsetNew.version++;*/ + /*}*/ if (tqScan(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) { ASSERT(0); code = -1; diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index ae5499af11..0bdbe82b77 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -66,7 +66,6 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa if (qStreamPrepareScan(task, pOffset) < 0) { ASSERT(pOffset->type == TMQ_OFFSET__LOG); pRsp->rspOffset = *pOffset; - pRsp->rspOffset.version--; return 0; } diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 76d4e35c33..351846c560 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -288,14 +288,22 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { if (!tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) { while (1) { uint8_t type = pOperator->operatorType; - pOperator->status = OP_OPENED; + /*pOperator->status = OP_OPENED;*/ if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamScanInfo* pInfo = pOperator->info; if (pOffset->type == TMQ_OFFSET__LOG) { - if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) { +#if 0 + if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) && + pInfo->tqReader->pWalReader->curVersion != pOffset->version) { + qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version, + pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version); + ASSERT(0); + } +#endif + if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) { return -1; } - ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version); + ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1); } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ int64_t uid = pOffset->uid;