From 432c943c256150fae6442f373d4e02747ef8d0af Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 26 Jul 2023 16:34:39 +0800 Subject: [PATCH 1/3] fix(stream): do filter before the update check. --- source/dnode/vnode/src/tq/tq.c | 10 ++++++---- source/libs/executor/src/scanoperator.c | 7 +++---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a935eaf5f7..cd9a17081b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1322,13 +1322,15 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { return -1; } - tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x", pTask->id.idStr, req.downstreamId); - int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); if (remain > 0) { - tqDebug("s-task:%s remain:%d not send finish rsp", pTask->id.idStr, remain); + tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, remain:%d not send finish rsp", + pTask->id.idStr, req.downstreamId, remain); } else { - tqDebug("s-task:%s all downstream tasks rsp scan-history completed msg", pTask->id.idStr); + tqDebug( + "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " + "completed msg", + pTask->id.idStr); streamProcessScanHistoryFinishRsp(pTask); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6c8d9ed59f..a15b128a99 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2155,15 +2155,14 @@ FETCH_NEXT_BLOCK: return pInfo->pCreateTbRes; } - doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); - doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); - // apply additional time window filter doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); - pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); + doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); + int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) { From 3c387f6e91ed6581fd52c0453752e3e3e5941c0f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 26 Jul 2023 17:00:41 +0800 Subject: [PATCH 2/3] fix(stream): fix syntax error. --- source/dnode/vnode/src/tq/tq.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index cd9a17081b..3bcc141edc 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1329,8 +1329,7 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { } else { tqDebug( "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " - "completed msg", - pTask->id.idStr); + "completed msg", pTask->id.idStr, req.downstreamId); streamProcessScanHistoryFinishRsp(pTask); } From 6a6830761de08b54c09701395e3abb3599f86980 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 26 Jul 2023 17:08:29 +0800 Subject: [PATCH 3/3] fix(tsdb): check for null ptr of pScanInfo --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index e96406567a..08cc81b004 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2564,9 +2564,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { // load the last data block of current table STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; - if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { - // reset the index in last block when handing a new file - // doCleanupTableScanInfo(pScanInfo); + if (pScanInfo == NULL) { + tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr); bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; @@ -2575,8 +2574,15 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { continue; } - // reset the index in last block when handing a new file - // doCleanupTableScanInfo(pScanInfo); + if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { + // reset the index in last block when handing a new file + bool hasNexTable = moveToNextTable(pUidList, pStatus); + if (!hasNexTable) { + return TSDB_CODE_SUCCESS; + } + + continue; + } bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); if (!hasDataInLastFile) {