From 1fc664693e9f956565e5872d77c8074fd4d09d1b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 28 Jun 2023 11:54:48 +0800 Subject: [PATCH] fix:continue consume if wal is dropped --- source/dnode/vnode/src/tq/tqRead.c | 10 +++++----- source/libs/executor/src/executor.c | 10 ++++++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 77a966715e..5f53f1c50c 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -388,7 +388,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) { int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < numOfBlocks) { - tqDebug("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk, + tqTrace("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk, numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); @@ -403,7 +403,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) { void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); if (ret != NULL) { - tqDebug("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver); + tqTrace("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver); SSDataBlock* pRes = NULL; int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL); @@ -412,11 +412,11 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) { } } else { pReader->nextBlk += 1; - tqDebug("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid); + tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid); } } - qDebug("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id); + qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id); tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); pReader->msg.msgStr = NULL; @@ -604,7 +604,7 @@ static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SCol } int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) { - tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk); + tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++); SSDataBlock* pBlock = pReader->pResBlock; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c8b66836d5..88e2165a12 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1078,6 +1078,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SOperatorInfo* pOperator = pTaskInfo->pRoot; const char* id = GET_TASKID(pTaskInfo); + if(subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG){ + pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id); + if (pOperator == NULL) { + return -1; + } + SStreamScanInfo* pInfo = pOperator->info; + SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn; + SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader); + walReaderVerifyOffset(pWalReader, pOffset); + } // if pOffset equal to current offset, means continue consume if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) { return 0;