From e4881423b8733775880d1c68cc9549e412c91cc6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 22 Aug 2022 11:09:12 +0800 Subject: [PATCH] fix(query): add one more condition check when merge file block and last block. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 13 +++-- source/libs/executor/inc/executorimpl.h | 4 +- source/libs/executor/src/executorimpl.c | 33 ++++++++---- source/libs/executor/src/groupoperator.c | 4 +- source/libs/executor/src/timewindowoperator.c | 50 +++++++++---------- 5 files changed, 57 insertions(+), 47 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 336053911e..0caf10f391 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -857,7 +857,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) { int64_t st = taosGetTimestampUs(); - double elapsedTime = 0; + double elapsedTime = 0; int32_t code = 0; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); @@ -1992,7 +1992,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); // no last block - if (pLastBlockReader->lastBlockData.nRow == 0) { + if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) { if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { return TSDB_CODE_SUCCESS; } else { @@ -2495,13 +2495,12 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { } static int32_t doBuildDataBlock(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; - - SReaderStatus* pStatus = &pReader->status; - SDataBlockIter* pBlockIter = &pStatus->blockIter; - TSDBKEY key = {0}; + int32_t code = TSDB_CODE_SUCCESS; SBlock* pBlock = NULL; + + SReaderStatus* pStatus = &pReader->status; + SDataBlockIter* pBlockIter = &pStatus->blockIter; STableBlockScanInfo* pScanInfo = NULL; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5e339eb113..fb4eac991f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -860,8 +860,8 @@ int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDa bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); -void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, - int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); +void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, + int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, char** pNextStart); void updateLoadRemoteInfo(SLoadRemoteDataInfo *pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9366f014c1..82723eebf2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -378,15 +378,30 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) { colDataDestroy(pColData); } -void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, - SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, - int32_t numOfTotal, int32_t numOfOutput, int32_t order) { +typedef struct { + bool hasAgg; + int32_t numOfRows; + int32_t startOffset; +} SFunctionCtxStatus; + +static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { + pStatus->hasAgg = pCtx->input.colDataAggIsSet; + pStatus->numOfRows = pCtx->input.numOfRows; + pStatus->startOffset = pCtx->input.startRowIndex; +} + +static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { + pCtx->input.colDataAggIsSet = pStatus->hasAgg; + pCtx->input.numOfRows = pStatus->numOfRows; + pCtx->input.startRowIndex = pStatus->startOffset; +} + +void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, + int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { for (int32_t k = 0; k < numOfOutput; ++k) { // keep it temporarily - // todo no need this?? - bool hasAgg = pCtx[k].input.colDataAggIsSet; - int32_t numOfRows = pCtx[k].input.numOfRows; - int32_t startOffset = pCtx[k].input.startRowIndex; + SFunctionCtxStatus status = {0}; + functionCtxSave(&pCtx[k], &status); pCtx[k].input.startRowIndex = offset; pCtx[k].input.numOfRows = forwardStep; @@ -424,9 +439,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow } // restore it - pCtx[k].input.colDataAggIsSet = hasAgg; - pCtx[k].input.startRowIndex = startOffset; - pCtx[k].input.numOfRows = numOfRows; + functionCtxRestore(&pCtx[k], &status); } } } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 507719e0aa..05dffc658b 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -277,7 +277,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = j - num; - doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); @@ -295,7 +295,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = pBlock->info.rows - num; - doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); } } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9b9a38c7ea..9eaab69633 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -641,8 +641,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); - doApplyFunctions(pTaskInfo, pSup->pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, 0, tsCols, pBlock->info.rows, - numOfExprs, pInfo->inputOrder); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs); if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { closeResultRow(pr); @@ -986,8 +985,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, pInfo->inputOrder); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + pBlock->info.rows, numOfOutput); } doCloseWindow(pResultRowInfo, pInfo, pResult); @@ -1026,8 +1025,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, pInfo->inputOrder); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, pInfo, pResult); } @@ -1190,8 +1189,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); @@ -1215,8 +1214,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { @@ -1934,8 +1933,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // pInfo->numOfRows data belong to the current session window updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); @@ -1952,8 +1951,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { @@ -2952,8 +2951,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + pSDataBlock->info.rows, numOfOutput); int32_t prevEndPos = (forwardRows - 1) * step + startPos; ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); @@ -3776,8 +3775,7 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre return TSDB_CODE_QRY_OUT_OF_MEMORY; } updateTimeWindowInfo(pTimeWindowData, &pCurWin->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pCurWin->win, pTimeWindowData, startIndex, winRows, tsCols, - pSDataBlock->info.rows, numOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, pSDataBlock->info.rows, numOutput); SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, pCurWin->pos.pageId); setBufPageDirty(bufPage, true); releaseBufPage(pAggSup->pResultBuf, bufPage); @@ -4938,8 +4936,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); + doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, + pBlock->info.rows, numOfOutput); outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs); miaInfo->curTs = tsCols[currPos]; @@ -4960,8 +4958,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); + doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, + pBlock->info.rows, numOfOutput); } static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { @@ -5253,8 +5251,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pExprSup->pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, iaInfo->inputOrder); + doApplyFunctions(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, + pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, iaInfo, pResult); // output previous interval results after this interval (&win) is closed @@ -5285,8 +5283,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pExprSup->pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); + doApplyFunctions(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, + pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, iaInfo, pResult); // output previous interval results after this interval (&nextWin) is closed