From 5dd9322cbf476499b8c6a230ae864c09af6611cd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 28 Dec 2022 09:16:10 +0800 Subject: [PATCH 1/8] enh(query): add event window. --- .../libs/executor/src/eventwindowoperator.c | 295 ++++++++++++++++++ source/libs/executor/src/timewindowoperator.c | 1 + 2 files changed, 296 insertions(+) create mode 100644 source/libs/executor/src/eventwindowoperator.c diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c new file mode 100644 index 0000000000..e38070953a --- /dev/null +++ b/source/libs/executor/src/eventwindowoperator.c @@ -0,0 +1,295 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executorimpl.h" +#include "filter.h" +#include "function.h" +#include "functionMgt.h" +#include "tcommon.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "tfill.h" +#include "ttime.h" + +typedef struct SEventWindowOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SExprSupp scalarSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + SColumn stateCol; // start row index + bool hasKey; + SStateKeys stateKey; + int32_t tsSlotId; // primary timestamp column slot id + STimeWindowAggSupp twAggSup; +} SEventWindowOperatorInfo; + +static int32_t openEventWindowAggOptr(SOperatorInfo* pOperator); +static void destroyEWindowOperatorInfo(void* param); +static void doEventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); +static SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator); + +SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWinodwPhysiNode* pStateNode, + SExecTaskInfo* pTaskInfo) { + SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; + SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStartCond)->pExpr; + + if (pStateNode->window.pExprs != NULL) { + int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr); + int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + + pInfo->stateCol = extractColumnFromColumnNode(pColNode); + pInfo->stateKey.type = pInfo->stateCol.type; + pInfo->stateKey.bytes = pInfo->stateCol.bytes; + pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes); + if (pInfo->stateKey.pData == NULL) { + goto _error; + } + + int32_t code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); + initResultSizeInfo(&pOperator->resultInfo, 4096); + + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc); + initBasicInfo(&pInfo->binfo, pResBlock); + initResultRowInfo(&pInfo->binfo.resultRowInfo); + + pInfo->twAggSup = + (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType}; + + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + + pInfo->tsSlotId = tsSlotId; + + setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, + pTaskInfo); + pOperator->fpSet = createOperatorFpSet(openEventWindowAggOptr, doEventWindowAgg, NULL, destroyEWindowOperatorInfo, + optrDefaultBufFn, NULL); + + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + +_error: + if (pInfo != NULL) { + destroyEWindowOperatorInfo(pInfo); + } + + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; +} + +void destroyEWindowOperatorInfo(void* param) { + SEventWindowOperatorInfo* pInfo = (SEventWindowOperatorInfo*)param; + if (pInfo == NULL) { + return; + } + + cleanupBasicInfo(&pInfo->binfo); + colDataDestroy(&pInfo->twAggSup.timeWindowData); + + cleanupAggSup(&pInfo->aggSup); + cleanupGroupResInfo(&pInfo->groupResInfo); + taosMemoryFreeClear(param); +} + +static int32_t openEventWindowAggOptr(SOperatorInfo* pOperator) { + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; + } + + SEventWindowOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SExprSupp* pSup = &pOperator->exprSupp; + int32_t order = TSDB_ORDER_ASC; + int64_t st = taosGetTimestampUs(); + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + while (1) { + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + if (pBlock == NULL) { + break; + } + + setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); + blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId); + + // there is an scalar expression that needs to be calculated right before apply the group aggregation. + if (pInfo->scalarSup.pExprInfo != NULL) { + pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, + pInfo->scalarSup.numOfExprs, NULL); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + } + + doEventWindowAggImpl(pOperator, pInfo, pBlock); + } + + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); + pOperator->status = OP_RES_TO_RETURN; + + return TSDB_CODE_SUCCESS; +} + +void doEventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pSup = &pOperator->exprSupp; + + SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId); + int64_t gid = pBlock->info.id.groupId; + + bool masterScan = true; + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; + int16_t bytes = pStateColInfoData->info.bytes; + + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); + TSKEY* tsList = (TSKEY*)pColInfoData->pData; + + SWindowRowsSup* pRowSup = &pInfo->winSup; + pRowSup->numOfRows = 0; + + struct SColumnDataAgg* pAgg = NULL; + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + pAgg = (pBlock->pBlockAgg != NULL) ? pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL; + if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) { + continue; + } + + char* val = colDataGetData(pStateColInfoData, j); + + if (gid != pRowSup->groupId || !pInfo->hasKey) { + // todo extract method + if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) { + varDataCopy(pInfo->stateKey.pData, val); + } else { + memcpy(pInfo->stateKey.pData, val, bytes); + } + + pInfo->hasKey = true; + + doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); + doKeepTuple(pRowSup, tsList[j], gid); + } else if (compareVal(val, &pInfo->stateKey)) { + doKeepTuple(pRowSup, tsList[j], gid); + if (j == 0 && pRowSup->startRowIndex != 0) { + pRowSup->startRowIndex = 0; + } + } else { // a new state window started + SResultRow* pResult = NULL; + + // keep the time window for the closed time window. + STimeWindow window = pRowSup->win; + + pRowSup->win.ekey = pRowSup->win.skey; + int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + } + + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + + // here we start a new session window + doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); + doKeepTuple(pRowSup, tsList[j], gid); + + // todo extract method + if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) { + varDataCopy(pInfo->stateKey.pData, val); + } else { + memcpy(pInfo->stateKey.pData, val, bytes); + } + } + } + + SResultRow* pResult = NULL; + pRowSup->win.ekey = tsList[pBlock->info.rows - 1]; + int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, + pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + } + + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, + pBlock->info.rows, numOfOutput); +} + +SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SStateWindowOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SOptrBasicInfo* pBInfo = &pInfo->binfo; + + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + setOperatorCompleted(pOperator); + return NULL; + } + + blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); + while (1) { + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); + if (!hasRemain) { + setOperatorCompleted(pOperator); + break; + } + + if (pBInfo->pRes->info.rows > 0) { + break; + } + } + + pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows; + return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; +} diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8107cea4a0..6c17df9fb1 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1964,6 +1964,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; } +// todo make this as an non-blocking operator SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo) { SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); From e7b1a7dc783b15ea4e6a8cbb8c141f9baf159e15 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 3 Jan 2023 13:15:09 +0800 Subject: [PATCH 2/8] enh(query): support event window. --- .../libs/executor/src/eventwindowoperator.c | 28 ++++++++++++++++++- source/libs/executor/src/timewindowoperator.c | 4 +-- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index e38070953a..408d75998d 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -41,6 +41,32 @@ static void destroyEWindowOperatorInfo(void* param); static void doEventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); static SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator); +// todo : move to util +static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, + uint64_t groupId) { + pRowSup->startRowIndex = rowIndex; + pRowSup->numOfRows = 0; + pRowSup->win.skey = tsList[rowIndex]; + pRowSup->groupId = groupId; +} + +static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { + pRowSup->win.ekey = ts; + pRowSup->prevTs = ts; + pRowSup->numOfRows += 1; + pRowSup->groupId = groupId; +} + +static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) { + int64_t* ts = (int64_t*)pColData->pData; + int32_t delta = includeEndpoint ? 1 : 0; + + int64_t duration = pWin->ekey - pWin->skey + delta; + ts[2] = duration; // set the duration + ts[3] = pWin->skey; // window start key + ts[4] = pWin->ekey + delta; // window end key +} + SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo) { SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo)); @@ -264,7 +290,7 @@ SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator) { return NULL; } - SStateWindowOperatorInfo* pInfo = pOperator->info; + SEventWindowOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SOptrBasicInfo* pBInfo = &pInfo->binfo; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6c17df9fb1..32e0c517b4 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4174,8 +4174,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - pBlock->info.rows, pSup->numOfExprs); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, + currPos - startPos, pBlock->info.rows, pSup->numOfExprs); finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo); resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow)); From 53c3cbd661428bd3bceaaf1c10e6b41d1b7cf400 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 3 Jan 2023 17:41:10 +0800 Subject: [PATCH 3/8] enh(query): support event window. --- source/libs/executor/inc/executorimpl.h | 6 + .../libs/executor/src/eventwindowoperator.c | 256 ++++++++---------- source/libs/executor/src/executorimpl.c | 17 +- 3 files changed, 133 insertions(+), 146 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 60efefb1e4..d957dd987b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -800,6 +800,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo); // clang-format on int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, @@ -853,6 +855,10 @@ int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo); +void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, + SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo); +void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) ; + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 408d75998d..61c8030bd9 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -20,7 +20,6 @@ #include "tcommon.h" #include "tcompare.h" #include "tdatablock.h" -#include "tfill.h" #include "ttime.h" typedef struct SEventWindowOperatorInfo { @@ -29,16 +28,20 @@ typedef struct SEventWindowOperatorInfo { SExprSupp scalarSup; SGroupResInfo groupResInfo; SWindowRowsSup winSup; - SColumn stateCol; // start row index bool hasKey; SStateKeys stateKey; int32_t tsSlotId; // primary timestamp column slot id STimeWindowAggSupp twAggSup; + + SFilterInfo* pStartCondInfo; + SFilterInfo* pEndCondInfo; + bool inWindow; + SResultRow* pRow; } SEventWindowOperatorInfo; -static int32_t openEventWindowAggOptr(SOperatorInfo* pOperator); -static void destroyEWindowOperatorInfo(void* param); -static void doEventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); +static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator); +static void destroyEWindowOperatorInfo(void* param); +static void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); static SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator); // todo : move to util @@ -67,7 +70,7 @@ static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, b ts[4] = pWin->ekey + delta; // window end key } -SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWinodwPhysiNode* pStateNode, +SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo) { SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -75,27 +78,29 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWi goto _error; } - int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; - SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStartCond)->pExpr; + SEventWinodwPhysiNode* pEventWindowNode = (SEventWinodwPhysiNode*)physiNode; - if (pStateNode->window.pExprs != NULL) { + int32_t tsSlotId = ((SColumnNode*)pEventWindowNode->window.pTspk)->slotId; + int32_t code = filterInitFromNode((SNode*)pEventWindowNode->pStartCond, &pInfo->pStartCondInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + code = filterInitFromNode((SNode*)pEventWindowNode->pEndCond, &pInfo->pEndCondInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + if (pEventWindowNode->window.pExprs != NULL) { int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr); - int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); + SExprInfo* pScalarExprInfo = createExprInfo(pEventWindowNode->window.pExprs, NULL, &numOfScalarExpr); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); if (code != TSDB_CODE_SUCCESS) { goto _error; } } - pInfo->stateCol = extractColumnFromColumnNode(pColNode); - pInfo->stateKey.type = pInfo->stateCol.type; - pInfo->stateKey.bytes = pInfo->stateCol.bytes; - pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes); - if (pInfo->stateKey.pData == NULL) { - goto _error; - } - - int32_t code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + code = filterInitFromNode((SNode*)pEventWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -103,7 +108,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWi size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &num); initResultSizeInfo(&pOperator->resultInfo, 4096); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); @@ -111,20 +116,22 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWi goto _error; } - SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pEventWindowNode->window.node.pOutputDataBlockDesc); + blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); + initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->twAggSup = - (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType}; + pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pEventWindowNode->window.watermark, + .calTrigger = pEventWindowNode->window.triggerType}; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->tsSlotId = tsSlotId; - setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, + setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(openEventWindowAggOptr, doEventWindowAgg, NULL, destroyEWindowOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregate, NULL, destroyEWindowOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); @@ -158,17 +165,14 @@ void destroyEWindowOperatorInfo(void* param) { taosMemoryFreeClear(param); } -static int32_t openEventWindowAggOptr(SOperatorInfo* pOperator) { - if (OPTR_IS_OPENED(pOperator)) { - return TSDB_CODE_SUCCESS; - } - +static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) { SEventWindowOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExprSupp* pSup = &pOperator->exprSupp; int32_t order = TSDB_ORDER_ASC; - int64_t st = taosGetTimestampUs(); + + blockDataCleanup(pInfo->binfo.pRes); SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -189,133 +193,113 @@ static int32_t openEventWindowAggOptr(SOperatorInfo* pOperator) { } } - doEventWindowAggImpl(pOperator, pInfo, pBlock); + eventWindowAggImpl(pOperator, pInfo, pBlock); } - pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; - initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); - pOperator->status = OP_RES_TO_RETURN; + return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; +} +static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult, + SExprSupp* pExprSup, SAggSupporter* pAggSup) { + if (*pResult == NULL) { + SResultRow* p = taosMemoryCalloc(1, pAggSup->resultRowSize); + pResultRowInfo->cur = (SResultRowPosition){.pageId = p->pageId, .offset = p->offset}; + *pResult = p; + } + + (*pResult)->win = *win; + setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } -void doEventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SExprSupp* pSup = &pOperator->exprSupp; - - SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId); - int64_t gid = pBlock->info.id.groupId; - - bool masterScan = true; - int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - int16_t bytes = pStateColInfoData->info.bytes; - - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); - TSKEY* tsList = (TSKEY*)pColInfoData->pData; - +static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSup, int32_t startIndex, int32_t endIndex, + const SSDataBlock* pBlock, int64_t* tsList, SExecTaskInfo* pTaskInfo) { SWindowRowsSup* pRowSup = &pInfo->winSup; - pRowSup->numOfRows = 0; - struct SColumnDataAgg* pAgg = NULL; - for (int32_t j = 0; j < pBlock->info.rows; ++j) { - pAgg = (pBlock->pBlockAgg != NULL) ? pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL; - if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) { - continue; - } + int32_t numOfOutput = pSup->numOfExprs; + int32_t numOfRows = endIndex - startIndex + 1; - char* val = colDataGetData(pStateColInfoData, j); + doKeepTuple(pRowSup, tsList[endIndex], pBlock->info.id.groupId); - if (gid != pRowSup->groupId || !pInfo->hasKey) { - // todo extract method - if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) { - varDataCopy(pInfo->stateKey.pData, val); - } else { - memcpy(pInfo->stateKey.pData, val, bytes); - } - - pInfo->hasKey = true; - - doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); - doKeepTuple(pRowSup, tsList[j], gid); - } else if (compareVal(val, &pInfo->stateKey)) { - doKeepTuple(pRowSup, tsList[j], gid); - if (j == 0 && pRowSup->startRowIndex != 0) { - pRowSup->startRowIndex = 0; - } - } else { // a new state window started - SResultRow* pResult = NULL; - - // keep the time window for the closed time window. - STimeWindow window = pRowSup->win; - - pRowSup->win.ekey = pRowSup->win.skey; - int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); - } - - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); - - // here we start a new session window - doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); - doKeepTuple(pRowSup, tsList[j], gid); - - // todo extract method - if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) { - varDataCopy(pInfo->stateKey.pData, val); - } else { - memcpy(pInfo->stateKey.pData, val, bytes); - } - } - } - - SResultRow* pResult = NULL; - pRowSup->win.ekey = tsList[pBlock->info.rows - 1]; - int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, - pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + int32_t ret = + setSingleOutputTupleBufv1(&pInfo->binfo.resultRowInfo, &pRowSup->win, &pInfo->pRow, pSup, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startIndex, numOfRows, pBlock->info.rows, numOfOutput); } -SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } +void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pSup = &pOperator->exprSupp; - SEventWindowOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SOptrBasicInfo* pBInfo = &pInfo->binfo; + int64_t gid = pBlock->info.id.groupId; - pTaskInfo->code = pOperator->fpSet._openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - setOperatorCompleted(pOperator); - return NULL; - } + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); + TSKEY* tsList = (TSKEY*)pColInfoData->pData; - blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - while (1) { - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + SColumnInfoData *ps = NULL, *pe = NULL; - bool hasRemain = hasRemainResults(&pInfo->groupResInfo); - if (!hasRemain) { - setOperatorCompleted(pOperator); - break; - } + SWindowRowsSup* pRowSup = &pInfo->winSup; + pRowSup->numOfRows = 0; - if (pBInfo->pRes->info.rows > 0) { - break; + SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; + int32_t code = filterSetDataFromSlotId(pInfo->pStartCondInfo, ¶m1); + + int32_t status1 = 0; + bool keep1 = filterExecute(pInfo->pStartCondInfo, pBlock, &ps, NULL, param1.numOfCols, &status1); + + SFilterColumnParam param2 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; + code = filterSetDataFromSlotId(pInfo->pEndCondInfo, ¶m2); + + int32_t status2 = 0; + bool keep2 = filterExecute(pInfo->pEndCondInfo, pBlock, &pe, NULL, param2.numOfCols, &status2); + + int32_t rowIndex = 0; + int32_t startIndex = pInfo->inWindow ? 0 : -1; + + while (rowIndex < pBlock->info.rows) { + if (pInfo->inWindow) { // let's find the first end value + for (rowIndex = startIndex; rowIndex < pBlock->info.rows; ++rowIndex) { + if (((bool*)pe->pData)[rowIndex]) { + break; + } + } + + if (rowIndex < pBlock->info.rows) { + doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo); + + // todo check if pblock buffer is enough + doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset); + + copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pInfo->binfo.pRes, + pSup->rowEntryInfoOffset, pTaskInfo); + + pInfo->binfo.pRes->info.rows += pInfo->pRow->numOfRows; + + pInfo->inWindow = false; + rowIndex += 1; + } else { + doEventWindowAggImpl(pInfo, pSup, startIndex, pBlock->info.rows - 1, pBlock, tsList, pTaskInfo); + } + } else { // find the first start value that is fulfill for the start condition + for (; rowIndex < pBlock->info.rows; ++rowIndex) { + if (((bool*)ps->pData)[rowIndex]) { + doKeepNewWindowStartInfo(pRowSup, tsList, rowIndex, gid); + pInfo->inWindow = true; + startIndex = rowIndex; + break; + } + } + + if (pInfo->inWindow) { + continue; + } else { + break; + } } } - - pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows; - return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index fecdcd8fa3..5ee209a07a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -936,8 +936,7 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u pAggInfo->groupId = groupId; } -static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, - const int32_t* rowEntryOffset) { +void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) { bool returnNotNull = false; for (int32_t j = 0; j < numOfExprs; ++j) { SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowEntryOffset); @@ -960,8 +959,8 @@ static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t nu } } -static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, - SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) { +void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, + SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) { for (int32_t j = 0; j < numOfExprs; ++j) { int32_t slotId = pExprInfo[j].base.resSchema.slotId; @@ -1022,7 +1021,7 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos T_LONG_JMP(pTaskInfo->env, code); } - doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); + copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; @@ -1070,7 +1069,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS } pGroupResInfo->index += 1; - doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); + copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; @@ -2087,8 +2086,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); - } else { - ASSERT(0); } if (pOperator != NULL) { @@ -2179,8 +2176,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) { pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo); - } else { - ASSERT(0); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) { + pOptr = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo); } taosMemoryFree(ops); From 169a6703d171ef6046e34135edeef0d6fce88ddb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 3 Jan 2023 17:51:50 +0800 Subject: [PATCH 4/8] enh(query): support event window. --- .../libs/executor/src/eventwindowoperator.c | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 61c8030bd9..1480ac0451 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -172,7 +172,9 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; int32_t order = TSDB_ORDER_ASC; - blockDataCleanup(pInfo->binfo.pRes); + SSDataBlock* pRes = pInfo->binfo.pRes; + + blockDataCleanup(pRes); SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -194,9 +196,12 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) { } eventWindowAggImpl(pOperator, pInfo, pBlock); + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + return pRes; + } } - return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; + return pRes->info.rows == 0 ? NULL : pRes; } static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult, @@ -236,6 +241,7 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExprSupp* pSup = &pOperator->exprSupp; + SSDataBlock* pRes = pInfo->binfo.pRes; int64_t gid = pBlock->info.id.groupId; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); @@ -272,13 +278,18 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf if (rowIndex < pBlock->info.rows) { doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo); - // todo check if pblock buffer is enough doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset); - copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pInfo->binfo.pRes, + // check buffer size + if (pRes->info.rows + pInfo->pRow->numOfRows >= pRes->info.capacity) { + int32_t newSize = pRes->info.rows + pInfo->pRow->numOfRows; + blockDataEnsureCapacity(pRes, newSize); + } + + copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pRes, pSup->rowEntryInfoOffset, pTaskInfo); - pInfo->binfo.pRes->info.rows += pInfo->pRow->numOfRows; + pRes->info.rows += pInfo->pRow->numOfRows; pInfo->inWindow = false; rowIndex += 1; From 4dce537a8a77fc682df83a0520179a8604d2ef34 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 3 Jan 2023 19:27:16 +0800 Subject: [PATCH 5/8] enh(query): opt filter perf. --- source/libs/executor/src/executorimpl.c | 102 +++++++++++++++++++----- 1 file changed, 84 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5ee209a07a..41f93a62c5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -858,32 +858,100 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD } else if (status == FILTER_RESULT_NONE_QUALIFIED) { pBlock->info.rows = 0; } else { - SSDataBlock* px = createOneDataBlock(pBlock, true); + int32_t bmLen = BitmapLen(totalRows); + char* pBitmap = NULL; size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); // it is a reserved column for scalar function, and no data in this column yet. - if (pDst->pData == NULL || pSrc->pData == NULL) { + if (pDst->pData == NULL) { continue; } - colInfoDataCleanup(pDst, pBlock->info.rows); - int32_t numOfRows = 0; - for (int32_t j = 0; j < totalRows; ++j) { - if (((int8_t*)p->pData)[j] == 0) { - continue; - } - if (colDataIsNull_s(pSrc, j)) { - colDataAppendNULL(pDst, numOfRows); - } else { - colDataAppend(pDst, numOfRows, colDataGetData(pSrc, j), false); - } - numOfRows += 1; - } + switch (pDst->info.type) { + case TSDB_DATA_TYPE_VARCHAR: + case TSDB_DATA_TYPE_NCHAR: + break; + default: + if (pBitmap == NULL) { + pBitmap = taosMemoryCalloc(1, bmLen); + } + memcpy(pBitmap, pDst->nullbitmap, bmLen); + memset(pDst->nullbitmap, 0, bmLen); + + int32_t j = 0; + + switch (pDst->info.type) { + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + case TSDB_DATA_TYPE_DOUBLE: + case TSDB_DATA_TYPE_TIMESTAMP: + while (j < totalRows) { + if (((int8_t*)p->pData)[j] == 0) { + j += 1; + continue; + } + + if (colDataIsNull_f(pBitmap, j)) { + colDataAppendNULL(pDst, numOfRows); + } else { + ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j]; + } + numOfRows += 1; + } + break; + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + while (j < totalRows) { + if (((int8_t*)p->pData)[j] == 0) { + j += 1; + continue; + } + if (colDataIsNull_f(pBitmap, j)) { + colDataAppendNULL(pDst, numOfRows); + } else { + ((int32_t*)pDst->pData)[numOfRows++] = ((int32_t*)pDst->pData)[j]; + } + numOfRows += 1; + } + break; + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: + while (j < totalRows) { + if (((int8_t*)p->pData)[j] == 0) { + j += 1; + continue; + } + if (colDataIsNull_f(pBitmap, j)) { + colDataAppendNULL(pDst, numOfRows); + } else { + ((int16_t*)pDst->pData)[numOfRows++] = ((int16_t*)pDst->pData)[j]; + } + numOfRows += 1; + } + break; + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: + while (j < totalRows) { + if (((int8_t*)p->pData)[j] == 0) { + j += 1; + continue; + } + if (colDataIsNull_f(pBitmap, j)) { + colDataAppendNULL(pDst, numOfRows); + } else { + ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j]; + } + numOfRows += 1; + } + break; + } + }; // todo this value can be assigned directly if (pBlock->info.rows == totalRows) { @@ -892,8 +960,6 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD ASSERT(pBlock->info.rows == numOfRows); } } - - blockDataDestroy(px); // fix memory leak } } From b8183580cb954668a21a72a1b53d29bfb1cf5c45 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 11 Jan 2023 09:22:31 +0800 Subject: [PATCH 6/8] feat: add an event_window clause for the explain command --- source/libs/command/inc/commandInt.h | 3 + source/libs/command/src/explain.c | 154 ++++++--------------------- tests/script/tsim/query/event.sim | 69 ++++++++++++ 3 files changed, 103 insertions(+), 123 deletions(-) create mode 100644 tests/script/tsim/query/event.sim diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index 6acf19218d..764553f1e9 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -64,6 +64,9 @@ extern "C" { #define EXPLAIN_IGNORE_GROUPID_FORMAT "Ignore Group Id: %s" #define EXPLAIN_PARTITION_KETS_FORMAT "Partition Key: " #define EXPLAIN_INTERP_FORMAT "Interp" +#define EXPLAIN_EVENT_FORMAT "Event" +#define EXPLAIN_EVENT_START_FORMAT "Start Cond: " +#define EXPLAIN_EVENT_END_FORMAT "End Cond: " #define EXPLAIN_PLANNING_TIME_FORMAT "Planning Time: %.3f ms" #define EXPLAIN_EXEC_TIME_FORMAT "Execution Time: %.3f ms" diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 253718048d..4624e471bf 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -114,129 +114,7 @@ _return: int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNodeList **pChildren) { int32_t tlen = 0; - SNodeList *pPhysiChildren = NULL; - - switch (pNode->type) { - case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: { - STagScanPhysiNode *pTagScanNode = (STagScanPhysiNode *)pNode; - pPhysiChildren = pTagScanNode->node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: - case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { - STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode; - pPhysiChildren = pTblScanNode->scan.node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: { - SSystemTableScanPhysiNode *pSTblScanNode = (SSystemTableScanPhysiNode *)pNode; - pPhysiChildren = pSTblScanNode->scan.node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_PROJECT: { - SProjectPhysiNode *pPrjNode = (SProjectPhysiNode *)pNode; - pPhysiChildren = pPrjNode->node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { - SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode; - pPhysiChildren = pJoinNode->node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { - SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode; - pPhysiChildren = pAggNode->node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: { - SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode; - pPhysiChildren = pExchNode->node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_SORT: { - SSortPhysiNode *pSortNode = (SSortPhysiNode *)pNode; - pPhysiChildren = pSortNode->node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: { - SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode; - pPhysiChildren = pIntNode->window.node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: { - SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode; - pPhysiChildren = pSessNode->window.node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: { - SStateWinodwPhysiNode *pStateNode = (SStateWinodwPhysiNode *)pNode; - pPhysiChildren = pStateNode->window.node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_PARTITION: { - SPartitionPhysiNode *partitionPhysiNode = (SPartitionPhysiNode *)pNode; - pPhysiChildren = partitionPhysiNode->node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_MERGE: { - SMergePhysiNode *mergePhysiNode = (SMergePhysiNode *)pNode; - pPhysiChildren = mergePhysiNode->node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: { - SIndefRowsFuncPhysiNode *indefPhysiNode = (SIndefRowsFuncPhysiNode *)pNode; - pPhysiChildren = indefPhysiNode->node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: { - SMergeAlignedIntervalPhysiNode *intPhysiNode = (SMergeAlignedIntervalPhysiNode *)pNode; - pPhysiChildren = intPhysiNode->window.node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_FILL: { - SFillPhysiNode *fillPhysiNode = (SFillPhysiNode *)pNode; - pPhysiChildren = fillPhysiNode->node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: { - STableMergeScanPhysiNode *mergePhysiNode = (STableMergeScanPhysiNode *)pNode; - pPhysiChildren = mergePhysiNode->scan.node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: { - SBlockDistScanPhysiNode *distPhysiNode = (SBlockDistScanPhysiNode *)pNode; - pPhysiChildren = distPhysiNode->node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: { - SLastRowScanPhysiNode *lastRowPhysiNode = (SLastRowScanPhysiNode *)pNode; - pPhysiChildren = lastRowPhysiNode->scan.node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: { - STableCountScanPhysiNode *tableCountPhysiNode = (STableCountScanPhysiNode *)pNode; - pPhysiChildren = tableCountPhysiNode->scan.node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: { - SGroupSortPhysiNode *groupSortPhysiNode = (SGroupSortPhysiNode *)pNode; - pPhysiChildren = groupSortPhysiNode->node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: { - SMergeIntervalPhysiNode *mergeIntPhysiNode = (SMergeIntervalPhysiNode *)pNode; - pPhysiChildren = mergeIntPhysiNode->window.node.pChildren; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: { - SInterpFuncPhysiNode *interpPhysiNode = (SInterpFuncPhysiNode *)pNode; - pPhysiChildren = interpPhysiNode->node.pChildren; - break; - } - default: - qError("not supported physical node type %d", pNode->type); - QRY_ERR_RET(TSDB_CODE_APP_ERROR); - } + SNodeList *pPhysiChildren = pNode->pChildren; if (pPhysiChildren) { *pChildren = nodesMakeList(); @@ -1583,6 +1461,36 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } break; } + case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: { + SEventWinodwPhysiNode *pEventNode = (SEventWinodwPhysiNode *)pNode; + EXPLAIN_ROW_NEW(level, EXPLAIN_EVENT_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + } + EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pEventNode->window.pFuncs->length); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pEventNode->window.node.pOutputDataBlockDesc->totalRowSize); + EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); + + if (verbose) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_EVENT_START_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pEventNode->pStartCond, tbuf + VARSTR_HEADER_SIZE, + TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_EVENT_END_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pEventNode->pEndCond, tbuf + VARSTR_HEADER_SIZE, + TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } + break; + } default: qError("not supported physical node type %d", pNode->type); return TSDB_CODE_APP_ERROR; diff --git a/tests/script/tsim/query/event.sim b/tests/script/tsim/query/event.sim new file mode 100644 index 0000000000..c72b7ba125 --- /dev/null +++ b/tests/script/tsim/query/event.sim @@ -0,0 +1,69 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print ======== prepare data + +sql drop database if exists db1; +sql create database db1 vgroups 5; +sql use db1; +sql create stable sta (ts timestamp, f1 int, f2 binary(10), f3 bool) tags(t1 int, t2 bool, t3 binary(10)); +sql create table tba1 using sta tags(0, false, '0'); +sql create table tba2 using sta tags(1, true, '1'); +sql create table tba3 using sta tags(null, null, ''); +sql create table tba4 using sta tags(1, false, null); +sql create table tba5 using sta tags(3, true, 'aa'); +sql insert into tba1 values ('2022-09-26 15:15:01', 0, "a", false); +sql insert into tba1 values ('2022-09-26 15:15:02', 1, "0", true); +sql insert into tba1 values ('2022-09-26 15:15:03', 5, "5", false); +sql insert into tba1 values ('2022-09-26 15:15:04', 3, 'b', false); +sql insert into tba1 values ('2022-09-26 15:15:05', 0, '1', false); +sql insert into tba1 values ('2022-09-26 15:15:06', 2, 'd', true); + +sql insert into tba2 values ('2022-09-27 15:15:01', 0, "a", false); +sql insert into tba2 values ('2022-09-27 15:15:02', 1, "0", true); +sql insert into tba2 values ('2022-09-27 15:15:03', 5, "5", false); +sql insert into tba2 values ('2022-09-27 15:15:04', null, null, null); + +# child table: no window +print ====> select count(*) from tba1 event_window start with f1 = 0 end with f2 = 'c'; +sql select count(*) from tba1 event_window start with f1 = 0 end with f2 = 'c'; +if $rows != 0 then + return -1 +endi + +# child table: single row window +print ====> select count(*) from tba1 event_window start with f1 = 0 end with f3 = false; +sql select count(*) from tba1 event_window start with f1 = 0 end with f3 = false +if $rows != 1 then + return -1 +endi +if $data00 != 1 then + return -1 +endi + +# child table: multi rows window +print ====> select count(*) from tba1 event_window start with f1 = 0 end with f2 = 'b'; +sql select count(*) from tba1 event_window start with f1 = 0 end with f2 = 'b'; +if $rows != 1 then + return -1 +endi +if $data00 != 4 then + return -1 +endi + +# child table: multi windows +print ====> select count(*) from tba1 event_window start with f1 >= 0 end with f3 = true; +sql select count(*) from tba1 event_window start with f1 >= 0 end with f3 = true; +if $rows != 2 then + return -1 +endi +if $data00 != 2 then + return -1 +endi +if $data00 != 4 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT From 67a291955e96a31e09ea6970b5acc658c6006f98 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 11 Jan 2023 17:36:01 +0800 Subject: [PATCH 7/8] fix(query): initialize output buffer for each output. --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/eventwindowoperator.c | 2 ++ source/libs/executor/src/executorimpl.c | 14 ++++++++++++++ 3 files changed, 17 insertions(+) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d957dd987b..dcb4a8a0ed 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -729,6 +729,7 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name); void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset); +void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput); SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 1480ac0451..215636a567 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -213,6 +213,8 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi } (*pResult)->win = *win; + + clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs); setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 41f93a62c5..79ecd8189e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -813,6 +813,20 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO } } +void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) { + for (int32_t i = 0; i < numOfOutput; ++i) { + SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; + if (pResInfo == NULL) { + continue; + } + + pResInfo->initialized = false; + pResInfo->numOfRes = 0; + pResInfo->isNullRes = 0; + pResInfo->complete = false; + } +} + void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) { if (pFilterInfo == NULL || pBlock->info.rows == 0) { return; From 7efdc9e27a32e212bef13efa201c5d170c044f42 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 11 Jan 2023 18:00:36 +0800 Subject: [PATCH 8/8] fix(query): fix memory leak. --- .../libs/executor/src/eventwindowoperator.c | 19 +++++++++++++++++++ tests/script/tsim/query/event.sim | 4 ++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 215636a567..0dbc05ceef 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -157,6 +157,20 @@ void destroyEWindowOperatorInfo(void* param) { return; } + if (pInfo->pRow != NULL) { + taosMemoryFree(pInfo->pRow); + } + + if (pInfo->pStartCondInfo != NULL) { + filterFreeInfo(pInfo->pStartCondInfo); + pInfo->pStartCondInfo = NULL; + } + + if (pInfo->pEndCondInfo != NULL) { + filterFreeInfo(pInfo->pEndCondInfo); + pInfo->pEndCondInfo = NULL; + } + cleanupBasicInfo(&pInfo->binfo); colDataDestroy(&pInfo->twAggSup.timeWindowData); @@ -315,4 +329,9 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf } } } + + colDataDestroy(ps); + taosMemoryFree(ps); + colDataDestroy(pe); + taosMemoryFree(pe); } diff --git a/tests/script/tsim/query/event.sim b/tests/script/tsim/query/event.sim index c72b7ba125..adc94a34de 100644 --- a/tests/script/tsim/query/event.sim +++ b/tests/script/tsim/query/event.sim @@ -36,7 +36,7 @@ endi # child table: single row window print ====> select count(*) from tba1 event_window start with f1 = 0 end with f3 = false; sql select count(*) from tba1 event_window start with f1 = 0 end with f3 = false -if $rows != 1 then +if $rows != 2 then return -1 endi if $data00 != 1 then @@ -62,7 +62,7 @@ endi if $data00 != 2 then return -1 endi -if $data00 != 4 then +if $data10 != 4 then return -1 endi