From 62780bbfb4cbd4432e49b42e646435562db6f242 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 10 Jun 2022 10:24:52 +0800 Subject: [PATCH] feature: add merge interval operator --- source/libs/executor/src/timewindowoperator.c | 45 +++++++++---------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3ed416ec7c..842e46a8cd 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3236,10 +3236,6 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, iaInfo->interval.precision, &iaInfo->win); - //TODO: pBlock not process not finished - //TODO: different block group id or no group id - //TODO: the last datablock - //TODO: blockDataUpdateTsWindow(pBlock, 0); int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx, @@ -3370,7 +3366,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { if (pRes->info.rows == 0) { doSetOperatorCompleted(pOperator); } else { - //TODO: ts column index blockDataUpdateTsWindow(pRes, 0); } @@ -3387,36 +3382,38 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI if (miaInfo == NULL || pOperator == NULL) { goto _error; } - SIntervalAggOperatorInfo *pInfo = &miaInfo->intervalAggOperatorInfo; - pInfo->win = pTaskInfo->window; - pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = *pInterval; - pInfo->execModel = pTaskInfo->execModel; - pInfo->twAggSup = *pTwAggSupp; + SIntervalAggOperatorInfo * iaInfo = &miaInfo->intervalAggOperatorInfo; - pInfo->primaryTsIndex = primaryTsSlotId; + iaInfo->win = pTaskInfo->window; + iaInfo->order = TSDB_ORDER_ASC; + iaInfo->interval = *pInterval; + + iaInfo->execModel = pTaskInfo->execModel; + iaInfo->twAggSup = *pTwAggSupp; + + iaInfo->primaryTsIndex = primaryTsSlotId; miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, 4096); int32_t code = - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); + initAggInfo(&iaInfo->binfo, &iaInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); - initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); + initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); - pInfo->timeWindowInterpo = timeWindowinterpNeeded(pInfo->binfo.pCtx, numOfCols, pInfo); - if (pInfo->timeWindowInterpo) { - pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); + iaInfo->timeWindowInterpo = timeWindowinterpNeeded(iaInfo->binfo.pCtx, numOfCols, iaInfo); + if (iaInfo->timeWindowInterpo) { + iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); } - // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); - if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { + // iaInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); + if (code != TSDB_CODE_SUCCESS /* || iaInfo->pTableQueryInfo == NULL*/) { goto _error; } - initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); + initResultRowInfo(&iaInfo->binfo.resultRowInfo, (int32_t)1); pOperator->name = "TimeMergeIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; @@ -3425,10 +3422,10 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; pOperator->numOfExprs = numOfCols; - pOperator->info = pInfo; + pOperator->info = miaInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, - destroyIntervalOperatorInfo, NULL, NULL, NULL); + destroyMergeIntervalOperatorInfo, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -3438,8 +3435,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI return pOperator; _error: - destroyMergeIntervalOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); + destroyMergeIntervalOperatorInfo(miaInfo, numOfCols); + taosMemoryFreeClear(miaInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL;