From 4bc94300a50943da2e783fab6ca484c9e9022d33 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 9 Oct 2022 21:49:40 +0800 Subject: [PATCH] enh(query): support scalar expressions in the state window aggregate. --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/tfill.c | 9 --- source/libs/executor/src/timewindowoperator.c | 80 +++++++++++-------- 3 files changed, 49 insertions(+), 41 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 897015c4d3..77fa33fb0e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -815,6 +815,7 @@ typedef struct SStateWindowOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; SAggSupporter aggSup; + SExprSupp scalarSup; SGroupResInfo groupResInfo; SWindowRowsSup winSup; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 34997a5dd8..cf0bc078d7 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -366,7 +366,6 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t } // set the tag value for final result - // setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent); SInterval* pInterval = &pFillInfo->interval; pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision); @@ -523,14 +522,6 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { return NULL; } -void taosFillSetDataOrderInfo(SFillInfo* pFillInfo, int32_t order) { - if (pFillInfo == NULL || (order != TSDB_ORDER_ASC && order != TSDB_ORDER_DESC)) { - return; - } - - pFillInfo->order = order; -} - void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) { if (pFillInfo->type == TSDB_FILL_NONE) { return; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8df6f15a1b..84ea68f3b7 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1213,39 +1213,17 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI pBlock->info.rows, numOfOutput); } -static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; +static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) { + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; } SStateWindowOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SExprSupp* pSup = &pOperator->exprSupp; - - SOptrBasicInfo* pBInfo = &pInfo->binfo; - - if (pOperator->status == OP_RES_TO_RETURN) { - while (1) { - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - - bool hasRemain = hasRemainResults(&pInfo->groupResInfo); - if (!hasRemain) { - doSetOperatorCompleted(pOperator); - break; - } - - if (pBInfo->pRes->info.rows > 0) { - break; - } - } - pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows; - return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; - } - - int32_t order = TSDB_ORDER_ASC; - int64_t st = taosGetTimestampUs(); + SExprSupp* pSup = &pOperator->exprSupp; + int32_t order = TSDB_ORDER_ASC; + int64_t st = taosGetTimestampUs(); SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -1257,13 +1235,40 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, MAIN_SCAN, true); blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId); + // there is an scalar expression that needs to be calculated right before apply the group aggregation. + if (pInfo->scalarSup.pExprInfo != NULL) { + pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, + pInfo->scalarSup.numOfExprs, NULL); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + } + doStateWindowAggImpl(pOperator, pInfo, pBlock); } pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); pOperator->status = OP_RES_TO_RETURN; - initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); + return TSDB_CODE_SUCCESS; +} + +static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SStateWindowOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SOptrBasicInfo* pBInfo = &pInfo->binfo; + + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + doSetOperatorCompleted(pOperator); + return NULL; + } + blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); @@ -1279,6 +1284,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { break; } } + pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows; return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; } @@ -1659,6 +1665,7 @@ static void destroyStateWindowOperatorInfo(void* param) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); taosMemoryFreeClear(pInfo->stateKey.pData); + cleanupExprSupp(&pInfo->scalarSup); taosMemoryFreeClear(param); } @@ -2690,6 +2697,15 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; + if (pStateNode->window.pExprs != NULL) { + int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr); + int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + pInfo->stateCol = extractColumnFromColumnNode(pColNode); pInfo->stateKey.type = pInfo->stateCol.type; pInfo->stateKey.bytes = pInfo->stateCol.bytes; @@ -2712,7 +2728,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType}; - ; + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->tsSlotId = tsSlotId; @@ -2723,7 +2739,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi pOperator->pTaskInfo = pTaskInfo; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, + pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1);