diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a70e7cd1dd..d587e201ef 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -549,7 +549,8 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo { SIntervalAggOperatorInfo *intervalAggOperatorInfo; bool hasGroupId; - uint64_t groupId; + uint64_t groupId; // current groupId + int64_t curTs; // current ts SSDataBlock* prefetchedBlock; bool inputBlocksFinished; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 084fbbea7c..d031494057 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1445,6 +1445,7 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_ } } +// todo extract method with copytoSSDataBlock int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, SSDataBlock* pBlock, diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5690389302..c1e5252089 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -507,6 +507,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { colDataSetNull_f(bitmap, (*rows)); } else { memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes); + ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)); } contentLen = bytes; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fb1cb8dff5..63143875a3 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4589,11 +4589,10 @@ void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) { static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, TSKEY wstartTs) { SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; - SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; - SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - SExprSupp* pSup = &pOperatorInfo->exprSupp; - bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); + SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; + SExprSupp* pSup = &pOperatorInfo->exprSupp; SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId); SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, @@ -4603,8 +4602,9 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs, pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); - return 0; + return TSDB_CODE_SUCCESS; } static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, @@ -4619,11 +4619,20 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR int32_t numOfOutput = pSup->numOfExprs; int64_t* tsCols = extractTsCol(pBlock, iaInfo); uint64_t tableGroupId = pBlock->info.groupId; - TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols); + TSKEY currTs = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; - STimeWindow win; - win.skey = blockStartTs; + // there is an result exists + if (miaInfo->curTs != INT64_MIN) { + ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); + if (currTs != miaInfo->curTs) { + outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs); + miaInfo->curTs = INT64_MIN; + } + } + + STimeWindow win = {0}; + win.skey = currTs; win.ekey = taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1; @@ -4634,41 +4643,48 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - TSKEY currTs = blockStartTs; - TSKEY currPos = startPos; + miaInfo->curTs = win.skey; + int32_t currPos = startPos; + STimeWindow currWin = win; - while (1) { - ++currPos; - if (currPos >= pBlock->info.rows) { - break; - } + while (++currPos < pBlock->info.rows) { if (tsCols[currPos] == currTs) { continue; - } else { - updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); - - outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); - - currTs = tsCols[currPos]; - currWin.skey = currTs; - currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, - iaInfo->interval.precision) - - 1; - startPos = currPos; - ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, - pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } } + + updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); + doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, + tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); + + outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); + miaInfo->curTs = INT64_MIN; + + currTs = tsCols[currPos]; + currWin.skey = currTs; + currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, + iaInfo->interval.precision) - 1; + + startPos = currPos; + ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + miaInfo->curTs = currWin.skey; } + updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); - outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); + if (currPos >= pBlock->info.rows) { + // we need to see next block if exists + } else { + ASSERT(0); + outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); + miaInfo->curTs = INT64_MIN; + } } static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { @@ -4682,12 +4698,14 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; SSDataBlock* pRes = iaInfo->binfo.pRes; + blockDataCleanup(pRes); blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); if (!miaInfo->inputBlocksFinished) { SOperatorInfo* downstream = pOperator->pDownstream[0]; int32_t scanFlag = MAIN_SCAN; + while (1) { SSDataBlock* pBlock = NULL; if (miaInfo->prefetchedBlock == NULL) { @@ -4699,6 +4717,14 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { } if (pBlock == NULL) { + // close last unfinalized time window + if (miaInfo->curTs != INT64_MIN) { + ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); + outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs); + miaInfo->curTs = INT64_MIN; + } + + doSetOperatorCompleted(pOperator); miaInfo->inputBlocksFinished = true; break; } @@ -4707,7 +4733,11 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { miaInfo->hasGroupId = true; miaInfo->groupId = pBlock->info.groupId; } else if (miaInfo->groupId != pBlock->info.groupId) { + // if there are unclosed time window, close it firstly. + ASSERT(miaInfo->curTs != INT64_MIN); + outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs); miaInfo->prefetchedBlock = pBlock; + miaInfo->curTs = INT64_MIN; break; } @@ -4722,11 +4752,8 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { pRes->info.groupId = miaInfo->groupId; } - miaInfo->hasGroupId = false; - if (miaInfo->inputBlocksFinished) { - doSetOperatorCompleted(pOperator); - } + miaInfo->hasGroupId = false; size_t rows = pRes->info.rows; pOperator->resultInfo.totalRows += rows; @@ -4752,6 +4779,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprSupp* pSup = &pOperator->exprSupp; miaInfo->pCondition = pCondition; + miaInfo->curTs = INT64_MIN; + iaInfo->win = pTaskInfo->window; iaInfo->inputOrder = TSDB_ORDER_ASC; iaInfo->interval = *pInterval;