From 432c943c256150fae6442f373d4e02747ef8d0af Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 26 Jul 2023 16:34:39 +0800 Subject: [PATCH] fix(stream): do filter before the update check. --- source/dnode/vnode/src/tq/tq.c | 10 ++++++---- source/libs/executor/src/scanoperator.c | 7 +++---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a935eaf5f7..cd9a17081b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1322,13 +1322,15 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { return -1; } - tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x", pTask->id.idStr, req.downstreamId); - int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); if (remain > 0) { - tqDebug("s-task:%s remain:%d not send finish rsp", pTask->id.idStr, remain); + tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, remain:%d not send finish rsp", + pTask->id.idStr, req.downstreamId, remain); } else { - tqDebug("s-task:%s all downstream tasks rsp scan-history completed msg", pTask->id.idStr); + tqDebug( + "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " + "completed msg", + pTask->id.idStr); streamProcessScanHistoryFinishRsp(pTask); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6c8d9ed59f..a15b128a99 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2155,15 +2155,14 @@ FETCH_NEXT_BLOCK: return pInfo->pCreateTbRes; } - doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); - doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); - // apply additional time window filter doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); - pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); + doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); + int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) {