diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 0a1f5d51d4..9c90c6b5c0 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -135,6 +135,7 @@ typedef struct { // int8_t scanUncommited; int8_t scanNotApplied; int8_t scanMeta; + int8_t deleteMsg; int8_t enableRef; } SWalFilterCond; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index be0dc1ec45..81d64fb98a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -648,7 +648,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + SWalFilterCond cond = {.deleteMsg = 1}; + pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); } streamSetupTrigger(pTask); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 800bcc8b71..e4b2fa8821 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -82,6 +82,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; qStreamSetOpen(task); + tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId); if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) { tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr()); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index bfb2865444..d63208522f 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -87,8 +87,9 @@ int32_t walNextValidMsg(SWalReader *pReader) { return -1; } - if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT || pReader->pHead->head.msgType == TDMT_VND_DELETE || - (IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) { + int32_t type = pReader->pHead->head.msgType; + if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) || + (IS_META_MSG(type) && pReader->cond.scanMeta)) { if (walFetchBodyNew(pReader) < 0) { return -1; }