diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c new file mode 100644 index 0000000000..e38070953a --- /dev/null +++ b/source/libs/executor/src/eventwindowoperator.c @@ -0,0 +1,295 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executorimpl.h" +#include "filter.h" +#include "function.h" +#include "functionMgt.h" +#include "tcommon.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "tfill.h" +#include "ttime.h" + +typedef struct SEventWindowOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SExprSupp scalarSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + SColumn stateCol; // start row index + bool hasKey; + SStateKeys stateKey; + int32_t tsSlotId; // primary timestamp column slot id + STimeWindowAggSupp twAggSup; +} SEventWindowOperatorInfo; + +static int32_t openEventWindowAggOptr(SOperatorInfo* pOperator); +static void destroyEWindowOperatorInfo(void* param); +static void doEventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); +static SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator); + +SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWinodwPhysiNode* pStateNode, + SExecTaskInfo* pTaskInfo) { + SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; + SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStartCond)->pExpr; + + if (pStateNode->window.pExprs != NULL) { + int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr); + int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + + pInfo->stateCol = extractColumnFromColumnNode(pColNode); + pInfo->stateKey.type = pInfo->stateCol.type; + pInfo->stateKey.bytes = pInfo->stateCol.bytes; + pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes); + if (pInfo->stateKey.pData == NULL) { + goto _error; + } + + int32_t code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); + initResultSizeInfo(&pOperator->resultInfo, 4096); + + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc); + initBasicInfo(&pInfo->binfo, pResBlock); + initResultRowInfo(&pInfo->binfo.resultRowInfo); + + pInfo->twAggSup = + (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType}; + + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + + pInfo->tsSlotId = tsSlotId; + + setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, + pTaskInfo); + pOperator->fpSet = createOperatorFpSet(openEventWindowAggOptr, doEventWindowAgg, NULL, destroyEWindowOperatorInfo, + optrDefaultBufFn, NULL); + + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + +_error: + if (pInfo != NULL) { + destroyEWindowOperatorInfo(pInfo); + } + + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; +} + +void destroyEWindowOperatorInfo(void* param) { + SEventWindowOperatorInfo* pInfo = (SEventWindowOperatorInfo*)param; + if (pInfo == NULL) { + return; + } + + cleanupBasicInfo(&pInfo->binfo); + colDataDestroy(&pInfo->twAggSup.timeWindowData); + + cleanupAggSup(&pInfo->aggSup); + cleanupGroupResInfo(&pInfo->groupResInfo); + taosMemoryFreeClear(param); +} + +static int32_t openEventWindowAggOptr(SOperatorInfo* pOperator) { + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; + } + + SEventWindowOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SExprSupp* pSup = &pOperator->exprSupp; + int32_t order = TSDB_ORDER_ASC; + int64_t st = taosGetTimestampUs(); + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + while (1) { + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + if (pBlock == NULL) { + break; + } + + setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); + blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId); + + // there is an scalar expression that needs to be calculated right before apply the group aggregation. + if (pInfo->scalarSup.pExprInfo != NULL) { + pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, + pInfo->scalarSup.numOfExprs, NULL); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + } + + doEventWindowAggImpl(pOperator, pInfo, pBlock); + } + + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); + pOperator->status = OP_RES_TO_RETURN; + + return TSDB_CODE_SUCCESS; +} + +void doEventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pSup = &pOperator->exprSupp; + + SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId); + int64_t gid = pBlock->info.id.groupId; + + bool masterScan = true; + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; + int16_t bytes = pStateColInfoData->info.bytes; + + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); + TSKEY* tsList = (TSKEY*)pColInfoData->pData; + + SWindowRowsSup* pRowSup = &pInfo->winSup; + pRowSup->numOfRows = 0; + + struct SColumnDataAgg* pAgg = NULL; + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + pAgg = (pBlock->pBlockAgg != NULL) ? pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL; + if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) { + continue; + } + + char* val = colDataGetData(pStateColInfoData, j); + + if (gid != pRowSup->groupId || !pInfo->hasKey) { + // todo extract method + if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) { + varDataCopy(pInfo->stateKey.pData, val); + } else { + memcpy(pInfo->stateKey.pData, val, bytes); + } + + pInfo->hasKey = true; + + doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); + doKeepTuple(pRowSup, tsList[j], gid); + } else if (compareVal(val, &pInfo->stateKey)) { + doKeepTuple(pRowSup, tsList[j], gid); + if (j == 0 && pRowSup->startRowIndex != 0) { + pRowSup->startRowIndex = 0; + } + } else { // a new state window started + SResultRow* pResult = NULL; + + // keep the time window for the closed time window. + STimeWindow window = pRowSup->win; + + pRowSup->win.ekey = pRowSup->win.skey; + int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + } + + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + + // here we start a new session window + doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); + doKeepTuple(pRowSup, tsList[j], gid); + + // todo extract method + if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) { + varDataCopy(pInfo->stateKey.pData, val); + } else { + memcpy(pInfo->stateKey.pData, val, bytes); + } + } + } + + SResultRow* pResult = NULL; + pRowSup->win.ekey = tsList[pBlock->info.rows - 1]; + int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, + pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + } + + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, + pBlock->info.rows, numOfOutput); +} + +SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SStateWindowOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SOptrBasicInfo* pBInfo = &pInfo->binfo; + + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + setOperatorCompleted(pOperator); + return NULL; + } + + blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); + while (1) { + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); + if (!hasRemain) { + setOperatorCompleted(pOperator); + break; + } + + if (pBInfo->pRes->info.rows > 0) { + break; + } + } + + pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows; + return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; +} diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8107cea4a0..6c17df9fb1 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1964,6 +1964,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; } +// todo make this as an non-blocking operator SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo) { SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));