From e3e42cdfbdfdc19c948524e696d944ddcd6e6598 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Dec 2022 00:16:40 +0800 Subject: [PATCH] fix(query): set update ts flag for stream. --- source/common/src/tdatablock.c | 4 ++++ source/dnode/vnode/src/tq/tqRead.c | 1 + source/libs/executor/src/exchangeoperator.c | 1 + source/libs/executor/src/executorimpl.c | 2 ++ source/libs/executor/src/scanoperator.c | 5 +++++ 5 files changed, 13 insertions(+) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0e89ecdd68..aeea9b2681 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -358,6 +358,10 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSiz size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; } int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) { + if (pDataBlock->info.rows > 0) { + ASSERT(pDataBlock->info.dataLoad == 1); + } + if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0) { return 0; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index c3a4cefc66..46b31bc5b0 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -533,6 +533,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { pBlock->info.id.uid = pReader->msgIter.uid; pBlock->info.rows = pReader->msgIter.numOfRows; pBlock->info.version = pReader->pMsg->version; + pBlock->info.dataLoad = 1; while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) { tdSTSRowIterReset(&iter, row); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 8423b77906..4103ca82dc 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -510,6 +510,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo blockDataEnsureCapacity(pRes, pBlock->info.rows); // data from mnode + pRes->info.dataLoad = 1; pRes->info.rows = pBlock->info.rows; relocateColumnData(pRes, pColList, pBlock->pDataBlock, false); blockDataDestroy(pBlock); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 41e3d890cd..1c80eff685 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2546,6 +2546,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat pBlock->info.rows += pRow->numOfRows; releaseOutputBuf(pState, &key, pRow); } + pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, 0); return TSDB_CODE_SUCCESS; } @@ -2635,6 +2636,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta } } + pBlock->info.dataLoad = 1; pBlock->info.rows += pRow->numOfRows; // saveSessionDiscBuf(pState, pKey, pVal, size); releaseOutputBuf(pState, NULL, pRow); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d5239b340c..018e969b35 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1313,6 +1313,7 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, } pDestBlock->info.type = STREAM_CLEAR; pDestBlock->info.version = pSrcBlock->info.version; + pDestBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pDestBlock, 0); return code; } @@ -1421,6 +1422,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock } if (out && pInfo->pUpdateDataRes->info.rows > 0) { pInfo->pUpdateDataRes->info.version = pBlock->info.version; + pInfo->pUpdateDataRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0); pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR; } @@ -1483,6 +1485,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); } + pInfo->pRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataFreeRes((SSDataBlock*)pBlock); @@ -1771,6 +1774,7 @@ FETCH_NEXT_BLOCK: // TODO move into scan pBlock->info.calWin.skey = INT64_MIN; pBlock->info.calWin.ekey = INT64_MAX; + pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, 0); switch (pBlock->info.type) { case STREAM_NORMAL: @@ -1948,6 +1952,7 @@ FETCH_NEXT_BLOCK: } doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + pInfo->pRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {