From 6a6c6491135f45ff0641397cc936fab788e6d1f1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 13 Feb 2023 13:24:59 +0800 Subject: [PATCH 1/2] fix(query): identify the data blocks from different group. --- .../libs/executor/src/eventwindowoperator.c | 56 ++++++++++--------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 0dbc05ceef..d41875095f 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -26,23 +26,19 @@ typedef struct SEventWindowOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; SExprSupp scalarSup; - SGroupResInfo groupResInfo; SWindowRowsSup winSup; - bool hasKey; - SStateKeys stateKey; int32_t tsSlotId; // primary timestamp column slot id STimeWindowAggSupp twAggSup; - - SFilterInfo* pStartCondInfo; - SFilterInfo* pEndCondInfo; - bool inWindow; - SResultRow* pRow; + uint64_t groupId; // current group id, used to identify the data block from different groups + SFilterInfo* pStartCondInfo; + SFilterInfo* pEndCondInfo; + bool inWindow; + SResultRow* pRow; } SEventWindowOperatorInfo; static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator); static void destroyEWindowOperatorInfo(void* param); -static void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); -static SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator); +static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); // todo : move to util static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, @@ -175,7 +171,6 @@ void destroyEWindowOperatorInfo(void* param) { colDataDestroy(&pInfo->twAggSup.timeWindowData); cleanupAggSup(&pInfo->aggSup); - cleanupGroupResInfo(&pInfo->groupResInfo); taosMemoryFreeClear(param); } @@ -253,36 +248,45 @@ static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSu pBlock->info.rows, numOfOutput); } -void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SExprSupp* pSup = &pOperator->exprSupp; - - SSDataBlock* pRes = pInfo->binfo.pRes; - int64_t gid = pBlock->info.id.groupId; - +int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pSup = &pOperator->exprSupp; + SSDataBlock* pRes = pInfo->binfo.pRes; + int64_t gid = pBlock->info.id.groupId; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); TSKEY* tsList = (TSKEY*)pColInfoData->pData; - + SWindowRowsSup* pRowSup = &pInfo->winSup; SColumnInfoData *ps = NULL, *pe = NULL; + int32_t rowIndex = 0; - SWindowRowsSup* pRowSup = &pInfo->winSup; pRowSup->numOfRows = 0; + if (pInfo->groupId == 0) { + pInfo->groupId = gid; + } else if (pInfo->groupId != gid) { + // this is a new group, reset the info + pInfo->inWindow = false; + } SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; - int32_t code = filterSetDataFromSlotId(pInfo->pStartCondInfo, ¶m1); + + int32_t code = filterSetDataFromSlotId(pInfo->pStartCondInfo, ¶m1); + if (code != TSDB_CODE_SUCCESS) { + return code; + } int32_t status1 = 0; - bool keep1 = filterExecute(pInfo->pStartCondInfo, pBlock, &ps, NULL, param1.numOfCols, &status1); + filterExecute(pInfo->pStartCondInfo, pBlock, &ps, NULL, param1.numOfCols, &status1); SFilterColumnParam param2 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; code = filterSetDataFromSlotId(pInfo->pEndCondInfo, ¶m2); + if (code != TSDB_CODE_SUCCESS) { + return code; + } int32_t status2 = 0; - bool keep2 = filterExecute(pInfo->pEndCondInfo, pBlock, &pe, NULL, param2.numOfCols, &status2); + filterExecute(pInfo->pEndCondInfo, pBlock, &pe, NULL, param2.numOfCols, &status2); - int32_t rowIndex = 0; int32_t startIndex = pInfo->inWindow ? 0 : -1; - while (rowIndex < pBlock->info.rows) { if (pInfo->inWindow) { // let's find the first end value for (rowIndex = startIndex; rowIndex < pBlock->info.rows; ++rowIndex) { @@ -334,4 +338,6 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf taosMemoryFree(ps); colDataDestroy(pe); taosMemoryFree(pe); + + return TSDB_CODE_SUCCESS; } From f26fa32340dbdef0c9e692e78d24b03ce6000672 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 13 Feb 2023 13:32:01 +0800 Subject: [PATCH 2/2] other: add some comments. --- source/libs/executor/src/eventwindowoperator.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 223b45dc8f..49e2d5bc4a 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -298,7 +298,6 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p if (rowIndex < pBlock->info.rows) { doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo); - doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset); // check buffer size @@ -328,9 +327,9 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p } if (pInfo->inWindow) { - continue; + continue; // try to find the end position } else { - break; + break; // no valid start position, quit } } }