From 9864e367bba078a811becec35fcf504c42eaee23 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Apr 2022 18:47:59 +0800 Subject: [PATCH] refactor(query): do some internal refactor in executor. --- source/libs/executor/inc/executorimpl.h | 8 +-- source/libs/executor/src/executorimpl.c | 66 +++++++++++++++--------- source/libs/executor/src/groupoperator.c | 9 ++-- 3 files changed, 52 insertions(+), 31 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5ef1754913..fa86cd2a18 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -342,7 +342,7 @@ typedef struct STableScanInfo { int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; - double sampleRatio; // data block sample ratio + double sampleRatio; // data block sample ratio, 1 by default SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. } STableScanInfo; @@ -395,7 +395,6 @@ typedef struct SOptrBasicInfo { int32_t* rowCellInfoOffset; // offset value for each row result cell info SqlFunctionCtx* pCtx; SSDataBlock* pRes; - int32_t capacity; // TODO remove it } SOptrBasicInfo; // TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset @@ -422,6 +421,8 @@ typedef struct STableIntervalOperatorInfo { EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. SColumnInfoData timeWindowData; // query time window info for scalar function execution. + double watermark; // water mark + int32_t trigger; // trigger model } STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -602,7 +603,8 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, - int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); + SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); +void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset); void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5261c182d3..beeea4f223 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4449,7 +4449,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { SSortHandle* pHandle = pInfo->pSortHandle; SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false); - blockDataEnsureCapacity(pDataBlock, pInfo->binfo.capacity); + blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity); while (1) { blockDataCleanup(pDataBlock); @@ -4461,7 +4461,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { // build datablock for merge for one group appendOneRowToDataBlock(pDataBlock, pTupleHandle); - if (pDataBlock->info.rows >= pInfo->binfo.capacity) { + if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) { break; } } @@ -4496,7 +4496,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortedMergeOperatorInfo* pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->binfo.capacity); + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity); } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; @@ -4603,7 +4603,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pInfo->bufPageSize = 1024; pInfo->pSortInfo = pSortInfo; - pInfo->binfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize); + pOperator->resultInfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize); pOperator->name = "SortedMerge"; // pOperator->operatorType = OP_SortedMerge; @@ -5103,8 +5103,8 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro return NULL; } - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { @@ -5124,7 +5124,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup } if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -5158,8 +5158,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup finalizeUpdatedResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); ASSERT(pInfo->binfo.pRes->info.rows > 0); pOperator->status = OP_RES_TO_RETURN; @@ -5260,8 +5260,8 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); OPTR_SET_OPENED(pOperator); - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { @@ -5352,7 +5352,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -5383,8 +5383,8 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); - blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity); - toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); + toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5401,7 +5401,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -5432,8 +5432,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); - blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity); - toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); + toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5611,15 +5611,23 @@ static void cleanupAggSup(SAggSupporter* pAggSup) { } int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, - int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) { + SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) { pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); pBasicInfo->pRes = pResultBlock; - pBasicInfo->capacity = numOfRows; doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey); return TSDB_CODE_SUCCESS; } +void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) { + pOperator->resultInfo.capacity = numOfRows; + pOperator->resultInfo.threshold = numOfRows * 0.75; + + if (pOperator->resultInfo.threshold == 0) { + pOperator->resultInfo.capacity = numOfRows; + } +} + static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) { STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); if (pTableQueryInfo == NULL) { @@ -5655,7 +5663,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t numOfRows = 1; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, keyBufSize, pTaskInfo->id.str); + + initResultSizeInfo(pOperator, numOfRows); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str); pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { goto _error; @@ -5805,7 +5815,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p int32_t numOfCols = num; int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); + + initResultSizeInfo(pOperator, numOfRows); + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols); @@ -5854,7 +5866,9 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); + + initResultSizeInfo(pOperator, numOfRows); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); @@ -5933,7 +5947,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pInfo->colIndex = -1; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, 4096, pResBlock, keyBufSize, pTaskInfo->id.str); + + initResultSizeInfo(pOperator, 4096); + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); pOperator->name = "StateWindowOperator"; @@ -5968,7 +5984,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); + + initResultSizeInfo(pOperator, numOfRows); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 8739371dd9..8018a8dd31 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -307,11 +307,11 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou // pInfo->binfo.rowCellInfoOffset); // } - blockDataEnsureCapacity(pRes, pInfo->binfo.capacity); + blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); while(1) { - toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); doFilter(pInfo->pCondition, pRes); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); @@ -348,7 +348,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx goto _error; } - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str); + initResultSizeInfo(pOperator, 4096); + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); pOperator->name = "GroupbyAggOperator";