From c124bcf52fca3a9528ba93d51d7563564a083525 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 27 Dec 2023 11:29:03 +0800 Subject: [PATCH 1/3] check block rows --- source/libs/executor/src/streamtimewindowoperator.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index c9490e2c55..b9f3dc19d8 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2343,10 +2343,6 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa } int32_t code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow); - if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { - ASSERT(pBlock->info.rows > 0); - break; - } if (code == -1) { // for history @@ -2363,6 +2359,11 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa continue; } + if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { + ASSERT(pBlock->info.rows > 0); + break; + } + pGroupResInfo->index += 1; for (int32_t j = 0; j < numOfExprs; ++j) { @@ -2383,6 +2384,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); for (int32_t k = 0; k < pRow->numOfRows; ++k) { + qDebug("===stream===block rows:%d, rows:%d, exprs:%d, capacity:%d", pBlock->info.rows, i, j, pBlock->info.capacity); colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); } } From f1304680c590473a3c9c24eaaa582be9119b60f4 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 27 Dec 2023 14:01:38 +0800 Subject: [PATCH 2/3] log --- source/libs/executor/src/streamtimewindowoperator.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index b9f3dc19d8..024a44d107 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2384,7 +2384,6 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); for (int32_t k = 0; k < pRow->numOfRows; ++k) { - qDebug("===stream===block rows:%d, rows:%d, exprs:%d, capacity:%d", pBlock->info.rows, i, j, pBlock->info.capacity); colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); } } From 448b7c939f122dd1f25d11a4ae59a4cef5b79ed0 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 28 Dec 2023 11:09:38 +0800 Subject: [PATCH 3/3] reset datablock window info --- source/libs/executor/src/scanoperator.c | 1 + .../executor/src/streamtimewindowoperator.c | 27 ++++++++++--------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ef2a99d1d1..b38b9bca49 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2316,6 +2316,7 @@ FETCH_NEXT_BLOCK: doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 024a44d107..c749c3814d 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -801,10 +801,24 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat SRowBuffPos* pResPos = NULL; SResultRow* pResult = NULL; int32_t forwardRows = 0; + int32_t endRowId = pSDataBlock->info.rows - 1; SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; + if (pSDataBlock->info.window.skey != tsCols[0] || pSDataBlock->info.window.ekey != tsCols[endRowId]) { + qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64 + ",maxKey %" PRId64, + pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); + blockDataUpdateTsWindow(pSDataBlock, pInfo->primaryTsIndex); + + // timestamp of the data is incorrect + if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) { + qError("table uid %" PRIu64 " data block timestamp is out of range! minKey %" PRId64 ",maxKey %" PRId64, + pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); + } + } + int32_t startPos = 0; TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); STimeWindow nextWin = {0}; @@ -902,19 +916,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat pInfo->delKey = key; } int32_t prevEndPos = (forwardRows - 1) * step + startPos; - if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) { - qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64 - ",maxKey %" PRId64, - pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); - blockDataUpdateTsWindow(pSDataBlock, 0); - - // timestamp of the data is incorrect - if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) { - qError("table uid %" PRIu64 " data block timestamp is out of range! minKey %" PRId64 ",maxKey %" PRId64, - pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); - } - } - if (IS_FINAL_INTERVAL_OP(pOperator)) { startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos); } else {