diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 336053911e..947ab2e7ff 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -69,8 +69,10 @@ typedef struct SIOCostSummary { double buildmemBlock; int64_t headFileLoad; double headFileLoadTime; - int64_t smaData; + int64_t smaDataLoad; double smaLoadTime; + int64_t lastBlockLoad; + double lastBlockLoadTime; } SIOCostSummary; typedef struct SBlockLoadSuppInfo { @@ -98,10 +100,10 @@ typedef struct SLastBlockReader { } SLastBlockReader; typedef struct SFilesetIter { - int32_t numOfFiles; // number of total files - int32_t index; // current accessed index in the list - SArray* pFileList; // data file list - int32_t order; + int32_t numOfFiles; // number of total files + int32_t index; // current accessed index in the list + SArray* pFileList; // data file list + int32_t order; SLastBlockReader* pLastBlockReader; // last file block reader } SFilesetIter; @@ -857,7 +859,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) { int64_t st = taosGetTimestampUs(); - double elapsedTime = 0; + double elapsedTime = 0; int32_t code = 0; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); @@ -1303,9 +1305,23 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey); } - return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) || - keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) || - overlapWithDel || overlapWithlastBlock); + bool moreThanOutputCapacity = pBlock->nRow > pReader->capacity; + bool partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock); + bool overlapWithKey = keyOverlapFileBlock(key, pBlock, &pReader->verRange); + + bool loadDataBlock = (overlapWithNeighbor || hasDup || partiallyRequired || overlapWithKey || + moreThanOutputCapacity || overlapWithDel || overlapWithlastBlock); + + // log the reason why load the datablock for profile + if (loadDataBlock) { + tsdbDebug("%p uid:%" PRIu64 + " need to load the datablock, reason overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, " + "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s", + pReader, pFBlock->uid, overlapWithNeighbor, hasDup, partiallyRequired, overlapWithKey, + moreThanOutputCapacity, overlapWithDel, overlapWithlastBlock, pReader->idStr); + } + + return loadDataBlock; } static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) { @@ -1992,7 +2008,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); // no last block - if (pLastBlockReader->lastBlockData.nRow == 0) { + if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) { if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { return TSDB_CODE_SUCCESS; } else { @@ -2383,7 +2399,6 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) { return TSDB_CODE_SUCCESS; } -// todo add elapsed time results static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader) { SArray* pBlocks = pLastBlockReader->pBlockL; SBlockL* pBlock = NULL; @@ -2415,6 +2430,7 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable return TSDB_CODE_SUCCESS; } + int64_t st = taosGetTimestampUs(); int32_t code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p init block data failed, code:%s %s", pReader, tstrerror(code), pReader->idStr); @@ -2422,17 +2438,23 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable } code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData); + + double el = (taosGetTimestampUs() - st) / 1000.0; if (code != TSDB_CODE_SUCCESS) { tsdbError("%p error occurs in loading last block into buffer, last block index:%d, total:%d code:%s %s", pReader, pLastBlockReader->currentBlockIndex, totalLastBlocks, tstrerror(code), pReader->idStr); } else { tsdbDebug("%p load last block completed, uid:%" PRIu64 - " last block index:%d, total:%d rows:%d, minVer:%d, maxVer:%d, brange:%" PRId64 " - %" PRId64 " %s", + " last block index:%d, total:%d rows:%d, minVer:%d, maxVer:%d, brange:%" PRId64 " - %" PRId64 + " elapsed time:%.2f ms, %s", pReader, uid, pLastBlockReader->currentBlockIndex, totalLastBlocks, pBlock->nRow, pBlock->minVer, - pBlock->maxVer, pBlock->minKey, pBlock->maxKey, pReader->idStr); + pBlock->maxVer, pBlock->minKey, pBlock->maxKey, el, pReader->idStr); } pLastBlockReader->currentBlockIndex = index; + pReader->cost.lastBlockLoad += 1; + pReader->cost.lastBlockLoadTime += el; + return TSDB_CODE_SUCCESS; } @@ -2495,13 +2517,12 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { } static int32_t doBuildDataBlock(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; - - SReaderStatus* pStatus = &pReader->status; - SDataBlockIter* pBlockIter = &pStatus->blockIter; - TSDBKEY key = {0}; + int32_t code = TSDB_CODE_SUCCESS; SBlock* pBlock = NULL; + + SReaderStatus* pStatus = &pReader->status; + SDataBlockIter* pBlockIter = &pStatus->blockIter; STableBlockScanInfo* pScanInfo = NULL; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; @@ -2628,7 +2649,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl // initialize the block iterator for a new fileset if (num.numOfBlocks > 0) { code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks); - } else { + } else { // no block data, only last block exists tBlockDataReset(&pReader->status.fileBlockData); resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap); } @@ -2701,7 +2722,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { if (hasNext) { // check for the next block in the block accessed order list initBlockDumpInfo(pReader, pBlockIter); } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) { // data blocks in current file are exhausted, let's try the next file now - // todo dump all data in last block if exists. tBlockDataReset(&pReader->status.fileBlockData); resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap); goto _begin; @@ -3498,10 +3518,11 @@ void tsdbReaderClose(STsdbReader* pReader) { tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 " SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-time:%.2f ms, " - "build in-memory-block-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s", - pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime, - pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, - numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr); + "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 + ", lastBlocks-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s", + pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, + pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, + pCost->lastBlockLoadTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr); taosMemoryFree(pReader->idStr); taosMemoryFree(pReader->pSchema); @@ -3663,7 +3684,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS double elapsed = (taosGetTimestampUs() - stime) / 1000.0; pReader->cost.smaLoadTime += elapsed; - pReader->cost.smaData += 1; + pReader->cost.smaDataLoad += 1; *pBlockStatis = pSup->plist; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5e339eb113..fb4eac991f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -860,8 +860,8 @@ int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDa bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); -void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, - int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); +void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, + int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, char** pNextStart); void updateLoadRemoteInfo(SLoadRemoteDataInfo *pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f7fb6cd405..16a1cb898f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -378,15 +378,30 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) { colDataDestroy(pColData); } -void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, - SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, - int32_t numOfTotal, int32_t numOfOutput, int32_t order) { +typedef struct { + bool hasAgg; + int32_t numOfRows; + int32_t startOffset; +} SFunctionCtxStatus; + +static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { + pStatus->hasAgg = pCtx->input.colDataAggIsSet; + pStatus->numOfRows = pCtx->input.numOfRows; + pStatus->startOffset = pCtx->input.startRowIndex; +} + +static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { + pCtx->input.colDataAggIsSet = pStatus->hasAgg; + pCtx->input.numOfRows = pStatus->numOfRows; + pCtx->input.startRowIndex = pStatus->startOffset; +} + +void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, + int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { for (int32_t k = 0; k < numOfOutput; ++k) { // keep it temporarily - // todo no need this?? - bool hasAgg = pCtx[k].input.colDataAggIsSet; - int32_t numOfRows = pCtx[k].input.numOfRows; - int32_t startOffset = pCtx[k].input.startRowIndex; + SFunctionCtxStatus status = {0}; + functionCtxSave(&pCtx[k], &status); pCtx[k].input.startRowIndex = offset; pCtx[k].input.numOfRows = forwardStep; @@ -424,9 +439,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow } // restore it - pCtx[k].input.colDataAggIsSet = hasAgg; - pCtx[k].input.startRowIndex = startOffset; - pCtx[k].input.numOfRows = numOfRows; + functionCtxRestore(&pCtx[k], &status); } } } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 507719e0aa..05dffc658b 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -277,7 +277,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = j - num; - doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); @@ -295,7 +295,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = pBlock->info.rows - num; - doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); } } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9b9a38c7ea..9eaab69633 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -641,8 +641,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); - doApplyFunctions(pTaskInfo, pSup->pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, 0, tsCols, pBlock->info.rows, - numOfExprs, pInfo->inputOrder); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs); if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { closeResultRow(pr); @@ -986,8 +985,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, pInfo->inputOrder); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + pBlock->info.rows, numOfOutput); } doCloseWindow(pResultRowInfo, pInfo, pResult); @@ -1026,8 +1025,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, pInfo->inputOrder); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, pInfo, pResult); } @@ -1190,8 +1189,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); @@ -1215,8 +1214,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { @@ -1934,8 +1933,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // pInfo->numOfRows data belong to the current session window updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); @@ -1952,8 +1951,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { @@ -2952,8 +2951,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + pSDataBlock->info.rows, numOfOutput); int32_t prevEndPos = (forwardRows - 1) * step + startPos; ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); @@ -3776,8 +3775,7 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre return TSDB_CODE_QRY_OUT_OF_MEMORY; } updateTimeWindowInfo(pTimeWindowData, &pCurWin->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pCurWin->win, pTimeWindowData, startIndex, winRows, tsCols, - pSDataBlock->info.rows, numOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, pSDataBlock->info.rows, numOutput); SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, pCurWin->pos.pageId); setBufPageDirty(bufPage, true); releaseBufPage(pAggSup->pResultBuf, bufPage); @@ -4938,8 +4936,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); + doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, + pBlock->info.rows, numOfOutput); outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs); miaInfo->curTs = tsCols[currPos]; @@ -4960,8 +4958,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); + doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, + pBlock->info.rows, numOfOutput); } static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { @@ -5253,8 +5251,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pExprSup->pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, iaInfo->inputOrder); + doApplyFunctions(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, + pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, iaInfo, pResult); // output previous interval results after this interval (&win) is closed @@ -5285,8 +5283,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pExprSup->pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); + doApplyFunctions(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, + pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, iaInfo, pResult); // output previous interval results after this interval (&nextWin) is closed