diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4f02c559b1..5dd349f4ab 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -895,6 +895,9 @@ int64_t getSmaWaterMark(int64_t interval, double filesFactor); bool isSmaStream(int8_t triggerType); int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); +int32_t finalizeResultRowIntoSDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, + SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, + SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7b73fd8ae9..5bd9044167 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1955,6 +1955,51 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_ } } +int32_t finalizeResultRowIntoSDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, + SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, + SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { + SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId); + SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset); + + doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset); + if (pRow->numOfRows == 0) { + releaseBufPage(pBuf, page); + return 0; + } + + if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { + releaseBufPage(pBuf, page); + return -1; + } + + for (int32_t j = 0; j < numOfExprs; ++j) { + int32_t slotId = pExprInfo[j].base.resSchema.slotId; + + pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset); + if (pCtx[j].fpSet.finalize) { + int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + if (TAOS_FAILED(code)) { + qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); + longjmp(pTaskInfo->env, code); + } + } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { + // do nothing, todo refactor + } else { + // expand the result into multiple rows. E.g., _wstartts, top(k, 20) + // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); + for (int32_t k = 0; k < pRow->numOfRows; ++k) { + colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); + } + } + } + + releaseBufPage(pBuf, page); + + return 0; +} + int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b309478556..696b9139cf 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3175,3 +3175,229 @@ _error: pTaskInfo->code = code; return NULL; } + +typedef struct SMergeIntervalAggOperatorInfo { + SIntervalAggOperatorInfo intervalAggOperatorInfo; + + SHashObj* groupIntervalHash; +} SMergeIntervalAggOperatorInfo; + +void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) { + SMergeIntervalAggOperatorInfo* pInfo = (SMergeIntervalAggOperatorInfo*)param; + taosHashCleanup(pInfo->groupIntervalHash); + destroyIntervalOperatorInfo(&pInfo->intervalAggOperatorInfo, numOfOutput); +} + +static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, + int32_t scanFlag, SSDataBlock* pResultBlock) { + SMergeIntervalAggOperatorInfo *miaInfo = pOperatorInfo->info; + SIntervalAggOperatorInfo * pInfo = &miaInfo->intervalAggOperatorInfo; + + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; + + int32_t startPos = 0; + int32_t numOfOutput = pOperatorInfo->numOfExprs; + int64_t* tsCols = extractTsCol(pBlock, pInfo); + uint64_t tableGroupId = pBlock->info.groupId; + bool ascScan = (pInfo->order == TSDB_ORDER_ASC); + TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols); + SResultRow* pResult = NULL; + + STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &pInfo->interval, + pInfo->interval.precision, &pInfo->win); + //TODO: pResultBlock full + //TODO: pBlock not process not finished + //TODO: different block group id or no group id + //TODO: lastWin may be none, p1 shall not be null + //TODO: the last datablock + //TODO: blockDataUpdateTsWindow(pBlock, 0); + + int32_t ret = + setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + TSKEY ekey = ascScan ? win.ekey : win.skey; + int32_t forwardRows = + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); + ASSERT(forwardRows > 0); + + // prev time window not interpolation yet. + if (pInfo->timeWindowInterpo) { + SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult); + doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos); + + // restore current time window + ret = + setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + // window start key interpolation + doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &win, startPos, forwardRows); + } + + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, + pBlock->info.rows, numOfOutput, pInfo->order); + + doCloseWindow(pResultRowInfo, pInfo, pResult); + STimeWindow *lastWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId)); + if (ascScan && win.skey > lastWin->ekey || (!ascScan) && win.skey < lastWin->ekey) { + SET_RES_WINDOW_KEY(pInfo->aggSup.keyBuf, &lastWin->skey, TSDB_KEYSIZE, tableGroupId); + SResultRowPosition* p1 = + (SResultRowPosition*)taosHashGet(pInfo->aggSup.pResultRowHashTable, pInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + finalizeResultRowIntoSDataBlock(pInfo->aggSup.pResultBuf, p1, + pInfo->binfo.pCtx, pOperatorInfo->pExpr, pOperatorInfo->numOfExprs, pInfo->binfo.rowCellInfoOffset, + pResultBlock, pTaskInfo); + taosHashRemove(pInfo->aggSup.pResultRowHashTable, pInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + + taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), &win, sizeof(STimeWindow)); + } + + STimeWindow nextWin = win; + while (1) { + int32_t prevEndPos = forwardRows - 1 + startPos; + startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->order); + if (startPos < 0) { + break; + } + + // null data, failed to allocate more memory buffer + int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, + pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, + &pInfo->aggSup, pTaskInfo); + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + ekey = ascScan ? nextWin.ekey : nextWin.skey; + forwardRows = + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); + + // window start(end) key interpolation + doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, + forwardRows); + + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + tsCols, pBlock->info.rows, numOfOutput, pInfo->order); + doCloseWindow(pResultRowInfo, pInfo, pResult); + } + + if (pInfo->timeWindowInterpo) { + saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols); + } +} + +static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info; + SIntervalAggOperatorInfo *pInfo = &miaInfo->intervalAggOperatorInfo; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SSDataBlock* pRes = pInfo->binfo.pRes; + blockDataCleanup(pRes); + + int32_t scanFlag = MAIN_SCAN; + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + + while (1) { + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + if (pBlock == NULL) { + break; + } + + getTableScanInfo(pOperator, &pInfo->order, &scanFlag); + + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, scanFlag, true); + STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; + + setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); + doMergeIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); + } + + if (pRes->info.rows == 0) { + doSetOperatorCompleted(pOperator); + } + + size_t rows = pRes->info.rows; + pOperator->resultInfo.totalRows += rows; + return (rows == 0) ? NULL : pRes; +} + +SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { + SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (miaInfo == NULL || pOperator == NULL) { + goto _error; + } + SIntervalAggOperatorInfo *pInfo = &miaInfo->intervalAggOperatorInfo; + + pInfo->win = pTaskInfo->window; + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; + pInfo->execModel = pTaskInfo->execModel; + pInfo->twAggSup = *pTwAggSupp; + + pInfo->primaryTsIndex = primaryTsSlotId; + miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); + + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + initResultSizeInfo(pOperator, 4096); + + int32_t code = + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); + + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); + + pInfo->timeWindowInterpo = timeWindowinterpNeeded(pInfo->binfo.pCtx, numOfCols, pInfo); + if (pInfo->timeWindowInterpo) { + pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); + } + + // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); + if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { + goto _error; + } + + initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); + + pOperator->name = "TimeMergeIntervalAggOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->numOfExprs = numOfCols; + pOperator->info = pInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, doStreamIntervalAgg, NULL, + destroyIntervalOperatorInfo, NULL, NULL, NULL); + + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + +_error: + destroyMergeIntervalOperatorInfo(pInfo, numOfCols); + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; +}